8、spring cloud stream

spring cloud stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。官方文档的架构图如下:

spring cloud stream

为什么使用spring cloud stream

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

spring cloud stream 中消息的发布和消费,有4个组件:

  • source
  • channel
  • binder
  • sink
  1. source
    当一个服务准备发布消息时,它将使用一个source发布消息,它接受普通的Java对象,该对象代表发布的消息,source将Java对象序列化并将消息发布到channel
  2. channel
    通道是队列的一种抽象,通道始终与目标队列名称关联,队列名称不会在代码中暴露,因此我们可以通过修改配置更改队列名称,而不是修改代码
  3. binder
    如spring cloud stream架构图那样,binder允许开发人员处理消息而不必依赖特定平台的api来发送和消费消息
  4. sink
    服务通过sink从队列中接受消息,将消息反序列化为pojo。

搭建消息生产和消费环境

  1. 搭建生产者
    首先添加maven依赖,这里我是用的是rabbitMQ,所以添加了rabbitMQ依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

接着在启动类上添加@EnableBinding(Source.class)注解,这个注解给我们绑定消息通道的,Source是Stream给我们提供的

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Source.class)
public class ProviderApplicatioin {

    public static void main(String[] args) {
        SpringApplication.run(ProviderApplicatioin.class, args);
    }
}

然后向消息代理发布消息,创建一个Java类,模拟发送

@Component
public class SimpleSourceBean {

    private final Logger LOGGER = Logger.getLogger(SimpleSourceBean.class);

    private Source source;

    @Autowired

    public SimpleSourceBean(Source source) {
        this.source = source;
    }

    public void publish() {
        Map<String, Object> map = Maps.newHashMap();
        List<String> list = Lists.newArrayList();

        list.add("test");
        map.put("A", list);
        map.put("B","123");

        source
                .output()
                .send(MessageBuilder.withPayload(map).build());
    }
}

这里代码较简单,使用source发送消息。

下一步配置application配置文件

spring:
  application:
    name: provider-server
  cloud:
    stream:
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
          destination: publish # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
#         binder: defaultRabbit # 设置要绑定的消息服务的具体设置
  rabbitmq:
    addresses: 192.168.33.10:5672
    username: guest
    password: guest
server:
  port: 8081

这里也是一些配置,配置binding和rabbit

最后配置一个controller,调用发送接口。

@RestController
public class ProviderController {

    private final Logger LOGGER = Logger.getLogger(ProviderController.class);


    @Autowired
    private SimpleSourceBean simpleSourceBean;

    /**
     * get方式接口
     * @param request 请求参数
     */
    @RequestMapping(value = "/provider", method = RequestMethod.GET)
    public String provider(@RequestParam String request) {
        LOGGER.info("========================================");
        LOGGER.info("provider service 被调用 ");
        LOGGER.info("========================================");

        LOGGER.info("========================================");
        simpleSourceBean.publish();
        LOGGER.info("发送消息");
        LOGGER.info("========================================");
        return "provider, " + request;
    }
}

直接一个get请求调用发送接口。

  1. 搭建消费者
    首先也是添加maven依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

然后配置sink,接受消息

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class ConsumerApplication {

    private final Logger LOGGER = Logger.getLogger(ConsumerApplication.class);

    @StreamListener(Sink.INPUT)
    public void sink(Map<String, Object> map) {
        LOGGER.info("=======================");
        LOGGER.info(map);
        LOGGER.info("=======================");
    }

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

这里每次从input通道接收到消息,都会调用@StreamListener绑定的方法。

接着配置application配置文件

spring:
  application:
    name: consumer-server
  cloud:
    stream:
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
          destination: publish # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          group: consumer-server

  rabbitmq:
    addresses: 192.168.33.10:5672
    username: guest
    password: guest


server:
  port: 8111

配置完成后,分别启动服务。调用发送消息接口,就可以看到消费端打印的日志:

2018-11-26 21:25:56.997  INFO 42320 --- [nsumer-server-1] c.h.w.spring.cloud.ConsumerApplication   : =======================
2018-11-26 21:25:56.997  INFO 42320 --- [nsumer-server-1] c.h.w.spring.cloud.ConsumerApplication   : {A=[test], B=123}
2018-11-26 21:25:56.997  INFO 42320 --- [nsumer-server-1] c.h.w.spring.cloud.ConsumerApplication   : =======================

这样使用spring cloud stream创建消息生成者和消费者就完成了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容