1 Overview
2 Netty Communication Server
3 Netty Communication Client
1 Overview
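Reading the RocketMQ source shows that communication between RocketMQ components is mainly built on Netty. I say "mainly" because some RocketMQ communication uses plain Java NIO instead. For example, HA between the Master Broker and the Slave Broker is implemented with plain Java NIO; see HAService, HAConnection and related classes. I may cover RocketMQ's HA implementation in a separate article later.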
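Below we go straight into how the RocketMQ classes that communicate over Netty work. If you are not familiar with Netty, see my Netty series. The focus is on the server side of RocketMQ's Netty communication. The client is implemented in a similar way and is only described briefly.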
2 Netty Communication Server
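In RocketMQ, Producers and Consumers send requests to the Broker to store and consume messages. Producers, Consumers and Brokers also all contact the Namesrv to register or to look up routing information. The Broker and the Namesrv are the service providers, so both instantiate Netty server components at startup. Here we focus on the NettyRemotingServer held by the BrokerController object.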
RocketMQ's server interface is RemotingServer, and its default implementation is NettyRemotingServer. It is instantiated in BrokerController.initialize, as the source below shows:
//BrokerController
public boolean initialize() throws CloneNotSupportedException {
    // ...
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
    NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
    fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
    this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
    // ...
}
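The source above shows that BrokerController holds two NettyRemotingServer instances, remotingServer and fastRemotingServer. remotingServer listens on the port configured by the listenPort parameter (10911 by default). fastRemotingServer listens on remotingServer's port minus 2, which is 10909 with the default. The difference between these two Netty server instances is explained later, when we look at how NettyRequestProcessor instances are registered.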
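The port-derivation pattern in initialize() follows a simple order. First the server config is cloned, then only the copy's port is shifted. The sketch below reproduces that pattern in plain Java. ServerConfigSketch is a hypothetical stand-in for NettyServerConfig that models only the listen port, and it assumes the default of 10911 quoted above.

```java
// Hypothetical stand-in for NettyServerConfig: only the listen port is modeled.
class ServerConfigSketch implements Cloneable {
    private int listenPort = 10911; // default port quoted in the text

    int getListenPort() {
        return listenPort;
    }

    void setListenPort(int listenPort) {
        this.listenPort = listenPort;
    }

    @Override
    public ServerConfigSketch clone() {
        try {
            // A shallow copy is enough because the only field is a primitive
            return (ServerConfigSketch) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError("Cloneable is implemented", e);
        }
    }
}

class FastServerPortSketch {
    // Mirrors fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2)
    static ServerConfigSketch deriveFastConfig(ServerConfigSketch base) {
        ServerConfigSketch fast = base.clone();
        fast.setListenPort(base.getListenPort() - 2);
        return fast;
    }

    public static void main(String[] args) {
        ServerConfigSketch base = new ServerConfigSketch();
        ServerConfigSketch fast = deriveFastConfig(base);
        System.out.println(base.getListenPort() + " / " + fast.getListenPort()); // 10911 / 10909
    }
}
```

Because the port is changed on the copy, the config used by remotingServer keeps its original port.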
Next, the NettyRemotingServer constructor:
//NettyRemotingServer
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    //First call the parent constructor with the concurrency limits for two
    //request types: oneway (the client sends and expects no response) and
    //asynchronous client calls. Not covered further here
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    //The familiar Netty server bootstrap class
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;
    //Read the thread count of the server callback pool from the config. Note
    //that this pool is unrelated to Netty's own thread pools. After a Netty
    //thread reads a network event, it calls a RocketMQ-defined Processor (not
    //to be confused with a Netty Handler) to run the business logic. That
    //logic is wrapped in a Runnable and submitted to the executor registered
    //together with the Processor. publicExecutor below is the shared fallback
    //for Processors registered without their own executor, and
    //publicThreadNums is its thread count
    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });
    //Create different EventLoopGroups depending on whether epoll is used.
    //One group (boss) accepts connections; the other runs select on the
    //accepted client channels. useEpoll() returns true when the platform is
    //Linux, epoll is enabled in the config and the platform supports epoll
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });
        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });
        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }
    //Loads the SSL/TLS context; not covered in this article
    loadSslContext();
}
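The constructor repeats the same anonymous ThreadFactory pattern several times. Each factory combines an AtomicInteger counter with a fixed name prefix. The same code also falls back to 4 threads when the configured count is not positive. Below is a JDK-only sketch of both ideas. NamingThreadFactory and PublicExecutorSketch are hypothetical helpers for illustration, not RocketMQ classes, and Netty's event loop groups are left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical helper: one ThreadFactory parameterized by a naming function
class NamingThreadFactory implements ThreadFactory {
    private final AtomicInteger threadIndex = new AtomicInteger(0);
    private final IntFunction<String> namer;

    NamingThreadFactory(IntFunction<String> namer) {
        this.namer = namer;
    }

    @Override
    public Thread newThread(Runnable r) {
        // 1-based index, like threadIndex.incrementAndGet() in the original
        return new Thread(r, namer.apply(threadIndex.incrementAndGet()));
    }
}

class PublicExecutorSketch {
    // Same fallback as the constructor: a non-positive setting means 4 threads
    static int resolveThreadNums(int configured) {
        return configured <= 0 ? 4 : configured;
    }

    static ExecutorService newPublicExecutor(int configured) {
        return Executors.newFixedThreadPool(resolveThreadNums(configured),
                new NamingThreadFactory(i -> "NettyServerPublicExecutor_" + i));
    }

    // Selector thread names embed the total thread count as well as the index
    static ThreadFactory selectorFactory(int threadTotal) {
        return new NamingThreadFactory(
                i -> String.format("NettyServerNIOSelector_%d_%d", threadTotal, i));
    }
}
```

Passing the naming rule in as a function lets one factory class cover both the "prefix_N" and the "prefix_total_N" name formats used above.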
That covers the NettyRemotingServer constructor. Next comes its start method, which contains the familiar handler registration among other things:
//NettyRemotingServer
@Override
public void start() {
    //The handlers added to the pipeline below together with
    //defaultEventExecutorGroup run on its threads rather than on the selector
    //(I/O) threads; see the ChannelPipeline and ChannelHandler articles in my
    //Netty series
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });
    ServerBootstrap childHandler =
        //Accept (boss) group and select (worker) group
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            //Socket options
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            //Local port to listen on
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            //Initializer that sets up the pipeline of each accepted client connection
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    //Each client connection gets the following handlers in its pipeline
                    ch.pipeline()
                        //Handles the SSL handshake; removed from the
                        //pipeline once the first handshake completes
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        .addLast(defaultEventExecutorGroup,
                            //Encoder
                            new NettyEncoder(),
                            //Decoder
                            new NettyDecoder(),
                            //Idle channel detection
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            //Connection manager. Its source shows it mainly
                            //handles IDLE events and writes INFO logs in its
                            //active/registered callbacks
                            new NettyConnectManageHandler(),
                            //RocketMQ's main processing logic lives in this
                            //handler, e.g. how client requests are answered
                            new NettyServerHandler()
                        );
                }
            });
    //Whether to use the pooled ByteBuf allocator
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
    //Bind the port and start actually listening for client connections
    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
    //Scheduled task that scans the requests this side has sent and is still
    //waiting on a response for (for a NettyRemotingServer, e.g. requests the
    //Broker sends to connected clients) and handles those that have timed out.
    //First run after 3 seconds, then once per second
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
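The timer task at the end of start() periodically calls scanResponseTable(). Here is a minimal JDK-only sketch of that scan. It assumes a hypothetical PendingRequest per outstanding request, keyed by its opaque and carrying only a start time and a timeout; RocketMQ's real ResponseFuture carries more state. The sketch also swaps java.util.Timer for a ScheduledExecutorService.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, stripped-down stand-in for one outstanding request
class PendingRequest {
    final int opaque;
    final long beginTimestamp;
    final long timeoutMillis;

    PendingRequest(int opaque, long beginTimestamp, long timeoutMillis) {
        this.opaque = opaque;
        this.beginTimestamp = beginTimestamp;
        this.timeoutMillis = timeoutMillis;
    }
}

class ResponseTableSketch {
    // opaque (request id) -> request still waiting for its response
    final Map<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    // Removes and returns every request whose deadline is at or before 'now'.
    // Passing 'now' in keeps the scan deterministic and easy to test.
    List<PendingRequest> scanResponseTable(long now) {
        List<PendingRequest> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest req = it.next().getValue();
            if (req.beginTimestamp + req.timeoutMillis <= now) {
                it.remove(); // supported by ConcurrentHashMap's iterators
                expired.add(req);
            }
        }
        // A real caller would now fail each expired request with a timeout
        return expired;
    }

    // Same cadence as the timer above: first scan after 3 s, then every 1 s.
    // The try/catch matters: an exception escaping a periodic task
    // suppresses all of its later runs.
    ScheduledFuture<?> schedule(ScheduledExecutorService scheduler) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                scanResponseTable(System.currentTimeMillis());
            } catch (Throwable e) {
                System.err.println("scanResponseTable exception: " + e);
            }
        }, 3000, 1000, TimeUnit.MILLISECONDS);
    }
}
```

The try/catch in the original TimerTask serves the same purpose. With java.util.Timer, an uncaught exception would kill the timer thread and stop all later scans.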
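That completes the initialization and startup of RocketMQ's NettyRemotingServer. This article does not analyze the encoding and decoding scheme RocketMQ uses with Netty. Instead, we look at how RocketMQ handles client requests. From the discussion above, this logic lives in NettyServerHandler, the handler registered in each client channel's pipeline. Its source: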
//NettyServerHandler is an inner class of NettyRemotingServer. Its
//channelRead0 method calls processMessageReceived, defined in
//NettyRemotingAbstract, the parent class of NettyRemotingServer
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
NettyRemotingAbstract.processMessageReceived is defined as follows:
//Handles the received message depending on whether it is a request or a
//response. The rest of this article focuses on requests. Handling a response
//is simple: use the id in the message (called opaque in RocketMQ) to find the
//Future of the matching pending local request, set the result on it, and
//unblock the waiting caller
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
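The response branch described in the comment above works in three steps. Find the locally stored future by opaque, complete it, and thereby wake the waiting caller. The sketch below shows that correlation with CompletableFuture. Command and CommandType are hypothetical minimal replacements for RemotingCommand, and the sketch illustrates the idea rather than RocketMQ's ResponseFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, minimal replacements for RemotingCommand and its type flag
enum CommandType { REQUEST_COMMAND, RESPONSE_COMMAND }

class Command {
    final CommandType type;
    final int opaque;
    final String body;

    Command(CommandType type, int opaque, String body) {
        this.type = type;
        this.opaque = opaque;
        this.body = body;
    }
}

class CorrelationSketch {
    // opaque -> future of a request this side sent and is still awaiting
    final Map<Integer, CompletableFuture<Command>> responseTable = new ConcurrentHashMap<>();

    // Sender side: remember the future before the request goes on the wire
    CompletableFuture<Command> track(Command request) {
        CompletableFuture<Command> future = new CompletableFuture<>();
        responseTable.put(request.opaque, future);
        return future;
    }

    // Same switch as processMessageReceived; only the response branch is filled in
    void processMessageReceived(Command cmd) {
        if (cmd == null) {
            return;
        }
        switch (cmd.type) {
            case REQUEST_COMMAND:
                // would be handed to processRequestCommand
                break;
            case RESPONSE_COMMAND: {
                CompletableFuture<Command> future = responseTable.remove(cmd.opaque);
                if (future != null) {
                    future.complete(cmd); // wakes a caller blocked in get()/join()
                }
                // an unknown opaque (e.g. already timed out) is simply ignored here
                break;
            }
            default:
                break;
        }
    }
}
```

Removing the entry when the response arrives means a later timeout scan will no longer see that request.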
Now the key part, the implementation of NettyRemotingAbstract.processRequestCommand:
//NettyRemotingAbstract
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //Look up the registered processor by the request code cmd.getCode()
    //(registration is covered later). If nothing matches, fall back to the
    //default processor, defaultRequestProcessor. The returned Pair combines a
    //processor with an executor: the executor passed when the processor was
    //registered, or NettyRemotingServer's publicExecutor (created in its
    //constructor) when none was passed
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    //Get the request id so it can be copied into the response
    final int opaque = cmd.getOpaque();
    if (pair != null) {
        //Define a Runnable whose run method mainly calls
        //processor.processRequest to handle the request; it is then
        //submitted to the executor held in pair
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());
                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };
        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }
        try {
            //The Runnable defined above is wrapped in a RequestTask and submitted to the executor
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }
            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        //No processor was found for this request code
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
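The control flow of processRequestCommand can be reproduced without Netty. Its steps are: look up the processor by code, fall back to the default, refuse early via rejectRequest, run the processor on its executor, and map failures to busy or error responses. In this sketch, Processor, Registration and the string response codes are hypothetical simplifications of NettyRequestProcessor, Pair and RemotingSysResponseCode. The reply callback stands in for ctx.writeAndFlush.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Hypothetical simplification of NettyRequestProcessor
interface Processor {
    String processRequest(int code, String body) throws Exception;

    // Lets a processor refuse work up front (flow control)
    default boolean rejectRequest() {
        return false;
    }
}

class DispatchSketch {
    // A processor paired with the executor that runs it, like the Pair in processorTable
    static final class Registration {
        final Processor processor;
        final Executor executor;

        Registration(Processor processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Registration> processorTable = new HashMap<>();
    private final Executor publicExecutor;
    private Registration defaultRequestProcessor;

    DispatchSketch(Executor publicExecutor) {
        this.publicExecutor = publicExecutor;
    }

    void registerProcessor(int code, Processor processor, Executor executor) {
        // A null executor falls back to the shared publicExecutor
        processorTable.put(code, new Registration(processor, executor == null ? publicExecutor : executor));
    }

    void registerDefaultProcessor(Processor processor, Executor executor) {
        defaultRequestProcessor = new Registration(processor, executor);
    }

    // 'reply' stands in for ctx.writeAndFlush(response)
    void processRequestCommand(int code, String body, boolean oneway, Consumer<String> reply) {
        Registration matched = processorTable.get(code);
        Registration pair = matched == null ? defaultRequestProcessor : matched;
        if (pair == null) {
            reply.accept("REQUEST_CODE_NOT_SUPPORTED");
            return;
        }
        if (pair.processor.rejectRequest()) {
            reply.accept("SYSTEM_BUSY"); // refused before any work is queued
            return;
        }
        Runnable task = () -> {
            try {
                String response = pair.processor.processRequest(code, body);
                if (!oneway && response != null) {
                    reply.accept(response);
                }
            } catch (Exception e) {
                if (!oneway) {
                    reply.accept("SYSTEM_ERROR");
                }
            }
        };
        try {
            pair.executor.execute(task);
        } catch (RejectedExecutionException e) {
            // the executor refused the task (e.g. its queue is full)
            if (!oneway) {
                reply.accept("SYSTEM_BUSY");
            }
        }
    }
}
```

Passing Runnable::run as the executor makes the dispatch synchronous, which is convenient for trying the sketch out. A real server would pass bounded thread pools so that RejectedExecutionException can actually signal overload.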
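That mostly covers the RocketMQ server side. One server-side question remains: how processors are registered. The source shows that server-side processors are registered in the Controller classes, such as BrokerController and NamesrvController. Here we look at BrokerController. As seen earlier, BrokerController instantiates two NettyRemotingServer instances, remotingServer and fastRemotingServer, in its initialize method. After creating them, initialize calls the registerProcessor method: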
//BrokerController
public void registerProcessor() {
    //When discussing NettyRemotingAbstract.processRequestCommand we saw that
    //the processor is looked up by the request code in the request; this is
    //where a processor is registered for each request code.
    //Not covered in detail: BrokerController registers processors for
    //producer message sending, consumer message pulling, message queries,
    //client management (heartbeat, registration, etc.), consumer management,
    //transactional message handling, and so on
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    //This is where remotingServer and fastRemotingServer differ:
    //fastRemotingServer does not register the processor for client
    //pull-message requests, so it cannot serve them
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    /**
     * QueryMessageProcessor
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    /**
     * ClientManageProcessor
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    /**
     * ConsumerManageProcessor
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    /**
     * EndTransactionProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    //Register the default processor
    /**
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
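The code above shows how remotingServer and fastRemotingServer differ. fastRemotingServer does not register the processor for client pull-message requests, so it cannot serve them. The "fast" in the name presumably refers to this: because it does not handle pull-message requests, it can respond quickly to other requests.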
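The routing difference can be checked with a small model of the two processor tables. The request-code names below echo RequestCode but are plain strings chosen for illustration, and the processor names are only labels. The sketch shows one thing only: a lookup miss on the fast server falls through to the default processor (AdminBrokerProcessor in BrokerController). It does not model what that processor then answers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one server's processorTable plus its default processor
class RoutingTable {
    private final Map<String, String> processorTable = new HashMap<>();
    private final String defaultProcessor;

    RoutingTable(String defaultProcessor) {
        this.defaultProcessor = defaultProcessor;
    }

    void register(String requestCode, String processorName) {
        processorTable.put(requestCode, processorName);
    }

    // Same rule as processRequestCommand: exact match first, then the default
    String route(String requestCode) {
        return processorTable.getOrDefault(requestCode, defaultProcessor);
    }
}

class BrokerRoutingSketch {
    static RoutingTable remotingServer() {
        RoutingTable t = common();
        t.register("PULL_MESSAGE", "PullMessageProcessor"); // only on the normal server
        return t;
    }

    static RoutingTable fastRemotingServer() {
        return common();
    }

    // Registrations shared by both servers (a subset, for illustration)
    private static RoutingTable common() {
        RoutingTable t = new RoutingTable("AdminBrokerProcessor");
        t.register("SEND_MESSAGE", "SendMessageProcessor");
        t.register("QUERY_MESSAGE", "QueryMessageProcessor");
        t.register("HEART_BEAT", "ClientManageProcessor");
        return t;
    }

    public static void main(String[] args) {
        System.out.println(remotingServer().route("PULL_MESSAGE"));     // PullMessageProcessor
        System.out.println(fastRemotingServer().route("PULL_MESSAGE")); // AdminBrokerProcessor
    }
}
```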
3 Netty Communication Client
In RocketMQ, the Netty communication client is implemented by NettyRemotingClient.
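Consumers and producers need to ask the Namesrv for Broker information. They also connect to Brokers to pull and store messages. Therefore the consumer implementations DefaultMQPushConsumer and DefaultMQPullConsumer, and the producer DefaultMQProducer, all hold a Netty client.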
Likewise, the Broker sends registration and other requests to the Namesrv, so it also holds a Netty client.
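With the basics of Netty in mind and the NettyRemotingServer walkthrough above, NettyRemotingClient is easy to understand. Its implementation is essentially the same as NettyRemotingServer's: it configures thread pools, sets up the Netty client Channel, configures handlers, registers processors, and so on. It is not covered in detail here.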