准备工作
案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装
消息生产者
(1)创建工程引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
(2)定义bingding
发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
这就接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生 产者。
(3)配置application.yml
spring:
cloud:
stream:
bindings:
output:
destination: muziwk-default
contentType: text/plain
- contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs
- destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 muziwk-default 的所有消息队列中。
(4)测试发送消息
@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
@Autowired
@Qualifier("output")
MessageChannel output;
@Override
public void run(String... strings) throws Exception {
//发送MQ消息
output.send(MessageBuilder.withPayload("hello world").build());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
消息消费者
(1)创建工程引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
(2)定义bingding
同发送消息一致,在Spring Cloud Stream中接受消息,需要定义一个接口,如下是内置的一个接口。
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。 这就接口声明了一个 binding 命名为 “input” 。
(3)配置application.yml
spring:
cloud:
stream:
bindings:
input:
destination: muziwk-default
destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 muziwk-default
(4) 测试
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
// 监听 binding 为 Sink.INPUT 的消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("监听收到:" + message.getPayload());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
- 定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 @StreamListener(Processor.INPUT),方法参数为 Message 。
- 所有发送 exchange 为“muziwk-default ” 的MQ消息都会被投递到这个临时队列,并且触发上述的方 法。