Take this snippet from RocketMQ as an example:
if (response != null) {
    response.setOpaque(opaque);
    response.markResponseType();
    try {
        ctx.writeAndFlush(response);
    } catch (Throwable e) {
        log.error("process request over, but response failed", e);
        log.error(cmd.toString());
        log.error(response.toString());
    }
} else {
    // no response to send back (e.g. a one-way request)
}
The call chain of ChannelHandlerContext.writeAndFlush(Object), end to end (a caller-side sketch follows the list):
- the business thread encodes the message, by default into off-heap (direct) memory
- the worker (event loop) thread eventually runs HeadContext, which performs the actual write
- the data is appended to the linked list maintained by ChannelOutboundBuffer
- the JDK NIO channel writes the bytes; when the TCP send buffer is full, the OP_WRITE event is used to resume writing
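Before diving into the source, here is a caller-side sketch of that contract. writeAndFlush is asynchronous: the try/catch in the RocketMQ snippet above only catches failures thrown while submitting the write; actual I/O failures are reported through the returned ChannelFuture (channel, response and log are assumed references, not RocketMQ code):

ChannelFuture future = channel.writeAndFlush(response);
future.addListener((ChannelFutureListener) f -> {
    if (!f.isSuccess()) {
        // the write failed somewhere in the pipeline or on the socket
        log.error("send response failed", f.cause());
    }
});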
1. AbstractChannelHandlerContext.writeAndFlush(Object)
public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    // miscellaneous validity checks on the promise (right channel, not already done, etc.)
    if (isNotValidPromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        // cancelled
        return promise;
    }
    write(msg, true, promise);
    return promise;
}

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}
invokeWrite0 and invokeFlush0 invoke this context's handler's write() and flush() callbacks directly.
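For reference, the two methods look roughly like this in Netty 4.1 (paraphrased):

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        // call this context's handler directly
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}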
2. write
2.1 NettyConnectManageHandler.write
class NettyConnectManageHandler extends ChannelDuplexHandler {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
}
It does nothing but hand the message on to the next outbound handler.
2.2 NettyEncoder.write
NettyEncoder extends MessageToByteEncoder&lt;RemotingCommand&gt;. The write(...) shown first is inherited from MessageToByteEncoder (hence the generic type I); NettyEncoder itself only overrides encode():

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // type check against the generic parameter (here RemotingCommand)
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            // allocate a ByteBuf, heap or direct; direct by default
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // write the encoded bytes into buf
                encode(ctx, cast, buf);
            } finally {
                // if msg is ReferenceCounted, release one reference
                ReferenceCountUtil.release(cast);
            }
            if (buf.isReadable()) {
                // there is data to pass on
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            // hand-off succeeded: set buf to null so the finally block below
            // does not release it; the later stages now own the memory
            buf = null;
        }
        // not the type declared by the generic parameter: pass through unchanged
        else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // buf != null means it was never handed off; release it
        // (the memory is freed once no other references remain)
        if (buf != null) {
            buf.release();
        }
    }
}

NettyEncoder's own encode() then serializes the RemotingCommand header and body:

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
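The same split applies to any MessageToByteEncoder: the subclass only fills the ByteBuf; the inherited write(...) owns allocation, reference counting and the pipeline hand-off. A minimal sketch with a hypothetical Ping message (not RocketMQ code):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

// hypothetical message type, for illustration only
final class Ping {
    final long timestamp;
    Ping(long timestamp) { this.timestamp = timestamp; }
}

final class PingEncoder extends MessageToByteEncoder<Ping> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Ping msg, ByteBuf out) {
        // only serialize; never call ctx.write(...) here
        out.writeLong(msg.timestamp);
    }
}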
All of the ctx.write(...) calls above go through AbstractChannelHandlerContext.write(Object, boolean, ChannelPromise):

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // walk backwards from the current handler to find the next ChannelOutboundHandler
    AbstractChannelHandlerContext next = findContextOutbound();
    // touch the message for leak tracking if it is ReferenceCounted
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        Runnable taskWrapper = new NamedRunnable() {
            @Override
            public void run() {
                task.run();
            }
            @Override
            public String name() {
                return "AbstractChannelHandlerContext[895] > write";
            }
        };
        safeExecute(executor, taskWrapper, promise, m);
    }
}
The encoder has now written the outgoing bytes into a ByteBuf; execution continues with the next outbound handler.
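The executor.inEventLoop() branch above is what makes writes thread-safe: RocketMQ's business threads never touch the socket directly. A sketch of the two paths (ctx, channel and msg are assumed references):

// from the event loop thread (e.g. inside a ChannelInboundHandler callback):
ctx.writeAndFlush(msg);        // invokeWriteAndFlush runs inline

// from an application thread (e.g. RocketMQ's processor pool):
channel.writeAndFlush(msg);    // wrapped into a WriteAndFlushTask and
                               // queued to the channel's event loop
// roughly equivalent in effect to:
channel.eventLoop().execute(() -> channel.writeAndFlush(msg));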
2.3 HeadContext
The write eventually reaches HeadContext on the worker (event loop) thread; if the caller was on another thread, it got here via a queued task:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
2.3.1 AbstractUnsafe.write
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        // (only has an effect if msg is ReferenceCounted)
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        // convert heap buffers to direct memory (DirectBuffer);
        // FileRegion messages pass through unchanged
        msg = filterOutboundMessage(msg);
        // DefaultMessageSizeEstimator.HandleImpl reports the message size
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    // append to the Entry linked list
    outboundBuffer.addMessage(msg, size, promise);
}
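filterOutboundMessage is what forces the direct-memory copy mentioned at the top. In AbstractNioByteChannel it looks roughly like this (paraphrased from Netty 4.1):

@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        // copy a heap buffer into (pooled, when available) direct memory
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
        "unsupported message type: " + msg.getClass().getSimpleName());
}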
2.3.2 ChannelOutboundBuffer.addMessage maintains the linked list of outgoing data

// The Entry that is the first flushed one, i.e. ready to be written to the socket
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure,
// i.e. write()n but still waiting for a flush()
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed;
// number of JDK NIO ByteBuffers in the current nioBuffers() snapshot
private int nioBufferCount;
// total byte size of those buffers
private long nioBufferSize;
// re-entrance guard while failing flushed entries
private boolean inFail;

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // obtain a pooled Entry from the Recycler
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        // list is empty
        flushedEntry = null;
    } else {
        // append to the tail
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    // remember the first unflushed entry
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    // compares the running total against the write-buffer water marks
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        // high water mark exceeded: if the channel flips from writable to
        // unwritable, channelWritabilityChanged is fired on the handlers and
        // Channel.isWritable() starts returning false
        setUnwritable(invokeLater);
    }
}
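This writability flag is Netty's standard back-pressure hook. A minimal sketch of using it (bootstrap is an assumed ServerBootstrap; the water-mark values and handler name are illustrative):

// when more than 64 KiB is pending, isWritable() flips to false;
// once it drains below 32 KiB, it flips back to true
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
        new WriteBufferWaterMark(32 * 1024, 64 * 1024));

class BackPressureHandler extends ChannelDuplexHandler {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            // resume producing data
        } else {
            // pause producing; ChannelOutboundBuffer crossed the high water mark
        }
        ctx.fireChannelWritabilityChanged();
    }
}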
3. flush
3.1 HeadContext.flush
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
3.2 AbstractUnsafe.flush
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // promote unflushedEntry to flushedEntry
    outboundBuffer.addFlush();
    // write to the JDK NIO channel
    flush0();
}
3.3 ChannelOutboundBuffer.addFlush
Promotes the pending data (unflushedEntry) to writable data (flushedEntry).
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry:
            // the pending data becomes writable data
            flushedEntry = entry;
        }
        // mark every pending promise as uncancellable (this marking feels somewhat
        // redundant); entries whose promise was already cancelled are freed here
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                // decrease the pending byte count
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}
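A worked example of how the three pointers move (a, b, c, d are illustrative messages):

// empty list:                 flushedEntry = unflushedEntry = tailEntry = null
// write a, b, c (addMessage): a -> b -> c
//                             flushedEntry = null, unflushedEntry = a, tailEntry = c
// flush (addFlush):           flushedEntry = a, unflushedEntry = null, flushed = 3
// socket accepts a and b:     removeBytes(...) advances flushedEntry to c, flushed = 1
// write d meanwhile:          c -> d, unflushedEntry = d (waits for the next flush)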
3.4 AbstractNioUnsafe.flush0
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    // If OP_WRITE is registered, the TCP send buffer is full: once the socket
    // becomes writable again, the event loop runs ch.unsafe().forceFlush()
    if (!isFlushPending()) {
        super.flush0();
    }
}
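isFlushPending simply checks whether OP_WRITE is still registered (roughly, from Netty 4.1):

private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    // OP_WRITE set means a previous write filled the TCP send buffer;
    // the event loop will call forceFlush() once the socket drains
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}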
3.5 AbstractUnsafe.flush0
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }
    inFlush0 = true;
    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            // drain the buffer: fail each ChannelPromise and recycle the Entries
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }
    try {
        // write the data to the socket
        doWrite(outboundBuffer);
    }
    // on failure: clean up the ChannelOutboundBuffer and
    // possibly close the JDK NIO channel
    catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            try {
                shutdownOutput(voidPromise(), t);
            } catch (Throwable t2) {
                close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        }
    } finally {
        inFlush0 = false;
    }
}
3.6 NioSocketChannel.doWrite
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    // max write attempts per flush (default 16)
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        // upper bound for one gathering write
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        // starting at flushedEntry, collect at most 1024 JDK NIO ByteBuffers
        // totalling no more than maxBytesPerGatheringWrite bytes
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        // number of ByteBuffers collected
        int nioBufferCnt = in.nioBufferCount();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write (e.g. a FileRegion),
                // so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                // write to the socket
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    // TCP send buffer is full: register OP_WRITE and resume once there is room
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // drop localWrittenBytes of data from the flushed entries
                // and release the corresponding memory
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                // total bytes we attempt to write
                long attemptedBytes = in.nioBufferSize();
                // gathering write
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    // the underlying send buffer is full (presumably the kernel's
                    // socket send buffer at the OS level)
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                    maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    // writeSpinCount < 0 means doWrite0 hit a full send buffer: register OP_WRITE
    // with the Selector so that SelectionKey.OP_WRITE fires once the send buffer
    // has room again, and NioEventLoop calls forceFlush() to send the rest.
    // writeSpinCount == 0 means the spin count was exhausted with data still
    // pending, so the flush is rescheduled as an ordinary event-loop task.
    incompleteWrite(writeSpinCount < 0);
}
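incompleteWrite distinguishes "socket full" from "spin count exhausted" (roughly, from Netty 4.1):

protected final void incompleteWrite(boolean setOpWrite) {
    if (setOpWrite) {
        // TCP send buffer is full: register OP_WRITE and let the Selector
        // wake the channel (forceFlush) once the buffer drains
        setOpWrite();
    } else {
        // 16 writes done and data still pending, but the socket itself is fine:
        // clear OP_WRITE and reschedule the flush as an ordinary task, so the
        // event loop can serve other channels in between
        clearOpWrite();
        eventLoop().execute(flushTask);
    }
}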
3.6.1 ChannelOutboundBuffer.nioBuffers
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
    assert maxCount > 0;
    assert maxBytes > 0;
    long nioBufferSize = 0;
    int nioBufferCount = 0;
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
    // start from flushedEntry
    Entry entry = flushedEntry;
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        if (!entry.cancelled) {
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            if (readableBytes > 0) {
                // stop once maxBytes would be exceeded
                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                    // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
                    // we stop populate the ByteBuffer array. This is done for 2 reasons:
                    // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
                    // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
                    // on the architecture and kernel but to be safe we also enforce the limit here.
                    // 2. There is no sense in putting more data in the array than is likely to be accepted by the
                    // OS.
                    //
                    // See also:
                    // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                    // - http://linux.die.net/man/2/writev
                    break;
                }
                nioBufferSize += readableBytes;
                int count = entry.count;
                if (count == -1) {
                    //noinspection ConstantValueVariableUse
                    entry.count = count = buf.nioBufferCount();
                }
                int neededSpace = min(maxCount, nioBufferCount + count);
                if (neededSpace > nioBuffers.length) {
                    // exceeds nioBuffers.length (default 1024): grow the array to neededSpace
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                // fill nioBuffers; the elements are JDK NIO ByteBuffers, not Netty ByteBufs
                if (count == 1) {
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                        // derived buffer
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
                    nioBuffers[nioBufferCount++] = nioBuf;
                } else {
                    ByteBuffer[] nioBufs = entry.bufs;
                    if (nioBufs == null) {
                        // cached ByteBuffers as they may be expensive to create in terms
                        // of Object allocation
                        entry.bufs = nioBufs = buf.nioBuffers();
                    }
                    for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
                        ByteBuffer nioBuf = nioBufs[i];
                        if (nioBuf == null) {
                            break;
                        } else if (!nioBuf.hasRemaining()) {
                            continue;
                        }
                        nioBuffers[nioBufferCount++] = nioBuf;
                    }
                }
                if (nioBufferCount == maxCount) {
                    break;
                }
            }
        }
        entry = entry.next;
    }
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;

    return nioBuffers;
}
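doWrite pairs nioBuffers(...) with removeBytes(...): after a successful ch.write, the written byte count is walked through the flushed entries; fully written buffers are released and a partially written one just has its readerIndex advanced (roughly, from Netty 4.1):

public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();   // flushedEntry.msg
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }
        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
        if (readableBytes <= writtenBytes) {
            // buffer fully written: report progress, release it,
            // advance flushedEntry
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else {
            // partial write: just move the reader index forward
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}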