基于 JMS 标准的 Android MQTT 客户端

关键词

JMS、ActiveMQ(ActivityMQ)、Apollo、MQTT、Android

摘要

由于项目开发需要,涉及到 Android 客户端接收来自 JMS 中间件的消息推送,本文以学习过程为线索,进行记录。

目录

一、简述 JMS (引出 ActiveMQ、Apollo)
二、MQTT 协议
三、MQTT 服务器搭建
四、 MQTT Android 客户端具体实现
—— 1、添加依赖
—— 2、添加权限
—— 3、基本字段说明
—— 4、注册Service
—— 5、Android 端具体实现

正文

一、简述 JMS (引出 ActiveQM、Apollo)

企业消息系统(Java Message Service)
  • 又称之为面向消息的中间件(MOM),提供了应用程序之间,异步数据的存储和转发,即应用程序彼此不直接通信,而是与作为中介的 MOM 通信。应用程序开发人员无需了解消息的发送和协议的细节。

  • 过程如下图,应用程序 A 只需要将 Message 发送到服务器上,然后应用程序 B 即可从服务器中接收到 A 发来的消息,由此可知 JMS 具有提供消息灵活性,松散耦合等优点。

JMS通讯示意图
  • JMS 由 SUN 提出,是一系列的接口及相关语义的集合,通过这些接口和和其中的方法,JMS 客户端可以访问消息系统,完成消息的创建、发送、接收及读取。
  • JMS 通过 MOM 为 Java 程序提供了一个发送和接收消息的标准。根据 JMS 编写的客户端程序可以访问任何实现 JMS 标准的 MOM。
JMS两种消息模型:

发送消息的客户端:生产者, 接收消息的客户端:消费者, MOM :消息传递系统

点到点(P2P)消息传递模型
  • 生产者发送消息到MOM的一个特定队列(Queue)中,而消费者从一个消息队列中得到消息。
  • 每条消息只有一个消费者,如果一条消息被消息者接收,那么其他的消费者就不能得到这条消息了。
  • 收到消息后消费者必须确认消息已被接收,否则消息传递系统会认为该消息没有被接收,那么这条消息仍然可以被其他人接收(程序可以自动进行确认,不需要人工干预)。
发布/订阅(Pub/Sub)消息传递模型
  • 发布/订阅传递消息类型与主题(Topic)有关。生产者发布消息,而消费者订阅感兴趣的消息,生产者将消息和一个特定的主题(Topic)连在一起,消息传递系统根据消费者注册的兴趣,将消息传递给消费者。这种类型非常类似出版报纸、杂志的形式。
  • 每个消息都可以有多个(0,1,……)订阅者。即每条消息可以有多个消费者,如果报纸和杂志一样。
  • 订阅者只能消费他们订阅之后出版的消息。这就要求订阅者必须先订阅,再等待生产者的运行,发布。
  • 订阅者必须保持为活动状态才能获得这些消息,即订阅者必须保持活动状态等待发布者发布的消息。

关于 JMS 更详细的介绍,推荐阅读下面的贴,更加深入浅出,通俗易懂
深入浅出JMS(一)——JMS简介
深入浅出JMS(二)——JMS的组成

ActiveMQ (或ActivityMQ)

由 Apache 贡献的 ActiveMQ 正是 MOM 中优秀的一员。它是一个非常流行、强大、开源的消息和集成模式服务器,速度快、支持多种跨语言客户端和协议,易于使用企业集成模式,拥有许多先进的特性,完全支持 JMS 1.1和 J2EE 1.4 规范。

Apollo

是以 ActiveMQ5.x 为基础,采用全新的线程和消息调度架构重新实现的 MOM,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。Apollo 与 ActiveMQ 一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍 MQTT 协议的 Android 客户端使用。

二、MQTT 协议

1、 Android 端实现消息推送的几种方式
  • 轮询:客户端定时向服务器请求数据,属于伪推送。缺点:费电,费流量。
  • XMPP:是基于可扩展标记语言(XML)的协议。缺点:XML 格式的,冗余很大,花费流量。
  • MQTT:由 IBM 开发的传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的设备提供可靠的网络服务。相比于 XMPP 等传统协议,MQTT 是专门针对移动互联网开发的轻量级传输协议,这种传输协议连接稳定、心跳数据包小,所以具备耗电量低、耗流量低的优势。推送服务的最佳协议!
2、 MQTT 协议简介

MQTT官网:http://mqtt.org/
MQTT介绍:http://www.ibm.com
MQTT Android github:https://github.com/eclipse/paho.mqtt.android
MQTT API:http://www.eclipse.org/paho/files/javadoc/index.html
MQTT Android API: http://www.eclipse.org/paho/files/android-javadoc/index.html

3、 Apollo + MQTT 协议的消息推送特征
  • Apollo 使用 MQTT 协议,加上 Android 上的 paho 包,即可简单实现消息通知功能,但是 MQTT 协议仅支持主题消息广播(Topic),即发布/订阅消息传递模式,而且不能用Selector,不能直接实现点对点的消息投递。
  • 利用上述的特征,可将 Android 客户端划分为众多的部落,创建不同的主题, 针对特定订阅者群体进行消息传递。该特征特别适用于智能家居等物联网系统。

三、MQTT 服务器搭建

1、下载Apollo服务器,解压(免安装的)。
2、进入解压后文件夹的 bin 目录下(例: E:\MQTT\apache-apollo-1.7.1\bin),按住 Shift 键,点击鼠标右键选择 "在此处打开命令窗口";
3、在命令窗口,输入 apollo create 服务器实例名称(例:apollo create mybroker),之后会在 bin 目录下创建该名称的文件夹。该文件夹中, etc\apollo.xml 文件是配置服务器信息的文件。etc\users.properties 文件包含连接 MQTT 服务器时用到的用户名和密码,默认为 admin=password,即账号为admin,密码为 password,可自行更改。
4、在命令窗口,输入 cd xxx/bin, 进入该实例的bin目录下,执行 apollo-broker.cmd run 命令,开启服务器,看到如下界面代表搭建完成。

cmd 窗口

之后在浏览器输入 http://127.0.0.1:61680/,查看是否安装成功。

四、 MQTT Android 客户端具体实现

1、在 module 的 build.gradle 添加依赖

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-releases/"
    }
}

dependencies {
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.0'
}

2、添加权限

    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />

3、基本字段说明

topic:中文意思是“话题”。在 MQTT 中订阅了( subscribe )同一话题(topic)的客户端会同时收到消息推送。直接实现了“群聊”功能。
clientId:客户身份唯一标识。
qos:服务质量。
retained:要保留最后的断开连接信息。
MqttAndroidClient#subscribe():订阅某个话题。
MqttAndroidClient#publish(): 向某个话题发送消息,之后服务器会推送给所有订阅了此话题的客户。
userName:连接到MQTT服务器的用户名。
passWord :连接到MQTT服务器的密码。

4、注册Service

 <!-- Mqtt Service -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
        <service android:name="com.mqtt.demo.MQTTService"/>

5、Android 端具体实现

public class MQTTService extends Service {
    public static final String TAG = MQTTService.class.getSimpleName();
    private static MQTTService instance;
    private static MqttAndroidClient client;

    private MqttConnectOptions conOpt;
    private String host = "tcp://192.168.7.31:61613";
    private String userName = "admin";
    private String passWord = "password";
    private static String myTopic = "topic";
    private String clientId = "test2";

    @Override
    public void onCreate() {
        super.onCreate();
        instance = this;
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        init();
        return super.onStartCommand(intent, flags, startId);
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onDestroy() {
        instance = null;
        try {
            client.disconnect();
            client.unregisterResources();

        } catch (MqttException e) {
            e.printStackTrace();
        }
        super.onDestroy();
    }

    public static void publish(String msg){
        String topic = myTopic;
        Integer qos = 0;
        Boolean retained = false;
        try {
            client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    // MQTT监听连接情况
    private IMqttActionListener iMqttActionListener = new ActionListener();

    // MQTT监听并且接受消息
    private MqttCallback mqttCallback = new CallBack();

    private void init() {
        clientId = MacAddressUtil.getLocalMacAddress(this);

        // 服务器地址(协议+地址+端口号)
        String uri = host;
        client = new MqttAndroidClient(this, uri, clientId);
        // 设置MQTT监听并且接受消息
        client.setCallback(mqttCallback);

        conOpt = new MqttConnectOptions();
        // 清除缓存
        conOpt.setCleanSession(true);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(20);
        // 用户名
        conOpt.setUserName(userName);
        // 密码
        conOpt.setPassword(passWord.toCharArray());

        // last will message
        boolean doConnect = true;
        String message = "{\"terminal_uid\":\"" + clientId + "\"}";
        String topic = myTopic;
        Integer qos = 0;
        Boolean retained = false;
        if (TextUtils.isEmpty(message) || TextUtils.isEmpty(topic)) {
            // 最后发送的消息
            try {
                conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
            } catch (Exception e) {
                Log.i(TAG, "Exception Occured", e);
                doConnect = false;
                iMqttActionListener.onFailure(null, e);
            }
        }

        if (doConnect) {
            doClientConnection();
        }

    }

    /** 连接MQTT服务器 */
    private void doClientConnection() {
        if (!client.isConnected() && isConnectIsNomarl()) {
            try {
                client.connect(conOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /** 判断网络是否连接 */
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.i(TAG, "MQTT当前网络名称:" + name);
            return true;
        } else {
            Log.i(TAG, "MQTT 没有可用网络");
            return false;
        }
    }

    public static boolean isConnect(){
        if (instance != null && client != null){
            return instance.client.isConnected();
        }
        return false;
    }

    public static void connect(){
        if (instance != null && client != null){
            instance.doClientConnection();
        }
    }

    public static void disconnect(){
        if (instance != null){
            try {
                instance.client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ActionListener implements IMqttActionListener {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.i(TAG, "连接成功 ");
            try {
                // 订阅myTopic话题
                client.subscribe(myTopic,1);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            arg1.printStackTrace();
            // 连接失败,重连
            Log.i("mtqq", "onFailure");
        }
    }


    public static class CallBack implements MqttCallback{

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String str1 = new String(message.getPayload());
            MQTTMessage msg = new MQTTMessage();
            msg.setMessage(str1);
            EventBus.getDefault().post(msg);
            String str2 = "topic:" + topic + ", qos:" + message.getQos() + ", retained:" + message.isRetained();
            Log.i(TAG, "messageArrived:" + str1);
            Log.i(TAG, str2);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            Log.i("mtqq", "deliveryComplete");
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // 失去连接,重连
            String message = "null";
            if (arg0 != null) {
                message = arg0.getMessage();
            }
            Log.i("mtqq", "connectionLost:"+message);
        }
    }
}

5、手机显示

Paste_Image.png

6、服务端显示

打开服务端http://127.0.0.1:61680/,看到的是这个样子

serverPic

7、例子代码下载

https://github.com/jkjk66/AndroidMQTT.git

注:本文是通过搜集网络资源,项目实践后,整合撰写,转载请备注出处。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,039评论 25 707
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,649评论 18 139
  • 写在前面 最近有需求要了解一下各个推送的协议,目前了解到实现推送的三个主要方式:MQTT、XMPP和Google ...
    xiasuhuei321阅读 5,258评论 3 9
  • 前言 MQTT是IBM开发的一个即时通讯协议,面向M2M和物联网的连接,采用轻量级发布和订阅消息传输机制,并且有可...
    阏男秀阅读 9,919评论 8 46
  • 笔点柔存,墨染心门。绘一纸、意暖情深。 微风雨后,雾霭清晨。 念弦中曲,诗中景,梦中人。 难寻缱绻,承藏盈梦。任浮...
    小薇_阅读 864评论 9 23