Spring Cloud Stream 消息驱动 RabbitMQ 基础使用

项目的快速搭建参照官方 Creating a Sample Application by Using Spring Initializr

RabbitMQ环境使用

RabbitMQ部署在DockerSwarm集群

加入依赖

      <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

再选择Kafka或RabbitMQ

  • Kafka
  • RabbitMQ
    比如我选择RabbitMQ,那么我项目的pom
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

消息处理

修改启动类

@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitmqStreamExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqStreamExampleApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("Received: " + person);
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String toString() {
            return this.name;
        }
    }
}
  • @EnableBinding(Sink.class) 是绑定一个输入通道,Sink是提供的开箱即用的输入通道
  • @StreamListener(Sink.INPUT) 监听输入进来的消息

Sink的源码

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

试试从RabbitMQ手动发消息

先启动项目,启动前配置一下rabbitmq连接

spring:
  application:
    name: rabbitmq-stream-example
  rabbitmq:
    host: 172.16.10.172
    port: 5672
    username: guest
    password: guest
server:
  port: 8080

启动项目
启动日志中有rabbitmq的连接及注册通道的信息

Initializing ExecutorService 'taskScheduler'
Registering MessageChannel input
Registering MessageChannel nullChannel
Registering MessageChannel errorChannel
Registering MessageHandler errorLogger
Channel 'rabbitmq-stream-example.input' has 1 subscriber(s).
Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
Channel 'rabbitmq-stream-example.errorChannel' has 1 subscriber(s).
started _org.springframework.integration.errorLogger
declaring queue for inbound: input.anonymous._RE-Zx6tQKWHDKGfc0NV9g, bound to: input
Attempting to connect to: [172.16.10.172:5672]
Created new connection: rabbitConnectionFactory#e72dba7:0/SimpleConnection@5f303ecd [delegate=amqp://guest@172.16.10.172:5672/, localPort= 55249]
Registering MessageChannel input.anonymous._RE-Zx6tQKWHDKGfc0NV9g.errors
Channel 'rabbitmq-stream-example.input.anonymous._RE-Zx6tQKWHDKGfc0NV9g.errors' has 1 subscriber(s).
Channel 'rabbitmq-stream-example.input.anonymous._RE-Zx6tQKWHDKGfc0NV9g.errors' has 2 subscriber(s).
started inbound.input.anonymous._RE-Zx6tQKWHDKGfc0NV9g
Started RabbitmqStreamExampleApplication in 2.376 seconds (JVM running for 3.764)

查看Rabbitmq的queue

input.anonymous._RE-Zx6tQKWHDKGfc0NV9g

手动发消息

{"name":"Sam Spade"}
发送消息

查看控制台,已接收到消息


控制台结果

应用模型

SCSt-with-binder.png

应用程序能过Spring Cloud Stream注入的input和output与外界的连通是通过Binder实现,Spring Cloud Stream 提供了KafkaRabbitMQ的Binder实现。

给消费者分组 spring.cloud.stream.bindings.<channelName>.group

举个例子,假如只有一个消息生产者和一个消费者,消息能正常处理,在微服中可能一个消费者会有多个实例,一个消息会被多个实例处理,这样就出现了消息重复的问题,给消费者分组之后,一个消费者的多个实例中只会有一个实例处理消息

spring:
  application:
    name: rabbitmq-stream-example
  rabbitmq:
    host: 172.16.10.172
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input:
          destination: mqtestDefault # 指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 mqTestDefault
          group: user-channel
        output:
          destination: mqtestDefault
          contentType: text/plain

server:
  port: 8080

项目改造
启动类

@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitmqStreamExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqStreamExampleApplication.class, args);
    }

}

创建一个消息监听 SinkMsgRecvicer

@EnableBinding(Sink.class)
public class SinkMsgRecvicer {

    private static Logger logger = LoggerFactory.getLogger(SinkMsgRecvicer.class);

    @StreamListener(Sink.INPUT)
    public void msg(String value) {
        logger.info("Recvicer : {}", value);
    }
}

写一个测试的TestController
需要增加web依赖

@RestController
public class TestController {

    @Autowired
    Source source;

    @RequestMapping("/send")
    public String send(String name) {
        source.output().send(MessageBuilder.withPayload("send to : " + name).build());

        return "发送成功 " + name;
    }
}

启动项目 访问 http://localhost:8080/send?name=liangwang
控制台会有输出

 Recvicer : send to : liangwang
RabbitMq Exchanges

待续。。。

源码 rabbitmq-stream-example
官方文档 Elmhurst.RELEASE

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容