项目背景
今年,相信很多人都会在各个商场或者是电影院中可以看到各种娃娃机、幸运盒子、口红挑战等等类似机器。在抖音上《口红挑战》这款机子也是火的一塌糊涂,你只要花 10 块钱就有可能赢走一个 YSL 口红,想想就觉得很有诱惑力。或许这些机器被程序员一瞧就知道其中的猫腻,但是这个机器瞄准的是那些容易冲动消费的消费者,比如情侣、女生、带小孩的大人;像程序员这么奇葩的生物,一般都是直接被无视的哈哈。
然后呢,我们公司就是为这些设备的正常运行提供解决方案的。因此才有我今天的爬坑总结,哈哈哈哈哈.....
我们提供的解决方案是这样的,在一个门店里面会包含如下的设备:娃娃机、口红挑战、排行榜、中控,当然其中还有我们的后台服务。那么首先我会先介绍一下整个系统的架构以及各个设备的职责:
- 系统架构图
- 服务后台
- 服务后台不属于 Android 这端负责的,因此不需要去关注太多,服务后台相对于门店设备而言,它的一个职责是负责给机器发送上分质量,监测设备状态,及一些其他的信息。
- 中控(本地服务:不连接外网):这个中控也是也 Android 设备,它的功能有两个:
- 排行榜:用来接收娃娃机中发送过来的用户夹中娃娃的信息,并最后将其显示在排行榜上。
- 资源分发中控:作为资源的分发中心,中控需要分发 apk 安装包、图片、
- 娃娃机:这一块其实包含了两个部分,一个是 Android 设备,另外一个硬件设备。
- Android 设备主要是用来显示 banner、处理服务器数据(例如:上分)、对接中控(资源更新、数据反馈等)、对接硬件设备
- 硬件设备主要是处理用户上分、出礼、心跳等信息,并将这些信息交给 Android 设备处理。
- 口红挑战:关于口红挑战这个设备可以划分为 3 个模块,分别为见缝插针游戏、常规程序模块、硬件模块
- 关于见缝插针的这个游戏是用白鹭引擎做的,最后以 h5 的形式嵌入到 APP 中,主要负责游戏的主逻辑以及和程序主模块进行游戏逻辑数据的反馈。
- 常规模块主要是有处理服务后台的数据、对接硬件模块(格子选中、打开格子等)、对接游戏(启动游戏、游戏结果反馈等)、物料后台管理。
- rocket:这个程序有点特殊,因为用户是看不到它的,在出厂的时候,这个程序就被写进去了,那么它负责的工作如下:
- 屏蔽设备的 systemui 程序和 launcher 程序,防止用户做一些非法的操作。
- 检测 U 盘是否插入,然后移动或者负责制定文件(apk 安装包、ipconfig.json 文件、三元组配资文件config.json、h5 资源文件等)
- 接收广播,自动安装或者更新娃娃机或者口红挑战程序
要解决的问题
同步加载资源
关于资源同步的,首先我们先理一下我们需要同步的资源有哪些,这些资源分别为: apk 安装包、图片、h5 相关的 index 资源。
资源更新的方式
关于更新的方式,这里其实就有一个比较坑的地方了,一开始的时候我们选择的资源更新方式比较傻,直接使用 websocket 进行资源更新的,一开始的时候只有一个设备进行连接,问题倒是不大,但是后来发现多台设备连接同时更新资源的时候问题特别大,连接经常断开,导致资源更新失败。那么这里是我遇到的第一个坑。发现这个坑之后呢,我的选择资源更新的方式就更改为:NanoHttpd。NanoHttpd 是一个开源库,是用 Java 实现的,它可以在 Android 设备上建立一个轻量级的 web server。其实在 Android 设备上创建一个轻量级的 web server 才是我们一开始就应该要选择的方向。为什么呢?首先 NanoHttpd 的使用是比较简单的,因此我们只需要几行代码就可以实现一个 web server 了;其次呢,NanoHttpd 是比较稳定的,相对于我们手动使用 websocket 去实现一个资源分发要稳定太多了。
那么在我们选择了资源的更新方式之后,有另外一个问题浮出水面了,关于服务器的 IP 地址。我们都知道,关于 Android 设备连接上移动互联网或者 WiFi 的时候都会被自动分配一个 IP 地址,因此这个 IP 地址是会变化的,我们的设备在每天晚上都会关机,然后在第二天开启重启的时候又会被分配到一个新的 IP 地址,因此服务器的 IP 地址是一直在变化的,所以这里我们需要做的是想办法把某个设备的 IP 地址给固定下来。那么接下来就来讲讲关于 NanoHttpd 创建轻量级的 web server 和如何解决 IP 变化的问题。
NanoHttpd 实现 web server
- NanoHttpd 项目地址
- gradle 依赖
implementation 'org.nanohttpd:nanohttpd-webserver:2.3.1'
- 实现方式
File resourceDir = new File(Environment.getExternalStorageDirectory(), "myRootDir");
SimpleWebServer httpServer = new SimpleWebServer(null, 18103, resourceDir, true, "*");
httpServer.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
- SimpleWebServer 构造函数的参数
- host:服务器 ip 地址
- port:端口号(取值范围:1024~65535)
- wwwroot:放置静态资源的根目录
- quiet:是否为安静模式
- cors:
- 访问方式
- 在同个局域网下,那么我们在浏览器中输入地址:http://10.0.0.34:18103,我们就可以访问到我们服务器中的资源了,当然目前实现的服务器是静态的,只能处理 get 资源请求,不能够处理 post、put 等其他请求,目前是处理不了的,如果需要自己再处理 post 和 put 等其他的请求的话,那么可以自己去 项目原地址 中参考它的用法去实现,在这里就不多讲了。
解决 IP 变化的问题
在 Android 设备中,它的一个 IP 地址是会变化的,而且每个门店都会有一个自己的内部中控机,那么我们是必须要处理 IP 地址变化的这个问题的。我们的解决方案有如下两个步骤:
- 在路由器中根据 Mac 地址,为门店内的中控设备设置固定的 IP 地址
- 为每个娃娃机和口红挑战设备提供一个 IP 地址的配置文件,这个文件里面有门店中控的 IP 地址信息,放在 U 盘的指定目录下,但插入设备的时候,由 Rocket 程序将文件从 U 盘中将配置文件 copy 到设备的制定目录下,设备每次启动的时候都需要先读取配置文件,再连接本地的服务器。
资源什么时候更新
关于资源更新的,我们首先需要明确我们需要更新的资源有哪些以及我们需要更新的方式。
更新的资源
- Resource.json
- apk 包
- 娃娃机中的显示器轮播图
- 娃娃机中显示 banner 的 h5 资源
更新的配置文件
- 关于我们资源跟新的所有数据都是保存在 Resource.json 这个文件夹里面的,那么我们每隔 5min 就从中控服务端(局域网内)获取 Resource.json,然后每个类型的资源就根据写在 Resource.json 中的数据进行判断。那么写入 Resource.json 文件中的实现及具体内容如下:
- 资源的 ResList model
public class ResListModel { // 娃娃机 banner 的 h5 资源(index.html等文件) public HashMap<String, String> bannerFiles = new HashMap(); // 门店中所有娃娃机都会显示的轮播图 // key 为 图片的 hash 值 // value 为图片的在服务器中的相对路径 public HashMap<String, String> PublicFiles = new HashMap(); // 门店中特定娃娃机的私有显示轮播图 // key 为设备的 id // value 为图片图片的 hash 及路径信息(对应 PublicFiles) public HashMap<String, HashMap<String, String>> PrivateFiles = new HashMap(); // 更新的 apk 路径 public String UpdateApk; // 更新的 apk 包名 public String UpdateApkPackageName; // 更新的 apk 版本名 public String UpdateApkVersion; // 更新的 apk 版本号 public int UpdateApkVersionCode; }
- 写入到 Resourse.json 文件
ResListModel res = new ResListModel(); // 略过添加数据的过程 ...; File resourceFile = new File(baseDir, "Resource.json"); RandomAccessFile out = new RandomAccessFile(resourceFile, "rw"); byte[] json = JsonStream.serialize(res).getBytes("utf-8"); out.setLength(json.length); out.write(json); out.close();
- Resourse.json 的内容
{ "PrivateFiles":{}, "PublicFiles": { "1A7D3394A6F10D3668FB29D8CCA1CA8B":"Public/timg.jpg" }, "UpdateApk":null, "UpdateApkPackageName":null, "UpdateApkVersion":null, "UpdateApkVersionCode":0, "bannerFiles": { "C609D70832710E3DCF0FB88918113B18":"banner/Resource.json", "FC1CF2C83E898357E1AD60CEF87BE6EB":"banner/app.8113390c.js", "27FBF214DF1E66D0307B7F78FEB8266F":"banner/manifest.json", "A192A95BFF57FF326185543A27058DE5":"banner/index.html", "61469B10DBD17FDEEB14C35C730E03C7":"banner/app.8113390c.css" } }
资源图片和 banner 的资源文件的更新
- 关于图片和 banner 的资源文件的更新方式是类似的,只是存放的路径不在同一个目录下而已。那么对这类资源的更新,我们是通过技术资源的 hash 值和文件名来进行判断的。娃娃机或者口红挑战设备会每隔 5min 从中控中获取 Resourse.json 文件,然后取出 ResListModel,ResListModel 在之前介绍过了,是保存资源更新的配置文件;之后我们从中取出相对于的配置,首先根据文件名判断该文件是否已经存在本地了,如果不存在,则直接添加到资源更新的列表中,如果存在则再判断 hash 值是否相同,相同就不更新,不相同先将本地的文件删除,然后再将其就添加到更新资源的列表中。
- 图片和 banner 资源更新流程图:
- 中控中计算资源你的 hash 值
try {
// banner 资源文件
String fileName = fileFilter.getAbsolutePath().substring(baseDirLength);
RandomAccessFile randomAccessFile = new RandomAccessFile(fileFilter,"r");
byte[] buf = new byte[(int) randomAccessFile.length()];
randomAccessFile.read(buf);
randomAccessFile.close();
MessageDigest md5 = MessageDigest.getInstance("md5");
byte[] hash = md5.digest(buf);
String hashStr = ByteToHex(hash,0,hash.length);
res.bannerFiles.put(hashStr,fileName);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
// 字节转换为 16 进制
public static String ByteToHex(byte[] bt, int offset, int len) {
StringBuffer sb = new StringBuffer();
for (int i = offset; i < offset + len; i++) {
int tmp = bt[i] & 0xff;
String tmpStr = Integer.toHexString(tmp);
if (tmpStr.length() < 2)
sb.append("0");
sb.append(tmpStr);
}
return sb.toString().toUpperCase();
}
- 娃娃机设备检查更新(例如:banner 资源文件)
public static Observable<Boolean> updateBannerRes(ResListBean resListBean) throws IOException, NoSuchAlgorithmException {
// 获取远程 banner 的文件
HashMap<File, String> remoteFiles = new HashMap();
for (HashMap.Entry<String, String> entry : resListBean.bannerFiles.entrySet()) {
remoteFiles.put(new File(entry.getValue()), entry.getKey());
}
FileUtils.GetFilesInDir(bannerDir,localBannerList,null);
int baseDirLength = resDir.getAbsolutePath().length()+1;
// step1:删除本地文件(远程 banner 中没有的文件)
for (File localFile : localBannerList) {
File chileFile = new File(localFile.getAbsolutePath().substring(baseDirLength));
if (!remoteFiles.containsKey(chileFile)) {
MainActivity.appendAndScrollLog(String.format("删除 banner 资源文件 %s\n", localFile.getAbsolutePath()));
localFile.delete();
}
}
// 下载本地没有的文件
ArrayList<Observable<File>> taskList = new ArrayList();
for (Map.Entry<File, String> fileEntry : remoteFiles.entrySet()) {
File file = new File(resDir,fileEntry.getKey().getAbsolutePath());
// step2:本地中存在和远程相同的文件名
if (localBannerList.contains(file)) {
// step3:根据 hash 值判断是否为同一文件
String hashStr = FileUtils.getFileHashStr(file);
if (TextUtils.equals(hashStr,fileEntry.getValue())){
MainActivity.appendAndScrollLog(String.format("保留 banner 文件 %s\n", file.getAbsolutePath()));
taskList.add(Observable.just(file));
continue;
}
}
// step4:下载本地没有的文件
String url = new URL("http", Config.instance.centralServerAddress,
Config.instance.httpPort,
new File(BuildConfig.APPLICATION_ID, fileEntry.getKey().getAbsolutePath()).getAbsolutePath()).toString();
// step5:加入文件下载列表
taskList.add(DownLoadUtils.getDownLoadFile(url,file));
}
return Observable.concat(taskList)
.toFlowable(BackpressureStrategy.MISSING)
.parallel()
.runOn(Schedulers.io())
.sequential()
.toList()
.observeOn(Schedulers.computation())
.map(new Function<List<File>, ArrayList<File>>() {
@Override
public ArrayList<File> apply(List<File> files) throws Exception {
ArrayList<File> list = new ArrayList();
for (File file : files) {
if (!file.getAbsolutePath().isEmpty()) {
list.add(file);
}
}
if (list.size() > 0) {
if (!Utils.EqualCollection(list, localBannerList)) {
Collections.sort(list);
} else {
list.clear();
}
}
return list;
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<ArrayList<File>, Boolean>() {
@Override
public Boolean apply(ArrayList<File> list) throws Exception {
if (list.size() > 0) {
localBannerList = list;
webViewHasLoad = false;
loadH5();
}
return true;
}
})
.observeOn(Schedulers.io())
.map(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean aBoolean) throws Exception {
FileUtils.DelEmptyDir(resDir);
return true;
}
})
.toObservable();
}
程序升级的问题
关于程序的升级,相比较于图片资源的更新要简单许多。
- 我们的实现版本更新的步骤如下:
- step1:找出本地存在的 apk 文件(设备的中的 apk 都是制定路径和制定文件名的),将其删除。
- step2:判断中控中的安装包的版本号是否大于本地程序的版本号,如果是则进入 step3;否则忽略,不需要程序升级
- step3:下载最新版本的 apk 安装包
- step4:下载成功后,发送广播(action:包名;extra:apk文件路径)给 rocket 程序
- step5:rocket 程序接收到广播之后就升级程序
- 程序升级流程图
- 具体代码实现
public static Observable<Boolean> updateGame(ResListBean res) throws IOException, InterruptedException {
ArrayList<File> apkList = new ArrayList();
FileUtils.GetFilesInDir(resDir, apkList, new String[]{
".apk",
});
// 删除本地存在的 apk 包
for (File file : apkList) {
file.delete();
}
do {
if (res.UpdateApk == null || res.UpdateApkVersion == null) {
break;
}
// 判断是否需要升级
if (BuildConfig.VERSION_CODE >= res.UpdateApkVersionCode) {
break;
}
// apk 的 URL
final String url = new URL("http", Config.instance.centralServerAddress, Config.instance.httpPort, new File(BuildConfig.APPLICATION_ID, res.UpdateApk).getAbsolutePath()).toString();
MainActivity.appendAndScrollLog(String.format("下载升级文件 %s\n", url));
// 下载 apk 文件
return DownLoadUtils.getDownLoadFile(url,resDir.getAbsolutePath(),res.UpdateApk)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap(new Function<File, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(File file) throws Exception {
String path = file.getAbsolutePath();
MainActivity.appendAndScrollLog(String.format("升级文件下载完成 %s %s\n", path, url));
PackageManager pm = MainActivity.instance.getPackageManager();
PackageInfo pi = pm.getPackageArchiveInfo(path, 0);
if (pi == null) {
MainActivity.appendAndScrollLog(String.format("升级文件打开失败 %s\n", path));
return Observable.just("");
}
MainActivity.appendAndScrollLog(String.format("升级文件对比:Native(%s %s)/Remote(%s %s)\n", BuildConfig.APPLICATION_ID, BuildConfig.VERSION_NAME, pi.packageName, pi.versionName));
if (!BuildConfig.APPLICATION_ID.equals(pi.packageName)
|| BuildConfig.VERSION_CODE >= pi.versionCode) {
return Observable.just("");
}
return Observable.just(path);
}
})
.flatMap(new Function<String, Observable<Boolean>>() {
@Override
public Observable<Boolean> apply(String updateApk) throws Exception {
if (!updateApk.isEmpty()) {
Log.e(TAG, "等待游戏结束后安装升级文件...");
MainActivity.appendAndScrollLog("等待游戏结束后安装升级文件...\n");
synchronized (GamePlay.class) {//防止在游戏运行时更新版本
Log.e(TAG, "发布广播");
Intent intent = new Intent();
intent.setAction(Config.updateBroadcast);
intent.putExtra("apk", updateApk);
MainActivity.instance.sendBroadcast(intent);
System.exit(0);
}
}
return Observable.just(true);
}
});
} while (false);
return Observable.just(true);
}
资源文件下载
关于资源文件的下载,我是选择 okdownload。okdownload 是一个支持多线程,多任务,断点续传,可靠,灵活,高性能以及强大的下载引擎。详情可以去看 okdownload GitHub 地址
- 依赖方式
implementation 'com.liulishuo.okdownload:okdownload:1.0.5'
implementation 'com.liulishuo.okdownload:okhttp:1.0.5'
- 简单实用示例
单文件下载
DownloadTask task = new DownloadTask.Builder(url, parentFile)
.setFilename(filename)
// the minimal interval millisecond for callback progress
.setMinIntervalMillisCallbackProcess(30)
// do re-download even if the task has already been completed in the past.
.setPassIfAlreadyCompleted(false)
.build();
task.enqueue(listener);
// cancel
task.cancel();
// execute task synchronized
task.execute(listener);
多文件下载
final DownloadTask[] tasks = new DownloadTask[2];
tasks[0] = new DownloadTask.Builder("url1", "path", "filename1").build();
tasks[1] = new DownloadTask.Builder("url2", "path", "filename1").build();
DownloadTask.enqueue(tasks, listener);
- 结合 Rxjava 实现文件下载
public class DownLoadUtils {
/**
* 从中控下载文件到本地
* @param url
* @param parentPath 保存到本地文件的父文件路径
* @param downloadFileName 保存到本地的文件名
* @return
*/
public static Observable<File> getDownLoadFile(String url,String parentPath,String downloadFileName){
// 下载本地没有的文件
MainActivity.appendAndScrollLog(String.format("开始下载资源文件 %s\n", url));
final DownloadTask task = new DownloadTask.Builder(url, parentPath, downloadFileName).build();
return Observable.create(new ObservableOnSubscribe<File>() {
@Override
public void subscribe(final ObservableEmitter<File> emitter) throws Exception {
task.enqueue(new DownloadListener2() {
@Override
public void taskStart(DownloadTask task) {
}
@Override
public void taskEnd(DownloadTask task, EndCause cause, Exception realCause) {
if (cause != EndCause.COMPLETED) {
MainActivity.appendAndScrollLog(String.format("资源文件下载失败 %s %s\n", cause.toString(), task.getUrl()));
emitter.onNext(new File(""));
emitter.onComplete();
return;
}
File file = task.getFile();
MainActivity.appendAndScrollLog(String.format("资源文件下载完成 %s\n", file.getAbsolutePath()));
emitter.onNext(file);
emitter.onComplete();
}
});
}
}).retry();
}
/**
* 从中控下载文件到本地
* @param url
* @param saveFile 保存到本地的文件
* @return
*/
public static Observable<File> getDownLoadFile(String url, File saveFile){
return getDownLoadFile(url,saveFile.getParentFile().getAbsolutePath(),saveFile.getName());
}
}
屏蔽下拉菜单和底部导航栏
像娃娃机和格子机这些设备都是在线下直接面向用户的,因此我们不能将我们的 Android 设备全部都展现给我们的用户,我们需要对用户的行为做些限制,例如禁止用户通过导航栏或者下拉菜单退出当前程序,防止他们做出一些危险的操作。我的解决方案是把当前的 rocket 程序设置为默认启动和桌面应用程序,并将 Android 设备中自带的 launcher 程序 和 systemui 程序给禁用掉,那么设备一开始启动的时候就会启动我们的 rocket 应用,并成功的禁止了用户使用导航栏和下拉菜单来做非法的操作。
-
查找 Android 设备中自带的 launcher 程序 和 systemui 程序的对应包名
- 我们使用 adb shell pm list packages 就可以找出设备中已经安装的程序列表,主要是以包名显示的。
- 查找 launcher 程序的包名,找出包名为:com.android.launcher3
LW-PC0920@lw1002022 MINGW64 ~/Desktop $ adb shell pm list packages | grep launcher package:com.android.launcher3
- 查找 systemui 程序的包名:找出包名为:com.android.systemui
LW-PC0920@lw1002022 MINGW64 ~/Desktop $ adb shell pm list packages | grep systemui package:com.android.systemui
-
禁止 Android 设备中自带的 launcher 程序 和 systemui 程序的使用
- 禁止 launcher 程序的使用
adb shell pm disable com.android.launcher3
- 禁止 systemui 程序的使用
adb shell pm disable com.android.systemui
代码实现禁止 Android 设备中自带的 launcher 程序 和 systemui 程序的使用
public static void enableLauncher(Boolean enabled) {
List<PackageInfo> piList = MainActivity.instance.packageManager.getInstalledPackages(0);
ArrayList<String> packages = new ArrayList();
for (PackageInfo pi : piList) {
String name = pi.packageName;
if (name.contains("systemui") || name.contains("launcher")) {
packages.add(name);
}
}
for (String packageName : packages) {
su(String.format("pm %s %s\n", enabled ? "enable" : "disable", packageName));
}
}
/**
* 执行 adb 指令
*
*/
public static int su(String cmd) {
try {
Process p = Runtime.getRuntime().exec("su");
DataOutputStream os = new DataOutputStream(p.getOutputStream());
os.writeBytes(cmd);
os.writeBytes("exit\n");
os.flush();
os.close();
return p.waitFor();
} catch (Exception ex) {
return -1;
}
}
Iot 的实现
关于 IoT 的实现,我们这边使用的是阿里的《微消息队列 for IoT》服务,关于《微消息队列 for IoT》服务,阿里的解释如下:
微消息队列 for IoT 是消息队列(MQ)的子产品。针对用户在移动互联网以及物联网领域的存在的特殊消息传输需求,消息队列(MQ) 通过推出微消息队列 for IoT 开放了对 MQTT 协议的完整支持
- MQTT 协议?
- MQTT 的全称是:Message Queuing Telemetry Transport( 消息队列遥测传输),是一种轻量的,基于发布订阅模型的即时通讯协议。该协议设计开放,协议简单,平台支持丰富,几乎可以把所有联网物品和外部连接起来,因此在移动互联网和物联网领域拥有众多优势。
- MQTT 的特点
- 使用发布/订阅(Pub/Sub)消息模式,提供一对多的消息分发,解除了应用程序之间的耦合;
- 对负载内容屏蔽的消息传输;
- 使用 TCP/IP 提供基础的网络连接;
- 有三种级别的消息传递服务;
- 小型传输,开销很小(头部长度固定为 2 字节),协议交换最小化,以降低网络流量。
- 关键名词的解释
名词 解释 Parent Topic MQTT 协议基于 Pub/Sub 模型,因此任何消息都属于一个 Topic。根据 MQTT 协议,Topic 存在多级,定义第一级 Topic 为父 Topic(Parent Topic),使用 MQTT 前,该 Parent Topic 需要先在 MQ 控制台创建。 Subtopic MQTT 的二级 Topic,甚至三级 Topic 都是父 Topic 下的子类。使用时,直接在代码里设置,无需创建。需要注意的是 MQTT 限制 Parent Topic 和 Subtopic 的总长度为64个字符,如果超出长度限制将会导致客户端异常。 Client ID MQTT 的 Client ID 是每个客户端的唯一标识,要求全局唯一,使用相同的 Client ID 连接 MQTT 服务会被拒绝
Android 中实现 iot
关于显示 iot 连接的实现过程是这样的:首先我们将设备的三元组从管理后台中批量生成,文件名的格式为 deviceName.json(例如:00001.json),里面是关于每个设备的三元组信息;接着我们将装有三元组文件的 U 盘插入到 Android 设备中(娃娃机或者口红挑战);rocket 程序会自动监测到 U 盘的插入并将文件剪切到 Android 设备的制定目录下;再接着 Android 设备可以去读取指定文件中三元组信息;最后使用此三元组进行连接 mqtt。
- 添加依赖
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
-
关于三元组
- 在 Android 设备中需要关心的三个东西,mqtt 协议中用来识别一个设备的必要三要素,如果存在相同的三元组,那么必然出错,导致mqtt 频繁断开重连。三元组这个主要是在阿里的管理后台生成的,Android 设备这端只需要拿来用就可以了。
属性 用处 productKey 对应程序的 key,类似于 appid deviceName 对应上述的 Client ID,用来唯一识别一台 Android 设备的 deviceSecret 使用 HmacSHA1 算法计算签名字符串,并将签名字符串设置到 Password 参数中用于鉴权 -
关于订阅的 topic
- 关于 topic 是在阿里云的后台管理中进行设置的,我们的收发消息都是通过这些 topic 来进行的。
-
代码实现 iot 连接
- 剪切三元组配置文件
/** * 剪切配置文件(三元组) * @param packageName */ public static void moveConfig(String packageName) { File usbConfigDir = new File(UsbStorage.usbPath, Config.wejoyConfigDirInUsb); File extProjectDir = new File(Environment.getExternalStorageDirectory(), Config.resourceDirName); File extConfigFile = new File(extProjectDir, Config.wejoyConfigFileInSdcard); if (!usbConfigDir.exists() || extConfigFile.exists()) { return; } extProjectDir.mkdirs(); File[] configFiles = usbConfigDir.listFiles(); if (configFiles.length > 0) { Arrays.sort(configFiles); moveFile(configFiles[0], extConfigFile); } } public static void moveFile(File src, File dst) { su(String.format("mv -f %s %s\n", src.getAbsolutePath(), dst.getAbsolutePath())); }
- 读取指定路径的配置文件信息(三元组)
public static File configFile = new File(new File(Environment.getExternalStorageDirectory(), "WejoyRes"), "Config.json"); static void read() throws IOException { if (configFile.exists()) { RandomAccessFile in = new RandomAccessFile(configFile, "r"); byte[] buf = new byte[(int) configFile.length()]; in.read(buf); in.close(); instance = JsonIterator.deserialize(new String(buf, "utf-8"), Config.class); } else { instance = new Config(); } mqttRequestTopic = String.format("/sys/%s/%s/rrpc/request/", instance.productKey, instance.deviceName); mqttResponseTopic = String.format("/sys/%s/%s/rrpc/response/", instance.productKey, instance.deviceName); mqttPublishTopic = String.format("/%s/%s/update", instance.productKey, instance.deviceName); }
- 连接 mqtt
static void init() { instance = new IoT(); DeviceInfo deviceInfo = new DeviceInfo(); deviceInfo.productKey = Config.instance.productKey; deviceInfo.deviceName = Config.instance.deviceName; deviceInfo.deviceSecret = Config.instance.deviceSecret; final LinkKitInitParams params = new LinkKitInitParams(); params.deviceInfo = deviceInfo; params.connectConfig = new IoTApiClientConfig(); LinkKit.getInstance().registerOnPushListener(instance); initDisposable = Observable.interval(0, Config.instance.mqttConnectIntervalSeconds, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .map(new Function<Long, Boolean>() { @Override public Boolean apply(Long aLong) throws Exception { if (!initialized) { LinkKit.getInstance().init(MainActivity.instance, params, instance); } return initialized; } }) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { if (aBoolean) { initDisposable.dispose(); } } }); }
-
发送消息:
发送消息的时候,我们需要指定 topic,否则服务器无法接收到我们的消息。
static void publish(String json) { Log.e(TAG, "publish: "+json ); MqttPublishRequest res = new MqttPublishRequest(); res.isRPC = false; res.topic = Config.mqttPublishTopic; res.payloadObj = json; LinkKit.getInstance().publish(res, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { } @Override public void onFailure(ARequest aRequest, AError aError) { } }); }
- 接收消息: 接收消息的时候,我们也需要判断是来自哪个 topic 中的,除了我们指定的 topic,其他的 topic 我们都不做处理;当我们接收到服务器中发送来的消息的时候,我们是先判断消息的类型,然后根据相对应的类型做出不同的反应。例如我们收到后台请求给娃娃机的上分的指令,那么我们就向设备中的硬件模块发送上分的指令,并等待设备反应并给后台发送一条响应信息。这条响应的消息是需要在指定的时间内完成,否则认为超时。
@Override public void onNotify(String s, final String topic, final AMessage aMessage) { if (!topic.startsWith(Config.mqttRequestTopic)) { return; } Observable.create(new ObservableOnSubscribe<MqttMessage>() { @Override public void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception { MqttMessage msg = JsonIterator.deserialize(new String((byte[]) aMessage.data, "utf-8"), MqttMessage.class); if (msg == null) { return; } emitter.onNext(msg); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .flatMap(new Function<MqttMessage, ObservableSource<MqttMessage>>() { @Override public ObservableSource<MqttMessage> apply(MqttMessage msg) throws Exception { Log.e(TAG, "收到消息 key:"+msg.key+" msg:"+msg.body.m); switch (msg.key) { case "h": {// SetHeartBeatDownstream setHeartBeatDownstream = msg.body.m.as(SetHeartBeatDownstream.class); // 和设备进行通信,并等待设备的响应 return Device.setHeartBeat(setHeartBeatDownstream); } case "b": {// AddCoinsDownstream addCoinsDownstream = msg.body.m.as(AddCoinsDownstream.class); // 和设备进行通信,并等待设备的响应 return Device.addCoins(addCoinsDownstream); } case "g": {// // 和设备进行通信,并等待设备的响应 return Device.getParam(); } case "s": {// SetParamDownstream setParamDownstream = msg.body.m.as(SetParamDownstream.class); // 和设备进行通信,并等待设备的响应 return Device.setParam(setParamDownstream); } } return Observable.never(); } }) .observeOn(Schedulers.io()) .map(new Function<MqttMessage, Boolean>() { @Override public Boolean apply(MqttMessage msg) throws Exception { MqttPublishRequest res = new MqttPublishRequest(); res.isRPC = false; res.topic = topic.replace("request", "response"); //res.msgId = topic.split("/")[6]; res.payloadObj = JsonStream.serialize(msg); LinkKit.getInstance().publish(res, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { } @Override public void onFailure(ARequest aRequest, AError aError) { } }); return true; } }) .subscribe(); }
Android 和硬件通信
在娃娃机和口红挑战的这两个设备中,我们都需要和设备进行通信,例如:娃娃机投币、娃娃机出礼反馈、按下选中口红的格子等等这些都是需要和硬件模块进行通信的。在关于串口通信的框架选择方面,我们主要是选择 Google 的 android-serialport-api 来实现。项目原地址
-
依赖方式
- 在根build.gradle中添加
allprojects { repositories { ... maven { url 'https://jitpack.io' } } }
- 子module添加依赖
dependencies { implementation 'com.github.licheedev.Android-SerialPort-API:serialport:1.0.1' }
修改su路径
// su默认路径为 "/system/bin/su"
// 可通过此方法修改
SerialPort.setSuPath("/system/xbin/su");
- 连接方式
连接串口的时候需要指定串口号以及波特率,之后定时处理机器发送的指令。
static void init() throws IOException {
SerialPort.setSuPath("/system/xbin/su");
// 设置串口号及波特率
serialPort = new SerialPort(Config.serialPort, Config.baudrate);
// 接收指令流
inputStream = serialPort.getInputStream();
// 发送指令流
outputStream = serialPort.getOutputStream();
// 每隔 100ms 处理机器信息
Observable.interval(100, TimeUnit.MILLISECONDS)
.observeOn(serialScheduler)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// 处理机器发送的指令
handleRecv();
}
});
}
- 向机器发送指令
向机器发送指令的时候是结合 Rxjava 来实现的。除此之外,向机器发送指令是需要有规定格式的(内部制定的通信协议),我们发送及接收数据都是一个字节数组,因此我们格式是需要严格按照我们制定的协议进行的,如下是娃娃机投币的简单示例:
static ObservableSource<MqttMessage> addCoins(final AddCoinsDownstream msg) {
return Observable.create(new ObservableOnSubscribe<MqttMessage>() {
@Override
public void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception {
currentUser = msg.u;
currentHeadUrl = msg.h;
currentNickname = msg.nk;
byte[] buf = new byte[]{0x11, addCoinsCmd, msg.num, msg.c, 0, 0x00, 0x00};
byte[] ret = sign(buf);
try {
outputStream.write(ret);
} catch (IOException e) {
e.printStackTrace();
}
penddingCmd = addCoinsCmd;
penddingEmitter = emitter;
}
})
.subscribeOn(serialScheduler);
}
- 接收机器指令
关于接受机器消息这一块是每隔 100ms 进行的,在处理机器指令的时候,首先需要过滤到无效的字节,之后再按照我们制定的协议来处理消息,判断是娃娃机上分,还是游戏结果等信息,最后并对机器的数据返回进行 CRC16 校验。
static void handleRecv() {
try {
for (; ; ) {
int len = inputStream.available();
if (len <= 0) {
break;
}
len = inputStream.read(buf, bufReadOffset, buf.length - bufReadOffset);
//Log.d("serialPort", String.format("read: %s", byteToHex(buf, bufReadOffset, len)));
bufReadOffset += len;
for (; ; ) {
if (bufParseEnd == -1) {
for (; bufParseStart < bufReadOffset; bufParseStart++) {
if (buf[bufParseStart] == (byte) 0xAA) {
bufParseEnd = bufParseStart + 1;
break;
}
}
}
if (bufParseEnd != -1) {
for (; bufParseEnd < bufReadOffset; bufParseEnd++) {
if (buf[bufParseEnd] == (byte) 0xAA) {
bufParseStart = bufParseEnd;
bufParseEnd += 1;
continue;
}
if (buf[bufParseEnd] == (byte) 0xDD) {
if (bufParseEnd - bufParseStart >= 5) {
bufParseEnd += 1;
byte size = buf[bufParseStart + 1];
byte index = buf[bufParseStart + 2];
byte cmd = buf[bufParseStart + 3];
byte check = (byte) (size ^ index ^ cmd);
for (int i = bufParseStart + 4; i < bufParseEnd - 2; i++) {
check ^= buf[i];
}
if (check == buf[bufParseEnd - 2]) {
//Log.d("serialPort", String.format("protocol: %s, size: %d, index: %d, cmd: %d, check: %d, data: %s", byteToHex(buf, bufParseStart, bufParseEnd - bufParseStart), size, index, cmd, check, byteToHex(buf, bufParseStart + 4, size - 3)));
switch (cmd) {
// 心跳
case heartBeatCmd: {
}
break;
// 上分
case addCoinsCmd: {
}
break;
// 游戏结果
case gameResultCmd: {
boolean gift = buf[bufParseStart + 7] != 0;
IoT.sendGameResult(gift);
if (gift) {
// 发送用户信息到中控,进行排行榜显示
WSSender.getInstance().sendUserInfo(currentUser, currentHeadUrl, currentNickname);
}
}
break;
default:
break;
}
}
}
bufParseStart = bufParseEnd;
bufParseEnd = -1;
break;
}
}
}
if (bufParseStart >= bufReadOffset || bufParseEnd >= bufReadOffset) {
break;
}
}
if (bufReadOffset == buf.length) {
System.arraycopy(buf, bufParseStart, buf, 0, bufReadOffset - bufParseStart);
if (bufParseEnd != -1) {
bufParseEnd -= bufParseStart;
bufReadOffset = bufParseEnd;
} else {
bufReadOffset = 0;
}
bufParseStart = 0;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
websocket 通信
在中控和娃娃机进行通信的方式我们是选择 websocket 进行的。中控端是 server,然后娃娃机是 client。
server
- Server 的实现:目前 server 的实现只是为了接收娃娃机的数据反馈,所以并没有什么复杂的操作。
class WSServer extends WebSocketServer {
private MainActivity mainActivity;
public void setMainActivity(MainActivity mainActivity) {
this.mainActivity = mainActivity;
}
WSServer(InetSocketAddress address) {
super(address);
}
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已连接\n");
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已断开\n");
}
@Override
public void onMessage(WebSocket conn, final String message) {
Observable.create(new ObservableOnSubscribe<SocketMessage>() {
@Override
public void subscribe(ObservableEmitter<SocketMessage> emitter) throws Exception {
final SocketMessage socketMessage = JsonIterator.deserialize(message, SocketMessage.class);
emitter.onNext(socketMessage);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<SocketMessage>() {
@Override
public void accept(SocketMessage socketMessage) throws Exception {
if (socketMessage.getCode() == SocketMessage.TYPE_USER) {
// 夹到娃娃
} else if (socketMessage.getCode() == SocketMessage.TYPE_SAY_HELLO) {
// 连接招呼语
}
}
});
}
@Override
public void onError(WebSocket conn, Exception ex) {
}
@Override
public void onStart() {
}
}
- 简单使用方式
appendAndScrollLog("初始化WebSocket服务...\n");
WSServer wsServer = new WSServer(18104);
wsServer.setMainActivity(MainActivity.this);
wsServer.setConnectionLostTimeout(5);
wsServer.setReuseAddr(true);
wsServer.start();
appendAndScrollLog("初始化WebSocket服务完成\n");
client
在 client 端,目前需要做的人物有断开重连以及数据发送的操作。断开重连的时候需要在新的子线程中进行,否则会报如下错误:
You cannot initialize a reconnect out of the websocket thread. Use reconnect in another thread to insure a successful cleanup
因此,我们每次断开重新的时候是需要在新的子线程中进行的。除此之外,在发送数据的时候,如果刚好 socket 没有连接上,那么发送数据是会报异常的,因此我们有数据要发送的时候如果 socket 没有连接,那么就先缓存到本地,等到 socket 连接上之后再把滞留的数据一次性发送出去。
- 依赖配置
implementation 'org.java-websocket:Java-WebSocket:1.3.9'
- WSClient.java
class WSClient extends WebSocketClient {
private static final String TAG = "WSClient";
private static WSClient instance;
private static URI sUri;
private WSReceiver mWSReceiver;
private Disposable mReconnectDisposable;
private ConnectCallback mConnectCallback;
/**
* step 1:需要先调用,设置 url
* @param uri
*/
public static void setUri(URI uri){
sUri = uri;
}
/**
* step 1:
* 需要先调用,设置服务端的 url
* @param ipAddress
* @param port
*/
public static void setUri(String ipAddress,int port){
try {
sUri = new URI(String.format("ws://%s:%d", ipAddress, port));
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
public static WSClient getInstance(){
if (instance == null) {
synchronized (WSClient.class){
if (instance == null) {
instance = new WSClient(sUri);
}
}
}
return instance;
}
/**
* step 2:连接 websocket
*/
public void onConnect(){
setConnectionLostTimeout(Config.instance.webSocketTimeoutSeconds);
setReuseAddr(true);
connect();
}
private WSClient(URI server) {
super(server);
// 初始化消息发送者
WSSender.getInstance().setWSClient(this);
// 初始化消息接收者
mWSReceiver = new WSReceiver();
mWSReceiver.setWSClient(this);
mWSReceiver.setWSSender(WSSender.getInstance());
}
@Override
public void onOpen(ServerHandshake handshakedata) {
Log.d(TAG, "onOpen: ");
MainActivity.appendAndScrollLog("websocket 已连接\n");
Observable.just("")
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
if (mConnectCallback != null) {
mConnectCallback.onWebsocketConnected();
}
}
});
// 清除滞留的所有消息
WSSender.getInstance().clearAllMessage();
}
@Override
public void onMessage(String message) {
Log.d(TAG, "onMessage: ");
mWSReceiver.handlerMessage(message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
Log.d(TAG, "onClose: ");
MainActivity.appendAndScrollLog(String.format("websocket 已断开,断开原因:%s\n",reason));
Observable.just("")
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
if (mConnectCallback != null) {
mConnectCallback.onWebsocketClosed();
}
}
});
onReconnect();
}
@Override
public void onError(Exception ex) {
if (ex != null) {
Log.d(TAG, "onError: "+ex.getMessage());
MainActivity.appendAndScrollLog(String.format("websocket 出现错误,错误原因:%s\n",ex.getMessage()));
}
onReconnect();
}
public void onReconnect() {
if (mReconnectDisposable != null
&& !mReconnectDisposable.isDisposed()){
return;
}
mReconnectDisposable = Observable.timer(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "websocket reconnect");
WSClient.this.reconnect();
mReconnectDisposable.dispose();
}
});
}
public void setConnectCallback(ConnectCallback mConnectCallback) {
this.mConnectCallback = mConnectCallback;
}
public interface ConnectCallback{
void onWebsocketConnected();
void onWebsocketClosed();
}
}
- WSSender.java
/**
* Created by runla on 2018/10/26.
* 文件描述:Websocket 的消息发送者
*/
public class WSSender {
private static final String TAG = "WSSender";
public static final int MAX_MESSAGE_COUNT = 128;
private static WSSender instance;
private WSClient mWSClientManager;
// 消息队列
private LinkedList<String> mMessageList = new LinkedList<>();
private WSSender() {
}
public static WSSender getInstance() {
if (instance == null) {
synchronized (WSSender.class) {
if (instance == null) {
instance = new WSSender();
}
}
}
return instance;
}
public void setWSClient(WSClient wsClientManager) {
this.mWSClientManager = wsClientManager;
}
/**
* 发送所有滞留的消息
*/
public void clearAllMessage() {
if (mWSClientManager == null) {
return;
}
while (mMessageList.size() > 0
&& mMessageList.getFirst() != null) {
Log.d(TAG, "sendMessage: " + mMessageList.size());
mWSClientManager.send(mMessageList.getFirst());
mMessageList.removeFirst();
}
}
/**
* 发送消息,如果消息发送不出去,那么就等到连接成功后再次尝试发送
*
* @param msg
* @return
*/
public boolean sendMessage(String msg) {
if (mWSClientManager == null) {
throw new NullPointerException("websocket client is null");
}
if (TextUtils.isEmpty(msg)) {
return false;
}
// 将需要发送的数据添加到队列的尾部
mMessageList.addLast(msg);
while (mMessageList.size() > 0
&& mMessageList.getFirst() != null) {
Log.d(TAG, "sendMessage: " + mMessageList.size());
if (!mWSClientManager.isOpen()) {
// 尝试重连
mWSClientManager.onReconnect();
break;
} else {
mWSClientManager.send(mMessageList.getFirst());
mMessageList.removeFirst();
}
}
// 如果消息队列中超过我们设置的最大容量,那么移除最先添加进去的消息
if (mMessageList.size() >= MAX_MESSAGE_COUNT) {
mMessageList.removeFirst();
}
return false;
}
}
- WSReceiver.java
/**
* Created by runla on 2018/10/26.
* 文件描述:Websocket 的消息接收者
*/
public class WSReceiver {
private WSClient mWSClientManager;
private WSSender mWSSender;
private OnMessageCallback onMessageCallback;
public WSReceiver() {
}
public void setWSClient(WSClient mWSClientManager) {
this.mWSClientManager = mWSClientManager;
}
public void setWSSender(WSSender mWSSender) {
this.mWSSender = mWSSender;
}
/**
* 处理接收消息
* @param message
*/
public void handlerMessage(String message){
if (onMessageCallback != null){
onMessageCallback.onHandlerMessage(message);
}
}
public void setOnMessageCallback(OnMessageCallback onMessageCallback) {
this.onMessageCallback = onMessageCallback;
}
public interface OnMessageCallback{
void onHandlerMessage(String message);
}
}
- 连接调用
appendAndScrollLog("初始化WebSocket客户端...\n");
WSClient.setUri( Config.instance.centralServerAddress, Config.instance.webSocketPort);
WSClient.getInstance().onConnect();
WSClient.getInstance().setConnectCallback(MainActivity.this);
appendAndScrollLog("初始化WebSocket客户端完成\n");
- 数据发送
// 清除滞留的所有消息
WSSender.getInstance().clearAllMessage();
// 发送消息
WSSender.getInstance().sendMessage(msg);
数据库存储
在中控端,我们需要显示排行版,用来显示夹中娃娃机的用户在本月及本周夹中娃娃的排行,因此我们需要再中控端保存用户的夹中娃娃数量以及个人的其他信息,GreenDAO 是一款开源的面向 Android 的轻便、快捷的 ORM 框架,将 Java 对象映射到 SQLite 数据库中,我们操作数据库的时候,不在需要编写复杂的 SQL语句, 在性能方面,GreenDAO 针对 Android 进行了高度优化, 最小的内存开销 、依赖体积小,同时还是支持数据库加密。关于 GreenDAO 的用法我就不在这里做,具体的用法可以参考官网 GreenDAO。
写在最后
关于整个系统的架构搭建过程中遇到了好多坑,以上是我为这个项目提供的部分解决方案,当前全部的是不可能都放写出来的,此项目目前已经在西安和成都等地都有门店点了,据反馈,利润极大,不过这种类型的项目红利期不会太长,估计也是 2~3 年左右吧。如果有需要我们为 口红机开发 或者是 娃娃机开发 提供解决方案的,可以联系我们,目前我们在这个方面已经有相对较为成熟的解决方案了。