(*文章基于Netty4.1.22版本)
ctx.writeAndFlush相当于先调用ctx.write然后再调用ctx.flush,所以下面分析write和flush
write
write和flush会经过pipeline的每个outbound的Handler,之前文章分析过流程,这里不再分析。
write方法最终到达HeadContext的write方法,然后什么都没做,将请求转发到底层实现unsafe,实现在AbstractUnsafe中
public final void write(Object msg, ChannelPromise promise) {
//Netty的缓冲区
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//....
int size;
try {
msg = filterOutboundMessage(msg);// 对msg类型进行过滤和包装
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
//....
}
将msg加入缓冲区
outboundBuffer.addMessage(msg, size, promise);
}
ChannelOutboundBuffer
Netty调用write的时候,不是真的将数据写出去,而是使用了一个缓冲区去存放这份数据,这个缓冲区的实现就是ChannelOutboundBuffer,看下其主要变量
//
// 链表中第一个已经被标记为已flush的Entry元素
private Entry flushedEntry;
// 链表中第一个已经被标记为未flush的Entry元素
private Entry unflushedEntry;
// 链表最后一个Entry元素
private Entry tailEntry;
// 已经标记为flush但是还未写出去的Entry数量
private int flushed;
// ByteBuffer 的数量和大小
private int nioBufferCount;
private long nioBufferSize;
ChannelOutboundBuffer是一个链表,每一个元素类型是Entry,结构如下
Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
接下来看下addMessage方法
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(entry.pendingSize, false);
}
链表的操作,先构造一个Entry,然后加入到链表尾部,最后调用了一下incrementPendingOutboundBytes方法
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 更新写入的大小
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {// 如果当前缓冲区大小大于指定的值,则调用setWritable方法
setUnwritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);// 触发channelWritabilityChanged事件
}
break;
}
}
}
write涉及的ChannelOutboundBuffer功能就这些,其他的在flush的时候分析
flush
同理,flush最终调用的是unsafe的flush方法
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
outboundBuffer.addFlush();
flush0();
}
先调用了ChannelOutboundBuffer的addFlush方法,然后具体逻辑在flush0中,先看下addFlush方法实现
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {// 将链表中第一个元素标记为flushed
flushedEntry = entry;
}
do {
flushed ++;//进行flush的数量
if (!entry.promise.setUncancellable()) {// 如果已经取消了,更新TOTAL_PENDING_SIZE_UPDATER的保存的数量
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);//
}
entry = entry.next;//
} while (entry != null);
// 链表遍历完毕,所有的entry都flush,unflushedEntry代表为未flush的Entry,此时不存在了,所以设为null
unflushedEntry = null;
}
}
addFlush方法只是将缓冲区中的Entry标记为flush(更新unflushedEntry和flushedEntry的值)
flush0实际上会调用到AbstractNioUnsafe的flush0方法
protected final void flush0() {
if (!isFlushPending()) {
super.flush0();
}
}
AbstractNioUnsafe重写了父类的flush0方法,只是在其之上加了一层判断,通过isFlushPending返回值来判断是否需要进行flush操作,具体的flush操作还是在父类AbstractUnsafe中,看下isFlushPending
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
就两行代码,首先要SelectionKey有效啦,然后最重要的是后面那句
selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0
从代码上看,就是SelectionKey设置了OP_WRITE作为感兴趣的事件,那么即如果监听了OP_WRITE事件,那么就不flush。
- 为什么要这样做?
- OP_WRITE
要弄明白这个判断,首先要知道OP_WRITE这个事件。那么什么时候Selector会轮询到OP_WRITE事件?是通道可写的时候。
回想一下一个简单的NIO demo,貌似没有处理这个OP_WRITE,程序也可以非常爽的在自己的电脑中的Client和Server中将Hello world发送来发送去,其实这种时候,通道都是可写的,所以监听一个可写事件,等到通道可写的时候再去进行某些操作貌似就没有意义了,因为在demo中,一般不会出现不可写的情况,而在实际运用中写入的数据太多,或者网络有问题,导致缓冲区满了,Channel.write可能返回0,即写出去的字节数为0,那么这时候代表Channel已经不可写了,再继续写也没有用,那么这个时候如果注册一个OP_WRITE事件,在Channel可写的时候,Selector就会轮询到OP_WRITE事件,这个事件,再去写,就OK了,所以这里判断如果监听了OP_WRITE事件,代表通道不可写,所以才监听的OP_WRITE事件,那么就不处理,等到NioEventLoop轮询到OP_WRITE事件再进行flush操作(这段是个人理解总结,不一定正确),具体代码如下
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//....
try {
int readyOps = k.readyOps();
//....
// 如果监听
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
} catch (CancelledKeyException ignored) {
}
}
当通道可写的时候,进行flush操作,看下forceFlush实现
public final void forceFlush() {
super.flush0();
}
可以看到,还是调用了父类的flush0方法,没有isFlushPending的判断,直接刷新
接下来看下flush0方法
protected void flush0() {
//....
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//....
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
//....
} finally {
inFlush0 = false;
}
}
看下doWrite方法
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
// 类似并发编程的自旋次数
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {// 如果缓冲区为空
clearOpWrite();// 清除OP_WRITE监听
return;
}
// 获取每次写入最大的数量.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 转换成原生的ByteBuffer数组
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);、
// buffer的数量
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
writeSpinCount -= doWrite0(in);
break;
case 1: {
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);// 直接写入
if (localWrittenBytes <= 0) {// 通道可能不可写
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
long attemptedBytes = in.nioBufferSize();// 这一批buffer的大小
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {// 通道可能不可写
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
// 这一次write写出localWrittenBytes大小的数据,需要将缓冲区中的数据清除
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
这里要划几个重点:
writeSpinCount:这个东西干嘛用的?源码注释里说了"类似自旋锁",那么我的理解是,自旋锁是不断重试去获取锁,因为前面获取锁失败了,而这里不是去获取锁,而是尝试将数据写完,但是每次都只能写一部分出去,所以重复writeSpinCount次write的操作,所以和自旋有点类似
incompleteWrite(writeSpinCount < 0):1中说了会重复writeSpinCount操作,如果writeSpinCount都还未写完呢?如果writeSpinCount < 0证明在writeSpinCount次write中都还未写完,该表达式为true,那么看下incompleteWrite方法
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
setOpWrite();// 设置OP_WRITE事件
} else {
clearOpWrite();
eventLoop().execute(flushTask);
}
}
可以看到,如果setOpWrite为true,则设置了OP_WRITE事件,让Selector异步去处理OP_WRITE事件,强制flush,就是上面讲的内容。而如果入参为false,那么需要清除OP_WRITE事件,因为大部分情况下通道可写,如果一直监听这个事件,会不听轮询出该事件,导致类似空轮询的结果
in.isEmpty():在缓冲区为空的情况下,也需要清除OP_WRITE事件,原因和2一样
if (localWrittenBytes <= 0):这个和上面分析OP_WRITE事件的时候说过,通道不可写的情况
- 那么还有个问题,由于一次write并不能将数据全部写出去,那Netty是如何处理缓冲区还未处理的数据的呢?
这时需要看一下in.removeBytes方法
public void removeBytes(long writtenBytes) {
for (;;) {
Object msg = current();// 等于flushedEntry.msg
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}
这里首先获取缓冲区的flushedEntry节点,flushedEntry在一开始的时候已经讲过,代表链表中第一个被标记为flush的节点,而这个节点在addFlush的时候被初始化,这里首先会获取flushedEntry,可以看出,write的时候是从flushedEntry开始处理的,即flushedEntry使标记为待写入但是还未写成功的Entry。
接下来会获取ByteBuf,获取当前ByteBuf可读的大小readableBytes(readerIndex和writerIndex含义如有不明白的另行百度),而根据大小分为两个分支:
- 成功写入的字节数大于等于当前ByteBuf可读大小
- 成功写入的字节数大于当前ByteBuf可读大小
通过图来理解一下这两种情况
假设有4个Entry,每个Entry里的ByteBuf大小如图所示,当前写入的大小为120,过程如下:
第一个循环-> 获取第一个Entry,其readableBytes=34<120,所以代表当前ByteBuf已经全部写入(总共写入120,当前为34,所以当前ByteBuf完全写入,另外86大小是其他ByteBuf),这时候调用progress方法,且writtenBytes=writtenBytes-34=86,然后调用remove方法,看下remove方法实现
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {// 如果为空了,代表全部写入,清空nioBufferCount和NIO_BUFFERS
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
//....
return true;
}
private void removeEntry(Entry e) {
// flushed在addFlush的时候会增加,在remove的时候会减少
if (-- flushed == 0) {
// 所有Entry的ByteBuf都写入完毕,置空flushedEntry tailEntry和unflushedEntry
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {// 还有Entry没有处理完毕,当前Entry已经写入完毕,flushedEntry往后移动
flushedEntry = e.next;
}
}
经过第一个循环后缓冲区结构如下:
第二个循环->和第一个循环类似,处理完第二个后结构如下:
第三个循环-> 可以看到此时满足第二种情况了,这种情况ByteBuf只写了3个字节的大小,而当前ByteBuf大小而10,所以和第一个情况不同的是,这个将readerIndex设置为3
整个write和flush的流程就分析完毕了,总结一下:
- write和flush经过pipeline最终都会到达HeadContext,HeadContext依赖unsafe去进行底层操作
- write只是将数据放到Netty的缓冲区ChannelOutboundBuffer中,内部实现是一个链表
3.flush的时候,先判断当前是否监听了OP_WRITE事件,如果监听了则不进行flush操作,由selector异步轮询到OP_WRITE事件的时候调用foreceFlush进行flush - 当SocketChannel.write返回的字节数小于等于0的时候代表当前通过不可写,则监听OP_WRITE事件,由selector触发flush操作
- 由于没法一次性将所有数据写入,Netty使用了类似自旋锁的操作,重复一定次数进行SocketChannel.write操作,如果最后还是没能全部将数据写入,则监听OP_WRITE事件
- 当某一个SocketChannel.write后,Netty遍历通过缓冲区中的Entry,将已经完全处理完毕的Entry移除,flushedEntry不停向后移动