概述:
每个 MQTT 控制报文都包含一个固定报头。固定报头由两个字节组成,第一个字节的高4位为报文类型,低4位为报文标志;第二个字节开始为剩余长度,最多4个字节,也就是说剩余长度最少占用一个字节,最多占用4个字节。由此可以看出, MQTT 报文最少就只有两个字节,确实是个很轻量级的协议了。
MQTT 规范:https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
代码实现:
在 paho 中关于 MQTT 报文的编码解码的实现主要是 MqttWireMessage 类及其子类。MqttWireMessage 是个抽象类,里面存在两个未实现的抽象方法
getMessageInfo()
和getVariableHeader()
,通过跟踪代码我们可以发现getHeader()
与createWireMessage(InputStream inputStream)
是编解码的核心实现方法。
// MqttWireMessage#getHeader
public byte[] getHeader() throws MqttException {
try {
// 这行代码主要是计算出固定报头的第一个字节,
int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
// 可变报头通过调用子类实现获取
byte[] varHeader = getVariableHeader();
// 剩余长度=可变报头长度+有效荷载的长度
int remLen = varHeader.length + getPayload().length;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeByte(first);
// 这里调用了encodeMBI方法来编码剩余长度
dos.write(encodeMBI(remLen));
dos.write(varHeader);
dos.flush();
return baos.toByteArray();
} catch (IOException ioe) {
throw new MqttException(ioe);
}
}
// MqttWireMessage#createWireMessage
private static MqttWireMessage createWireMessage(InputStream inputStream) throws MqttException {
try {
CountingInputStream counter = new CountingInputStream(inputStream);
DataInputStream in = new DataInputStream(counter);
// 读取第一个字节,并解析
int first = in.readUnsignedByte();
// 报文类型
byte type = (byte) (first >> 4);
// 标记位,不同的报文会由不同的涵义,交给子类来处理
byte info = (byte) (first &= 0x0f);
// 这里调用了readMBI方法来解码剩余长度
long remLen = readMBI(in).getValue();
long totalToRead = counter.getCounter() + remLen;
MqttWireMessage result;
long remainder = totalToRead - counter.getCounter();
// 剩余有效数据,包括可变报头和有效荷载,交由子类处理
byte[] data = new byte[0];
// The remaining bytes must be the payload...
if (remainder > 0) {
data = new byte[(int) remainder];
// 读取剩余有效数据
in.readFully(data, 0, data.length);
}
// 根据类型构造不同的消息
if (type == MqttWireMessage.MESSAGE_TYPE_CONNECT) {
result = new MqttConnect(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBLISH) {
result = new MqttPublish(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBACK) {
result = new MqttPubAck(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBCOMP) {
result = new MqttPubComp(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_CONNACK) {
result = new MqttConnack(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PINGREQ) {
result = new MqttPingReq(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PINGRESP) {
result = new MqttPingResp(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_SUBSCRIBE) {
result = new MqttSubscribe(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_SUBACK) {
result = new MqttSuback(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_UNSUBSCRIBE) {
result = new MqttUnsubscribe(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_UNSUBACK) {
result = new MqttUnsubAck(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBREL) {
result = new MqttPubRel(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBREC) {
result = new MqttPubRec(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_DISCONNECT) {
result = new MqttDisconnect(info, data);
} else {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
}
return result;
} catch (IOException io) {
throw new MqttException(io);
}
}
// MqttWireMessage#encodeMBI
public static byte[] encodeMBI(long number) {
validateVariableByteInt((int) number);
int numBytes = 0;
long no = number;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
// Encode the remaining length fields in the four bytes
do {
byte digit = (byte) (no % 128);
no = no / 128;
if (no > 0) {
digit |= 0x80;
}
bos.write(digit);
numBytes++;
} while ((no > 0) && (numBytes < 4));
return bos.toByteArray();
}
// MqttWireMessage#readMBI
public static MultiByteInteger readMBI(DataInputStream in) throws IOException {
byte digit;
int msgLength = 0;
int multiplier = 1;
int count = 0;
do {
digit = in.readByte();
count++;
msgLength += ((digit & 0x7F) * multiplier);
multiplier *= 128; // multiplier <<= 7;
} while ((digit & 0x80) != 0);
if (msgLength < 0 || msgLength > VARIABLE_BYTE_INT_MAX) {
throw new IOException("This property must be a number between 0 and " + VARIABLE_BYTE_INT_MAX
+ ". Read value was: " + msgLength);
}
return new MultiByteInteger(msgLength, count);
}