The overall architecture of RocketMQ's networking layer
The remoting module is the basic communication layer of the MQ; understanding how it works helps greatly in understanding how the other modules interact. The RocketMQ remoting module is driven by the Netty network library underneath, so we first need some basic Netty concepts.
Netty uses the Reactor pattern, separating the accept thread, the IO threads, and the business-logic threads. Each connection has its own ChannelPipeline. The default implementation, DefaultChannelPipeline, keeps a doubly linked list of ChannelHandlerContext nodes, each wrapping a ChannelHandler. The head of the list is a ChannelOutboundHandler:
class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
...
}
and the tail is a ChannelInboundHandler:
class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
...
}
Here, "inbound" means the handler processes events triggered by the outside world, typically the peer sending us data; "outbound" means events we trigger ourselves, such as writing data to the peer. An inbound event propagates through the pipeline's ChannelHandlerContext list from head to tail, while an outbound event propagates from tail to head. This is what lets decoding, business processing, and encoding be split into separate ChannelHandlers.
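The propagation rule can be illustrated with a toy model (plain Java, not Netty code — the class and method names here are invented for illustration): handlers live in a list, inbound events visit them head to tail, outbound events tail to head.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of DefaultChannelPipeline's propagation order, NOT real Netty code:
// inbound events walk the handler list head -> tail, outbound events tail -> head.
public class ToyPipeline {
    private final List<String> handlers = new ArrayList<>();

    public ToyPipeline addLast(String handlerName) {
        handlers.add(handlerName);
        return this;
    }

    // e.g. the peer sent us bytes: the decoder sees them before the business handler
    public List<String> fireInbound() {
        return new ArrayList<>(handlers);
    }

    // e.g. we write a response: it passes the business handler first, the encoder last
    public List<String> fireOutbound() {
        List<String> reversed = new ArrayList<>(handlers);
        Collections.reverse(reversed);
        return reversed;
    }
}
```

With `addLast("decoder").addLast("business")`, an inbound event visits decoder then business, while an outbound write visits business then decoder, matching the head-to-tail / tail-to-head rule above. (Real Netty additionally skips handlers of the wrong direction.)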
In addition, RocketMQ's wire format is as follows: the first 4 bytes hold the length of everything that follows, the next 4 bytes the length of the header data, followed by the header data itself and then the body data:
<4 byte length> <4 byte header length> <N byte header data> <N byte body data>
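The framing above can be sketched with plain java.nio. This is a simplified model under two assumptions: the leading length counts everything after itself (as in RemotingCommand.encode), and the second field is a plain header length (the real protocol also packs a serialization-type byte into it):

```java
import java.nio.ByteBuffer;

// Simplified sketch of the frame layout:
// <4 byte length> <4 byte header length> <header data> <body data>
public class FrameCodec {

    static byte[] encode(byte[] header, byte[] body) {
        // the leading length field counts everything after itself
        int total = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + total);
        buf.putInt(total);
        buf.putInt(header.length);
        buf.put(header);
        buf.put(body);
        return buf.array();
    }

    // returns {header, body}
    static byte[][] decode(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int total = buf.getInt();
        int headerLen = buf.getInt();
        byte[] header = new byte[headerLen];
        byte[] body = new byte[total - 4 - headerLen];
        buf.get(header);
        buf.get(body);
        return new byte[][] { header, body };
    }
}
```

Encoding a 2-byte header and a 3-byte body produces a 13-byte frame: 4 (length) + 4 (header length) + 2 + 3.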
Finally, here is a UML diagram of the RocketMQ remoting module, giving an overview of its main parts:
Of the classes in the diagram, the most important are NettyRemotingClient and NettyRemotingServer; the methods they share are factored into NettyRemotingAbstract.
RemotingServer
With this background in place, we can start digging into the RemotingServer source.
Startup
public void start() {
...
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(),
new NettyServerHandler());
}
});
...
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
...
}
As the code shows, NettyRemotingServer.start() boots Netty: the eventLoopGroupBoss group accepts connections, eventLoopGroupSelector handles IO, and defaultEventExecutorGroup runs the business logic inside the ChannelHandlers. nettyServerConfig wraps the Netty configuration, including the send and receive buffer sizes. Most importantly, it adds the NettyEncoder, NettyDecoder, IdleStateHandler, NettyConnetManageHandler and NettyServerHandler handlers to the pipeline.
Next, if channelEventListener is not null, a dedicated thread is started to watch for channel events:
if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
This class simply loops, pulling events off a LinkedBlockingQueue and dispatching each to the appropriate channelEventListener method:
class NettyEventExecuter extends ServiceThread {
//a LinkedBlockingQueue holds the pending NettyEvents
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;
//enqueue a pending event, as long as the queue has not exceeded its size limit
public void putNettyEvent(final NettyEvent event) {
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
PLOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
@Override
public void run() {
PLOG.info(this.getServiceName() + " service started");
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
//loop: poll events and handle them
while (!this.isStopped()) {
try {
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
} catch (Exception e) {
PLOG.warn(this.getServiceName() + " service has exception. ", e);
}
}
PLOG.info(this.getServiceName() + " service end");
}
...
}
After that, a timer is started that periodically checks responseTable for requests that have timed out without a response and cleans them up; the role of responseTable is explained below, when we cover the request-sending flow:
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
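The scan itself can be sketched roughly as follows. This is a hedged, self-contained model, not the real method: ResponseFuture here is a stand-in with simplified fields (the actual class carries more state, and the actual scan also releases a semaphore), though the timeout rule with a 1000 ms grace period mirrors the real implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the periodic responseTable scan: remove futures whose
// deadline has passed, then run their callbacks so callers see the failure.
public class ResponseScanner {
    static class ResponseFuture {
        final int opaque;
        final long beginTimestamp = System.currentTimeMillis();
        final long timeoutMillis;
        final Runnable callback; // stand-in for InvokeCallback

        ResponseFuture(int opaque, long timeoutMillis, Runnable callback) {
            this.opaque = opaque;
            this.timeoutMillis = timeoutMillis;
            this.callback = callback;
        }

        boolean isTimeout(long now) {
            // the real scan adds a grace period of 1000 ms past the timeout
            return beginTimestamp + timeoutMillis + 1000 <= now;
        }
    }

    final ConcurrentHashMap<Integer, ResponseFuture> responseTable = new ConcurrentHashMap<>();

    List<ResponseFuture> scanResponseTable(long now) {
        List<ResponseFuture> timedOut = new ArrayList<>();
        Iterator<Map.Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            ResponseFuture rf = it.next().getValue();
            if (rf.isTimeout(now)) {
                it.remove();        // forget the in-flight request
                timedOut.add(rf);
            }
        }
        for (ResponseFuture rf : timedOut) {
            if (rf.callback != null) rf.callback.run(); // surface the timeout
        }
        return timedOut;
    }
}
```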
That completes the startup of NettyRemotingServer.
ChannelHandler
At startup, the following ChannelHandlers were added to the ChannelPipeline; let's look at what each one does.
NettyEncoder
Encodes outgoing requests in the wire format described earlier; nothing special here:
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
NettyDecoder
The counterpart of NettyEncoder: an inbound ChannelHandler that decodes incoming bytes. Since the protocol is length-prefixed (each frame starts with a fixed 4-byte length field), it extends Netty's LengthFieldBasedFrameDecoder, which splits the byte stream into complete frames and strips the length field:
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
IdleStateHandler
This handler implements keepalive: when no data has been sent or received for some period, it fires an IdleStateEvent.
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
NettyConnetManageHandler
Handles the various connection events, in particular idle state, and hands them to channelEventListener for processing:
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
    RemotingUtil.closeChannel(ctx.channel());
    if (NettyRemotingServer.this.channelEventListener != null) {
        NettyRemotingServer.this
            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
    }
}
NettyServerHandler
Calls NettyRemotingAbstract's processMessageReceived method to handle incoming commands.
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
Incoming commands fall into two kinds: requests sent to us by other services, and responses to requests we sent out ourselves. All requests are in fact asynchronous: the ResponseFuture for each in-flight request is recorded in a ConcurrentHashMap, keyed by an integer that identifies the request.
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
Also note that for each request type (identified by the code field of RemotingCommand), a NettyRequestProcessor and an ExecutorService are registered in advance, and the request is handled in that registered thread pool:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
...
}
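The per-code dispatch can be sketched with plain JDK types. This is a hedged model, not RocketMQ code: RequestProcessor and the String request stand in for NettyRequestProcessor and RemotingCommand, but the shape — a (processor, executor) pair looked up by code, with the work submitted to that pair's pool — mirrors processorTable.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hedged sketch: each request code maps to a (processor, executor) pair
// registered up front; handling runs on that pair's thread pool.
public class Dispatcher {
    interface RequestProcessor {            // stand-in for NettyRequestProcessor
        String process(String request);
    }

    final Map<Integer, SimpleEntry<RequestProcessor, ExecutorService>> processorTable =
        new ConcurrentHashMap<>();

    void registerProcessor(int code, RequestProcessor processor, ExecutorService executor) {
        processorTable.put(code, new SimpleEntry<>(processor, executor));
    }

    Future<String> dispatch(int code, String request) {
        SimpleEntry<RequestProcessor, ExecutorService> pair = processorTable.get(code);
        // submit to the executor that was registered for this request code
        return pair.getValue().submit(() -> pair.getKey().process(request));
    }
}
```

Registering a processor for code 42 on a single-thread pool and dispatching "hi" to it yields a Future resolving to whatever the processor returns; a missing code would need the defaultRequestProcessor fallback shown in the real method.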
Handling a response command is simpler: the matching ResponseFuture is looked up by opaque and removed from responseTable, then either its preset callback is executed, or its putResponse method is called to deliver the result.
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseFuture.release();
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
} else {
PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
PLOG.warn(cmd.toString());
}
}
Sending requests to other services
There are three kinds of invocation: asynchronous, synchronous, and one-way, each calling the corresponding method of NettyRemotingAbstract. As the analysis above showed, an asynchronous request is identified by its opaque field; a placeholder ResponseFuture is created and stored, and once the receiver has processed the request it sends back a response carrying the same opaque value, which is used to find the matching ResponseFuture and deliver the result or run the preset callback. A synchronous request is really just an asynchronous one in which a CountDownLatch blocks the caller. A one-way request is the simplest: it is sent and its outcome is never examined.
Let's use invokeAsync as an example and walk through the whole flow:
//delegates to NettyRemotingAbstract's invokeAsyncImpl method
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseFuture.putResponse(null);
                    responseTable.remove(opaque);
                    try {
                        executeInvokeCallback(responseFuture);
                    } catch (Throwable e) {
                        PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                    PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        String info = String.format(
            "invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
            timeoutMillis,
            this.semaphoreAsync.getQueueLength(),
            this.semaphoreAsync.availablePermits());
        PLOG.warn(info);
        throw new RemotingTooMuchRequestException(info);
    }
}
If the send succeeds, the listener simply returns; if it fails, the request is removed from responseTable, responseFuture.putResponse(null) is called, and the failure callback is run via executeInvokeCallback(responseFuture). After a successful send, all that remains is to wait for the peer's response command, whose handling was analyzed above and won't be repeated here.
Now for synchronous invocation. After sending the request, the caller invokes ResponseFuture's waitResponse method, which calls CountDownLatch.await. When the request completes, successfully or not, ResponseFuture.putResponse sets the result and opens the latch, turning the asynchronous machinery into a synchronous call:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
PLOG.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
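The sync-over-async trick above can be reproduced as a minimal, self-contained model. This is a sketch, not the real ResponseFuture: the response is a plain String standing in for RemotingCommand, and all the bookkeeping fields are omitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal model of ResponseFuture's blocking behavior: the caller parks on a
// CountDownLatch until another thread delivers the response via putResponse,
// or the timeout elapses (in which case waitResponse returns null).
public class MiniResponseFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String responseCommand; // stand-in for RemotingCommand

    public String waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return responseCommand; // null on timeout, just like the original
    }

    public void putResponse(String response) {
        this.responseCommand = response;
        latch.countDown(); // wake the blocked caller
    }
}
```

A caller thread blocking in waitResponse(3000) is released as soon as an "IO thread" calls putResponse("OK"); if nobody ever calls putResponse, waitResponse returns null after the timeout, which is exactly the case invokeSyncImpl turns into a RemotingTimeoutException.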
NettyRemotingClient follows the same pattern as NettyRemotingServer and is not analyzed separately here.
That's all.