最近遇到开发任务,需要实现基于MQTT5的同步通信,在搜了一番资料后,有一篇博客(https://blog.csdn.net/pipiang/article/details/119060194)把实现原理讲的很透彻,
实现原理
原理这块大家可以去看下上面的博客,特别是文末的几篇链接,但是上文还没说明在集群部署的时候,server1 根据 topic1 pub完消息, server2 sub 到topic1的消息 再将响应结果 通过 topic2 pub 出去,怎么保证只有 server1 能sub 到server2 pub 到 topic2 的消息。
这块实现的原理可以看下阿里云物联网的相关文档 (https://help.aliyun.com/document_detail/90568.html)
通过通信相关的topic定义可以看出实现的原理,就是通过每次业务请求生成唯一messageId进行动态发布订阅。server1 根据 topic1+messageId, pub完消息,再订阅约定好的topic2+messageId; server2 sub 到topic1/# 的消息,解析出messageId 再将响应结果 通过约定的 topic2+messageId, pub 出去,这下就能保证 只有server1 能接收到返回消息了,server1处理完之后再取消订阅刚刚的topic2+messageId。
代码实现
在上文我引用的博客中,已经有对应的基于mqtt3实现的代码,但是还不能满足真正业务上的需求,而且不是基于mqtt5,所以不能使用mqtt5的一些新特性,我在参考了他的代码,自己再去看了org.eclipse.paho.mqttv5的部分代码,实现了基于mqtt5的同步通信
代码如下