本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。
本篇用以分析心跳、编码、解码相关的代码。
文章内容顺序:
1.心跳
1.1 为什么要有心跳?
1.2 HeaderExchangeClient
1.3 HeaderExchangeClient#startHeatbeatTimer
1.4 HeartBeatTask#run
2.编码
2.1 编码的链路:
2.2 消息头的字段的意义
2.3 ExchangeCodec
2.4 序列化的多种实现
3.解码
3.1 InternalDecoder#messageReceived
3.2 DubboCountCodec#decode
3.3 DubboCodec.decode(channel,buffer)
3.4 DubboCodec#decodeBody
3.5 DecodeableRpcInvocation#decode
3.6 解码的方法调用顺序
首先是心跳相关
1.心跳
在上一篇服务调用的消费端中,我们介绍到HeaderExchangeClient
的构造方法中会有心跳的一些逻辑,在那边一笔带过了,在这篇文章来详细看看。
1.1为什么要有心跳?
心跳间隔,对于长连接,当物理层断开时,比如拔网线,TCP的FIN消息来不及发送,对方收不到断开事件,此时需要心跳来帮助检查连接是否已断开
这里仍旧贴一下HeaderExchangeClient的构造方法
1.2HeaderExchangeClient
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 创建 HeaderExchangeChannel 对象
this.channel = new HeaderExchangeChannel(client);
// 以下代码均与心跳检测逻辑有关
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
// 开启心跳检测定时器
startHeartbeatTimer();
}
}
此方法最后调用 #startHeatbeatTimer()
方法,发起心跳定时器。直接来看startHeatbeatTimer()
方法的实现吧
1.3HeaderExchangeClient#startHeatbeatTimer
private void startHeatbeatTimer() {
// 停止原有定时任务
stopHeartbeatTimer();
// 发起新的定时任务
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
可以看到这边就是直接调用线程池进行了一个定时任务HeartBeatTask
对象,scheduleWithFixedDelay
方法意为当当前任务执行完毕后再隔多少秒进行下一个任务。
接下来我们来看一下HeartBeatTask的实现。
1.4HeartBeatTask#run
public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
// 最后读写的时间,任一超过心跳间隔,发送心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true); // 需要响应
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 最后读的时间,超过心跳超时时间
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
// 客户端侧,重新连接服务端
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
// 服务端侧,关闭客户端连接
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
- 【任务一】:最后读或写的时间,任一超过心跳间隔 heartbeat ,发送心跳。
- 【任务二】:最后读的时间,超过心跳超时时间 heartbeatTimeout ,分成两种情况:
客户端侧,重连连接服务端。
服务端侧,关闭客户端连接。
至此,心跳的分析告一段落。
接下来来看看编码的操作。
2.编码
2.1编码的链路
看上面这张图,当运行到NettyChannel#send
的这一行后,就会跳进Netty
的执行逻辑,最后由NettyCodecAdapter
的内部类调用编码类执行编码操作
上图中继承的
OneToOneEncoder
是Netty
的抽象方法,那这个InternalEncoder
是什么时候传进来的呢?
在NettyClient#doOpen()
方法中有如下代码,会在编码器、解码器等一并设置进来,从而使得最后编码、解码逻辑能交由Dubbo
自己的类实现。
再贴一张编码的链路。
NettyChannel#send
->一系列Netty内部的方法
->NettyCodecAdapter内部类(继承了netty抽象类)#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeRequest
可以看到最后是交由
ExchangeCodec
来执行编码的逻辑了。那么这个链路就简单介绍到这,直接来看他是怎么实现的编码吧。
2.2消息头的字段的意义
先简单列举一下消息头的内容,其中的魔数是用来分割处理粘包问题的。
接下来就直接来看看
ExchangeCodec
的实现
2.3 ExchangeCodec
public class ExchangeCodec extends TelnetCodec {
// 消息头长度
protected static final int HEADER_LENGTH = 16;
// 魔数内容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() {
return MAGIC;
}
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// 对 Request 对象进行编码
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 对 Response 对象进行编码,后面分析
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// 创建消息头字节数组,长度为 16
byte[] header = new byte[HEADER_LENGTH];
// 设置魔数
Bytes.short2bytes(MAGIC, header);
// 设置数据包类型(Request/Response)和序列化器编号
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 设置通信方式(单向/双向)
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 设置事件标识
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 设置请求编号,8个字节,从第4个字节开始设置
Bytes.long2bytes(req.getId(), header, 4);
// 获取 buffer 当前的写位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,为消息头预留 16 个字节的空间
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 创建序列化器,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 对事件数据进行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 对请求数据进行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 获取写入的字节数,也就是消息体长度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 将消息体长度写入到消息头中
Bytes.int2bytes(len, header, 12);
// 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
buffer.writerIndex(savedWriteIndex);
// 从 savedWriteIndex 下标处写入消息头
buffer.writeBytes(header);
// 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// 省略其他方法
}
以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header
数组中。然后对 Request
对象的 data
字段执行序列化操作,序列化后的数据最终会存储到ChannelBuffer
中。序列化操作执行完后,可得到数据序列化后的长度 len
,紧接着将 len
写入到 header
指定位置处。最后再将消息头字节数组 header
写入到 ChannelBuffer
中,整个编码过程就结束了。
这里我们可以再来关注一下encodeEventData
方法(encodeRequestData
也是一样的实现。)
2.4 序列化的多种实现
image.png
通过一系列重载方法,我们可以看到最后调用了out.writeObeject
而这个out
,则是在encodeRequest
方法中通过url传过来的参数设置的,有多种不同的实现。
image.pngimage.png
有关序列化的协议,可以简单参照下这篇博文: Dubbo协议及序列化
说完了编码,再来说说解码。
同样的,还是从解码的链路开始说起
3.解码
3.1 InternalDecoder#messageReceived
NettyCodecAdapter
的内部类InternalDecoder#messageReceived
方法
同样的,这个类也与编码的类一样,都是通过netty的pipeline
设置进来的,上文已经介绍过了。
调用了DubboCountCodec#decode
方法
3.2 DubboCountCodec#decode
public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 记录当前读位置
int save = buffer.readerIndex();
// 创建 MultiMessage 对象
MultiMessage result = MultiMessage.create();
do {
// 解码
Object obj = codec.decode(channel, buffer);
// 输入不够,重置读进度
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
// 解析到消息
} else {
// 添加结果消息
result.addMessage(obj);
// 记录消息长度到隐式参数集合,用于 MonitorFilter 监控
logMessageLength(obj, buffer.readerIndex() - save);
// 记录当前读位置
save = buffer.readerIndex();
}
} while (true);
// 需要更多的输入
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
// 返回解析到的消息
if (result.size() == 1) {
return result.get(0);
}
return result;
}
//省略其他代码
}
这边的codec
指的就是DubboCodec
。调用的是DubboCodec.decode(channel,buffer)
3.3 DubboCodec.decode(channel,buffer)
public class ExchangeCodec extends TelnetCodec {
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 读取 Header 数组
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
// 解码
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 非 Dubbo 协议,目前是 Telnet 命令。
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
// 将 buffer 完全复制到 `header` 数组中。因为,上面的 `#decode(channel, buffer)` 方法,可能未读全
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
// 【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW ?
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 提交给父类( Telnet ) 处理,目前是 Telnet 命令。
return super. decode(channel, buffer, readable, header);
}
// Header 长度不够,返回需要更多的输入
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// `[96 - 127]`:Body 的**长度**。通过该长度,读取 Body 。
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 总长度不够,返回需要更多的输入
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// 解析 Header + Body
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//子类重写的方法
return decodeBody(channel, is, header);
} finally {
// skip 未读完的流,并打印错误日志
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
//省略其他代码
}
这边对于再提一下,实际上这个方法是DubboCodec
里的方法,但是ExchangeCodec
是DubboCodec
的父类,并且在DubboCodec
没有重写这个方法,所以debug
会跳到父类的方法行(因为代码逻辑写在父类里)。
上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet
命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody
方法进行后续的解码工作。
注意在最后的try块中,会调用到DubboCodec的实现——DubboCodec#decodeBody
。注意,从头到尾我们调用的都是DubboCodec
类。
3.4 DubboCodec#decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2];
// 获得 Serialization 对象
byte proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 获得请求||响应编号
// get request id.
long id = Bytes.bytes2long(header, 4);
// 解析响应
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 设置状态
// get status.
byte status = header[3];
res.setStatus(status);
// 正常响应状态
if (status == Response.OK) {
try {
Object data;
// 解码心跳事件
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解码其它事件
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解码普通响应
} else {
DecodeableRpcResult result;
// 在通信框架(例如,Netty)的 IO 线程,解码
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
result.decode();
// 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
} else {
result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
}
data = result;
}
// 设置结果
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 异常响应状态
} else {
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
// 解析请求
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
// 是否需要响应
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
// 解码心跳事件
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解码其它事件
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解码普通请求
} else {
// 在通信框架(例如,Netty)的 IO 线程,解码
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
// 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
} else {
inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
如上,decodeBody
对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用 DecodeableRpcInvocation#decode
方法进行后续的解码工作。
要么是在本线程内解码,要么是交由work
线程池执行,会在Dubbo的线程模型、handler
讲解如何交给其执行,又是怎么执行的。
再来看一下DecodeableRpcInvocation#decode
方法
3.5 DecodeableRpcInvocation#decode
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
/**
* 是否已经解码完成
*/
private volatile boolean hasDecoded;
@Override
public void decode() {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
hasDecoded = true;
}
}
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 通过反序列化得到 dubbo version,并保存到 attachments 变量中
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);
// 通过反序列化得到 path,version,并保存到 attachments 变量中
setAttachment(Constants.PATH_KEY, in.readUTF());
setAttachment(Constants.VERSION_KEY, in.readUTF());
// 通过反序列化得到调用方法名
setMethodName(in.readUTF());
try {
Object[] args;
Class<?>[] pts;
// 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
// 将 desc 解析为参数类型数组
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
// 解析运行时参数
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
// 设置参数类型数组
setParameterTypes(pts);
// 通过反序列化得到原 attachment 的内容
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
// 将 map 与当前对象中的 attachment 集合进行融合
attachment.putAll(map);
setAttachments(attachment);
}
// 对 callback 类型的参数进行处理
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
// 设置参数列表
setArguments(args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
}
可以看到DecodeableRpcInvocation #decode
会先判断是否已经解码完成(这很重要,在交由Dubbo线程池执行的时候也会进到这个方法,如果已经解码过,就不进行下面的流程,如果已经没解码过,那么就会帮助执行解码操作),如果没有解码过,调用decode的重载方法。
重载方法通过反序列化将诸如 path
、version
、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation
对象。
3.6解码的方法调用顺序
所以解码调用的顺序为:
NettyCodecAdapter的内部类InternalDecoder#messageReceived
->DubboCountCodec#decode
->DubboCodec#decode(channel,buffer)//父类实现的方法
->DubboCodec#decode(channel,buffer,readable,header)//父类实现的方法
->Dubbo#decodeBody
->DecodeableRpcInvocation#decode//交由本线程或者业务线程池执行