本文是Netty文集中“Netty 那些事儿”系列的文章。主要结合在开发实战中,我们遇到的一些“奇奇怪怪”的问题,以及如何正确且更好的使用Netty框架,并会对Netty中涉及的重要设计理念进行介绍。
本文是笔者和朋友(笔名:oojeek)一起讨论该问题的一个记录。文章以讨论过程中的思路来展现(也是我们解决问题的思路路线),因此可能会有些乱。再者,如果对Netty写数据流程不了解的朋友,可以先阅读Netty 源码解析 ——— writeAndFlush流程分析该篇文章,下面的讨论中会涉及不少这篇文章提及的概念。
问题
起因是这样的,朋友倒腾了个发送大数据包的demo,结果发现在发送大数据包时,写空闲超时事件被触发了。即便在设置了IdleStateHandler的observeOutput属性为true的情况下,依旧会发送在写一个大数据包的过程中,写空闲超时事件被触发。
先来简单看看朋友的demo,我们来看几个关键类
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addFirst("idleStateHandler", new IdleStateHandler(true,9, 2, 11, TimeUnit.SECONDS));
pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE,
0, 4, 0, 4, true));
pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
}
}
我们定义了一个IdleStateHandler,并且设置了observeOutput属性为true(即,第一个参数),以及设置了写空闲超时时间为2秒(即,第二个参数)。
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
private String tempString;
public MyClientHandler() {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1024 * 1024; i++) {
builder.append("abcdefghijklmnopqrstuvwxyz");
}
tempString = builder.toString();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(LocalDateTime.now().toString() + "----" + ctx.channel().remoteAddress().toString() + "----" + msg.length());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
sendData(ctx);
}
private void sendData(ChannelHandlerContext ctx) {
if (!ctx.channel().isActive())
{
System.out.println("channel inactive...");
ctx.close();
return;
}
System.out.println("send a pack of data ...");
long tickCount = System.currentTimeMillis();
ChannelFuture future = ctx.writeAndFlush(tempString);
ChannelPromise promise = (ChannelPromise)future;
promise.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
System.out.println("send completed");
sendData(ctx);
}
});
System.out.println("Time elapse:" + (System.currentTimeMillis() - tickCount));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
//super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//System.out.println(LocalDateTime.now().toString());
if (evt == IdleStateEvent.READER_IDLE_STATE_EVENT) {
System.out.println("READER_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.WRITER_IDLE_STATE_EVENT){
// for heartbit
System.out.println("WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString());
//ctx.writeAndFlush("ACK");
} else if (evt == IdleStateEvent.ALL_IDLE_STATE_EVENT) {
//System.out.println("ALL_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
System.out.println("FIRST_READER_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
//System.out.println("FIRST_WRITER_IDLE_STATE_EVENT");
} else if (evt == IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT) {
//System.out.println("FIRST_ALL_IDLE_STATE_EVENT");
}
//super.userEventTriggered(ctx, evt);
}
}
这里,定义了一个27262976字节大小的tempString数据,用于发送。并实现了userEventTriggered方法,当写空闲超时事件发送时,会打印一条『"WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString()』信息。
然后启动程序,连接的服务端是朋友的腾讯云,服务器做了带宽限制,限制为1M,以重现问题。
运行程序的过程中,发现,当大数据包(即,27262976字节大小的tempString)在发送的过程中,写空闲超时不断的被触发调用。并且我们自定义handler中只发送了一个数据包,但到了底层却有两个数据包发送出去了。
然后就此情况我们开始了讨论。。。
寻找问题发送的根源
首先,IdleStateHandler的write操作确实确实只是将listener加到了write操作的listener集合中,write操作本身不会去修改lastWriteTime。
然后,我们晓得flush是一个出站操作,最终ChannelPipeline的head会对其进行处理。head底层会调用NioSocketChannel.doWrite()方法来将数据刷新到socket中。
doWrite()操作是一个写循环操作。第一次循环:
expectedWrittenBytes:27262980。这个字段表示本次flush操作我们希望写出去的数据大小,也就是之前我们write操作已经写入的数据。即:
ChannelFuture future = ctx.writeAndFlush(tempString);
public MyClientHandler() {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1024 * 1024; i++) {
builder.append("abcdefghijklmnopqrstuvwxyz");
}
tempString = builder.toString();
}
为什么是2个待发送的ByteBuf了?
这和我们定义了『pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));』有关:
知道了为什么有2个待发送的ByteBuf,我们继续看doWrite()操作中写数据的操作:
最后,我们来看doWrite()操作中的『in.removeBytes(writtenBytes);』操作
『目前,我们可以先理解为,write操作的数据最终都会放到ChannelOutboundBuffer中,其中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。
unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。
在write的时候会将ByteBuf封装为一个Entry对象放到unflushedEntry的尾部。当调用flush时,就会将unflushedEntry赋值给flushedEntry,然后将unflushedEntry置null。
同时current()返回当前正在处理的Entry对象(Entry中封装了ByteBuf)』
到此为止,第一个ByteBuf,即记录着我们要发送消息长度大小的ByteBuf就发送出去了,并且触发了一次“IdleStateHandler 的 writeListener”的调用。
那么,第二个ByteBuf就是我们的大数据包了。通过上面的分析,我们知道大数据包走的是else流程。也就是说,本次真实写出去的数据 比 当前这个ByteBuf的可读取数据要小。也就说明,当前这个ByteBuf还没有被完全的写完。因此并不会通过调用『remove()』操作来触发“IdleStateHandler 的 writeListener”的回调。直到整个大数据包所有的内容都写出去了,那么这是if(readableBytes <= writtenBytes)才会为真,这是才会去触发“IdleStateHandler 的 writeListener”的回调。
也就是说,只有在一个ByteBuf的数据全部都写完了之后,才会去触发所有注册到这个write操作上的GenericFutureListener的回调。
netty其实有提供了一个ChannelProgressiveFuture来监控数据的发送过程,它可以实现在一个大数据发送的过程中回调注册到其上的ChannelProgressiveFutureListener,比如:
ChannelProgressivePromise progressivePromise = ctx.channel().newProgressivePromise();
progressivePromise.addListener(new ChannelProgressiveFutureListener(){
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
System.out.println("数据正在发送中。。。");
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.out.println("数据已经发送完了!");
}
});
ctx.writeAndFlush(tempString, progressivePromise);
最后。说明下,当将大数据包拆成一个个小包发送时,为什么不会导致写空闲超时的触发。
因为当大数据包被拆分成一个个小包发送时,每个小数据包就是一个ByteBuf,每个ByteBuf待写出的数据量就很小,比如本例中,我一个ByteBuf就是一个长度为26的英文字符串,那么每次写操作完成后在removeBytes()操作:
到目前为止,我们已经知道导致写空闲超时的原因所在了。这时我们可以想到的解决方案有:
① 用变量来记录是否正在发送中,如果在发送中,即使写空闲超时被触发也不发送心跳
② 将打包拆分成小包的方式
更进一步
但是,我们还有一个疑惑未解决,那就是IdleStateHandler类中observeOutput属性到底是干啥用的?
我们先来看看observeOutput属性在IdleStateHandler中的使用:
首先在doc文档中,对observeOutput属性的描述是“在访问写空闲超时时,字节消费是否会被考虑进去,默认为false”,也就是说,当字节被消费时,写空闲超时事件否非该被触发。
从上文,我们已经得知,只有在每次真正写完一个Bytebuf后,该ByteBuf的异步写操作才算是完成,那么才会去触发该异步写操作上的listener,也就是这是才会修改IdleStateHandler的lastWriteTime属性。
起初,我们以为如果将“observeOutput”属性设置为true,那么即使ByteBuf包没有被完全写完,但是已经有字节数据在被写出了,那么此时也不应该触发写空闲超时事件。但,结果却是写空闲超时事件依旧被触发了。这是为什么了?
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// But this applies only if it's the non-first call.
if (!first) {
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
int messageHashCode = System.identityHashCode(buf.current());
long pendingWriteBytes = buf.totalPendingWriteBytes();
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
if (!first) {
return true;
}
}
}
}
return false;
}
这里“observeOutput”为true情况下,主要会根据三对数值的比较情况来觉得输出是否有改变,① lastChangeCheckTimeStamp 与 lastWriteTime;② messageHashCode 与 lastMessageHashCode;③ pendingWriteBytes 与 lastPendingWriteBytes;
① 和 ② 都好理解,最让我们困惑的是③,也就是说,pendingWriteBytes属性并未像我们猜测的那样随着ByteBuf中的数据的写出而改变。 这又是为什么了?
为了解决这个问题,我们通过反向思考来尝试的解决。即,这个值(pendingWriteBytes)是在什么情况下会被修改?
ChannelOutboundBuffer:
好了,现在我们知道,其实pendingWriteBytes实际上也是在一个ByteBuf都写出后才会被修改的。。。 那么问题又来了,既然是这样,那么这个pendingWriteBytes又有什么用了?或者说observeOutput属性的使用到底是在什么场景下??
这个问题其实在hasOutputChanged方法注解的github issues 6150中给出了讨论。
目前能得到的结论是observeOutput属性是为了issues 6150问题所提供的解决方案,而这个问题是在通过HTTP2协议进行数据发送时导致的,讨论中提及netty在对HTTP2传输协议进行数据传输时可能会将多个数据包整合正一个包发送导致写空闲超时事件被触发了(因为,该问题与本文的问题并无关联,所以不做具体说明)。但是通过github issues 6150讨论中,我们得知了netty之所以不提供在写一个大数据包的过程中修改pendingWriteBytes的原因(即,netty不支持某个ByteBuf中写出部分数据就修改ChannelOutboundBuffer中totalPendingSize值。),这是为了防止ABA问题。
② 因为如果不是以ByteBuf或者FileRegion为单位修改pending bytes的话,可能出现ABA问题。即,因为write操作可以由多个不同的线程来操作(非EventLoop线程),这可能导致EventLoop线程在进行该OutboundBuffer中ByteBuf的flush操作时,其他线程再往这个OutboundBuffer中加数据,这可能使得最终pending bytes的值并没有改变,但实际上pending bytes是改变过的了,这样就会使得判断错误。(PS:目前NIO传输时,写完一个ByteBuf就会触发该ByteBuf的listener,那么lastWriteTime就会被修改,此时根本不会进入)
③ 而另一个ABA问题是,如果保持了ByteBuf的引用,如果使用池的ByteBuf的话(默认,Netty就是使用池的ByteBuf),如果我们存储OutBoundBuffer中的当前的(链表头)的那个ByteBuf对象的引用,在每次写空闲超时事件中判断这个ByteBuf对象的hashCode与上一次调用时的值做比较来得出是否是同一个ByteBuf。👈这种情况也可能出现ABA问题,正式因为ByteBuf是池的,那么就可能在写空闲超时事件回调方法中存有的ByteBuf引用还是一样的,但实际上是被回收后再次分配出去的,因此是逻辑上来说是不一样的ByteBuf对象了。
- “observeOutput” 字段的使用场景:
当在写一个大数据包的时候,且该在写超时已不是第一次触发的时候(即,first 为 false),这个大数据包还没写完。但在此时,我们已经有 ch.write(data)了其数据了,这会导致『pendingWriteBytes != lastPendingWriteBytes == true』(因为,channelOutboundBytes 只有在一个 ByteBuf 都写出去后,即,写到 socket 的写缓冲后。才会减少其totalPendingWriteBytes 的值。这样在👆这个场景中,在我们自此write一个data的时候,totalPendingWriteBytes的值会增加),因此来表示 outputChanged。也就是说,observeOutput 观察的是,是否有新的写数据操作,而非对已经操作的write的数据的观察!!!
解决方案
好了,到目前为止,我们已经知道为什么我们使用“observeOutput”属性无法达到我们预计的效果了。那么,关于发送大数据包我们到底可以做处理了。。
这里,我们觉得可以采用的一个方式是,使用“ChunkedWriteHandler”来实现大数据包的传输。
这个ChunkedWriteHandler又是怎么突然跑出来的。。是这样的,其实之前我们也不晓得有这个类,或者说因为了解不深给忽略了它。正好在解决这个问题的间隙,将Netty的写数据操作给过了边,在这其中发现了Netty自身目前仅对ChunkedWriteHandler和HTTP2的提供了WriteBufferWaterMark的支持,其余的需要我们程序自行添加支持。而WriteBufferWaterMark通常就是为了控制有大量待写出数据的情况下对写出流量进行控制的一个方式,这看似和我们的大数据包写出还是有些个关系的。因此,我们通过doc简单了解了下ChunkedWriteHandler的使用,发现确实是个可行的方式。在经过测试后也如预期般达到了我们要的效果!下面,我们就来说说如果通过ChunkedWriteHandler来实现大数据包发送的发送。
这里对ChunkedWriteHandler做一个简单的介绍:
ChunkedWriteHandler:一个handler,用于支持异步写大数据流并且不需要消耗大量内存也不会导致内存溢出错误( OutOfMemoryError )。
ChunkedWriteHandler仅支持ChunkedInput类型的消息。也就是说,仅当消息类型是ChunkedInput时才能实现ChunkedWriteHandler提供的大数据包传输功能(ChunkedInput是一个不确定长度的数据流)。
ChunkedWriteHandler中维护了一个待发送的数据包消息队列(Queue<PendingWrite> queue,其中PendingWrite封装了你待发送的消息以及异步写操作的promise)
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
queue.add(new PendingWrite(msg, promise));
}
这样使得你write的数据包在经过ChunkedWriteHandler的时候,会先被存储到这个消息队列中,并不会立即放入到ChannelOutboundBuffer里。
而当你执行flush操作时,ChunkedWriteHandler会依次取出消息队列中的大数据包,然后拆分成一个个小数据包(ByteBuf)后发给下游的ChannelOutboundHandler,并且在每次发送完一个ByteBuf包后都会立即执行依次ctx.flush()操作将该ByteBuf发送到网络中。但也正是因为,ChunkedWriteHandler将一个大数据包拆分成了一个个小数据包放入底层的ChannelOutboundBuffer进行传输,这使得你对大数据包的异步写操作注册的listener在底层的ChannelOutboundBuffer已经无法得到并且回调了,这就需要我们通过程序来进行状态的管理以保持我们原有逻辑的正确性。
ChunkedWriteHandler会为每个发送的小数据包注册一个listener,这个listener会在小数据包成功发送完成后调用原始大数据包的GenericProgressiveFutureListener,上面我们已经说了通过GenericProgressiveFutureListener我们可以监控数据包的发送进度(通过回调operationProgressed方法实现),以及在大数据包发送完后得到一个通知(通过回调operationComplete方法实现)。因此,我们可以在operationComplete回调方法中对写原始大数据包的异步操作上注册的listener进行回调(通过表示写异步操作promise完成来实现)。
值得一提的时,ChunkedWriteHandler对将大数据包拆分成小数据包发往下游进行的操作是受WriteBufferWaterMark控制的,当写缓冲区中的数据数量超过了设置的高水位标志,那么Channel#isWritable()方法将开始返回false,那么此时ChunkedWriteHandler就不会继续拆分大数据包。然后当写缓冲区中的字节数量减少至小于了低水位标志,Channel#isWritable()方法会重新开始返回true,而此时ChunkedWriteHandler会继续拆分未拆分完的大数据包,继续数据的写操作。
絮絮叨叨了这么多,来看看具体的实现:
首先修改了MyClientInitializer:
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addFirst("idleStateHandler", new IdleStateHandler(true, 9, 2, 11, TimeUnit.SECONDS));
pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE,
0, 4, 0, 4, true));
pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));
pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
pipeline.addLast("myClientChunkHandler", new MyClientChunkHandler());
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
}
}
在StringEncoder和LengthFieldPrepender两个编码器间添加了MyClientChunkHandler和ChunkedWriteHandler。
MyChunkedWriteHandler是一个出站处理器,它会完成将StringEncoder编码后的大数据包类型转换成ChannelInputStream类型,以使得其后的ChunkedWriteHandler能够对该大数据包实现拆分分发的作用。
接下来是自定义的 MyClientChunkHandler,用于将我们的待发送的大数据包类型转换成ChunkedInput类型,以使得ChunkedWriteHandler能够发挥作用。
public class MyServerChunkHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if(msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf)msg;
ByteInputStream in = new ByteInputStream();
byte[] data = null;
if(buf.hasArray()) {
System.out.println("+++ is array");
data = buf.array().clone();
} else {
System.out.println("--- is direct");
data = new byte[buf.readableBytes()];
buf.writeBytes(data);
}
// System.out.println("===== data length : " + data.length);
in.setBuf(data);
ChunkedStream stream = new ChunkedStream(in);
ReferenceCountUtil.release(msg);
ctx.write(stream, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
👆实现了将数据包类型转换为ByteInputStream类型,传递个下一个ChannelOutboundHandler(也就是ChunkedWriteHandler)
后记
本次问题和朋友陆陆续续的讨论了两个晚上,印象还是比较深刻的。在第一次讨论问题的时候,我们对Netty的写数据流程也没有比较清晰的概念。后面将这块流程补上后,再重新回来看待问题,感觉又清晰了不少,再者对于IdleStateHandler的observeOutput属性确实是比较容易让人误解。如果没有去翻查github和源码的话,不容易明白这个属性真正的用意。但也正是因为对Netty写数据流程的梳理,让我们发现了一直忽略ChunkedWriteHandler,也让这个问题有了现在的这个解决方案。当然,可能随着后面进一步深入的学习,我们会发现更好的解决方案,那么到时候也会继续分享的。
若文章有任何错误,望大家不吝指教:)