消息中间件Consumer通用化封装思路

上一篇讲到Producer的封装思路,比较的是AMQ和Kafka两个比较流行的中间件,这篇就不多啰嗦了,继续沿着上一篇的思路,将AMQ和Kafka尝试封装成对客户更简单更易用的接口。

拐一句,我有想过为什么不直接对外暴露官方的接口,也就是做通用化封装的意义何在?在一个一两个人的小项目里,如果MQ只是用于系统间解耦,当然没必要封装,直接拿来用即可。但是如果一个公司有成千上百个项目,大量项目都希望使用消息中间件,这时候最合适的方法就是对这些项目暴露一个简单易用的消息中间件的接口,由专门的团队提供消息平台的服务,让业务系统只需要简单的培训就可以使用消息服务,简化公司内重复的劳动。

说回正题,仍然从AMQ和Kafka的官方接口开始,consumer考虑统一只使用拉取(pull)的方式。

AMQ的接收一条消息的流程为:

  1. 建立连接工厂 ConnectionFactory
ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",url);
  1. 通过连接工厂建立连接并启动 Connection
Connection conn = cf.createConnection();
conn.start
  1. 通过连接建立会话 Session
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
  1. 通过会话建立目的地 Destination
Destination dest = session.createQueue("test");
  1. 在会话中指定目的地建立消费者Consumer
MessageConsumer consumer = session.createConsumer(dest)
  1. 使用Consumer收取一条消息
Message msg = consumer.receive(1000);

与生产者主要的区别就是从第五步以后的步骤,建立连接等方式都是一样的。一般来说,使用pull方式需要在外面套上一层while(true)的循环,连接保持长连接的方式,不停的从服务器端拉取消息。

Kafka的消息消费与一般消息中间件有所不同。

Kafka接受消息的流程为:
注:使用kafka 0.9版本后提供的新的consumer API

  1. 实例化一个Properties类
     Properties props = new Properties();
  1. 往Properties中填入bootstrap服务器,groupID等属性
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  1. 将Properties作为入参实例化一个KafkaConsumer
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 设置consumer订阅的topic
     consumer.subscribe(Arrays.asList("foo", "bar"));
  1. 接收一条消息
     ConsumerRecords<String, String> records = consumer.poll(100);

简直感动,当初我以为要使用high level的consumer来设计了,要从流中获取消息啊,都不知道怎么封装了。没想到0.9版本以后简化了更易用的consumer版本,业界良心!

通用化封装

闲话少说,沿袭上一篇的思路,直接开始:

  1. 实例化一个MQClient对象
MQClient mc = new MQClient("mqclient.properties");

对应AMQ的1,2两步,Kafka的1,2两步
在实例化时,从mqclient.properties文件中读取出所有的配置,并建立连接。

  1. 建立一个消费者
MQConsumer consumer = client.getConsumer(destination);

对应AMQ的3,4,5步,Kafka的第3,4步
建立消费者时,需要指定目的地,因为很明显消费者和目的地应该是对应好的。destination应该支持组合的方式,不止是ActiveMQ支持组合目的地,kafka也同样支持。

  1. 获取一条消息
MQMessage(s) msg = consumer.receive(time);

对应AMQ的第6步,Kafka的第5步
AMQ和Kafka的API都支持传入时间,表示持续消费的意思。外面包上一层while()循环即可。获取到的MQMessage是一个我们自己写的类,在这个类中,如果是AMQ,需要将消息的类型(TextMessage,ObjectMessage)等标注清楚,如果是Kafka的类,需要注意的是获取到的是一个ConsumerRecords的类,里面可能包含多条消息。

我们可以在这里选择三种处理方式:

  • 将AMQ也改为获取到一组消息,MQMessages类中包含多个MQMessage。
  • 限制Kafka每次只能获取一条消息,继续使用MQMessage类。
  • MQMessage类中,为Kafka单独处理,对于AMQ来说,MQMessage获取到的是一条消息,Kafka获取到的是一组消息。

由于Kakfa的新API我还没用过,等到实际使用后我再来选择具体方案。

结合上一篇Producer的封装思路,消息中间件的面向客户的API封装思路基本理顺了,生下来的就是各种细节,需要仔细思考的有:

  1. 线程池,连接池的使用
  2. 快速安装broker
  3. 用户的权限管理,topic和queue的安全访问控制
  4. 定制化需求的实现
  5. ……

原来还有一堆事等着我啊。。。。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容

  • 消息中间件就我目前接触过的主要有ActiveMQ,Kafka,RabbitMQ,IBM MQ,RocketMQ。目...
    MisterCH阅读 2,052评论 1 4
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,822评论 4 54
  • 一、基本概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独...
    ITsupuerlady阅读 1,627评论 0 9
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,314评论 1 15