RocketMQ源码阅读(二)-通信模块

简介

在使用了消息队列的通信方之间, 总体的通信架构图如下:

在消息生产者, broker和消息消费者之间都会发生通信, RocketMQ的通信层是基于通信框架netty之上做了简单的协议封装. 本人阅读的RocketMQ版本是4.1.0-incubating-SNAPSHOT, 依赖的netty版本是4.0.36.Final. RocketMQ的代码结构图如下:

大体上分为broker, client, common filtersrv, namesrv和remoting等模块, 通信框架就封装在remoting模块中.

本文从协议格式, 消息编解码, 通信方式(同步, 异步, 单向)和通信流程(详细介绍同步调用流程)等方面介绍RocketMQ的通信模块.

设计要素

对于一个消息队列的RPC网络通信来说,要求并不像服务框架那样苛刻, 满足一下几点即可:

  • 编解码处理(负责通信中的编码和解码, 序列化, 通信协议设计等必要功能)
  • 双向消息处理(包括同步或异步, MQ中有异步消息的功能)
  • 单向消息处理(一般指心跳消息或者注册消息这样的类型)

类图

类层次结构

以RemotingService为最上层接口,提供了三个接口:

void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);

RemotingClient和RemotingServer都继承了RemotingService接口, 并增加了自己特有的接口.
RemotingClient:

void updateNameServerAddressList(final List<String> addrs);

List<String> getNameServerAddressList();

RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;

void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;

void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

boolean isChannelWriteable(final String addr);

NettyRemotingClient和NettyRemotingServer分别实现了RemotingClient和RemotingServer, 并且都继承了NettyRemotingAbstract类. NettyRemotingAbstract这个抽象类包含了很多公共数据处理,也包含了很多重要的数据结构, 这个稍后介绍.
其它还有NettyEvent, NettyEncoder, NettyDecoder和RemotingCommand等一系列通信过程中使用到的类.

协议设计与编码解码

在分析具体的api接口之前, 先介绍一下RocketMQ的通信协议是如何设计的.
具体的通信协议格式如下(重点理解, 能根据通信协议格式来对网络中读取的二进制数据进行编解码):

协议格式

消息共分为四个部分:

  • 1.消息长度(总长度, 四个字节存储, 占用一个int类型)
  • 2.序列化类型&消息头长度(同样占用一个int类型, 第一个字节表示序列化类型, 后面三个字节表示消息头长度)
  • 3.消息头数据
  • 4.消息主体数据

消息编码过程由类RemotingCommand中的encode()方法完成. 代码如下:

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;    //消息总长度

    // 2> header data length
    //将消息头编码成byte[]
    byte[] headerData = this.headerEncode(); 
    //计算头部长度 
    length += headerData.length;              

    // 3> body data length
    if (this.body != null) {
        //消息主体长度
        length += body.length;                
    }
    //分配ByteBuffer, 这边加了4, 
    //这是因为在消息总长度的计算中没有将存储头部长度的4个字节计算在内
    ByteBuffer result = ByteBuffer.allocate(4 + length);  

    // length
    //将消息总长度放入ByteBuffer
    result.putInt(length);   

    // header length
    //将消息头长度放入ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 

    // header data
    //将消息头数据放入ByteBuffer
    result.put(headerData);    

    // body data;
    if (this.body != null) {
        //将消息主体放入ByteBuffer
        result.put(this.body); 
    }
    //重置ByteBuffer的position位置
    result.flip();     

    return result;
}

public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}

相应的,decode的代码如下(也在类RemotingCommand中):

public static RemotingCommand decode(final byte[] array) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(array);
    return decode(byteBuffer);
}

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    //计算消息头长度
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}
public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
}

通信方式和通信流程

接下来看一下RocketMQ的通信方式, RocketMQ支持三种方式的通信:

  • 同步(sync)
  • 异步(async)
  • 单向(oneway)

下面以Remoting模块中的一个单元测试为例, 说明同步调用的通信过程.

首先看一下同步调用的整体流程(客户端):


同步调用客户端流程图

下面详细分析流程图中涉及的源代码.
先由RemotingClient的实现类NettyRemotingClient构造请求(一个RemotingCommand实例), 然后根据addr获取相应的channel, 调用invokeSyncImpl方法, 将数据流转给抽象类NettyRemotingAbstract处理. 那么NettyRemotingClient是如何启动的呢, 示例代码如下:

//类:org.apache.rocketmq.remoting.RemotingServerTest
public static RemotingClient createRemotingClient() {
    NettyClientConfig config = new NettyClientConfig();
    RemotingClient client = new NettyRemotingClient(config);
    client.start();
    return client;
}

先实例化RemotingClient, 其构造函数如下:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
    this(nettyClientConfig, null);
}

public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
    final ChannelEventListener channelEventListener) {
    //调用父类的构造函数, 主要是设置单向调用和异步调用两种模式下的最大并发数
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    this.nettyClientConfig = nettyClientConfig;
    //NettyEventExecuter处理线程会不断从eventQueue中读取消息, 调用注册的ChannelEventListener进行处理
    this.channelEventListener = channelEventListener;

    //执行用户回调函数的线程数
    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
    //执行用户回调函数的线程池
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    //netty eventLoopGroupWorker
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });
}

接着启动NettyRemotingClient, 代码如下:

public void start() {
    //构建一个DefaultEventExecutorGroup, 用于处理netty handler中的操作
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
        nettyClientConfig.getClientWorkerThreads(), //
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    //初始化netty, 对netty的用法不做介绍
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(
                    defaultEventExecutorGroup,
                    //编码handler
                    new NettyEncoder(),
                    //解码handler
                    new NettyDecoder(),
                    //心跳检测
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    //连接管理handler,处理connect, disconnect, close等事件
                    new NettyConnectManageHandler(),
                    //处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
                    new NettyClientHandler());
            }
        });

    //定时扫描responseTable,获取返回结果,并且处理超时
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Exception e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecuter.start();
    }
}

接下来看消息同步调用的逻辑.
同步调用的单元测试:

public void testInvokeSync() throws InterruptedException, RemotingConnectException,
    RemotingSendRequestException, RemotingTimeoutException {
    //消息头
    RequestHeader requestHeader = new RequestHeader();
    requestHeader.setCount(1);
    requestHeader.setMessageTitle("Welcome");
    //构建请求
    RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
    //同步发送消息
    RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
    assertTrue(response != null);
    assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
    assertThat(response.getExtFields()).hasSize(2);
}

class RequestHeader implements CommandCustomHeader {
    @CFNullable
    private Integer count;

    @CFNullable
    private String messageTitle;

    @Override
    public void checkFields() throws RemotingCommandException {
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public String getMessageTitle() {
        return messageTitle;
    }

    public void setMessageTitle(String messageTitle) {
        this.messageTitle = messageTitle;
    }
}
public interface CommandCustomHeader {
    void checkFields() throws RemotingCommandException;
}

首先构建消息头, 然后调用RemotingCommand.createRequestCommand创建一个request(一个RemotingCommand实例),然后调用remotingClient.invokeSync发送请求.

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    //根据addr获得channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            //RocketMQ允许用户定义rpc hook,可在发送请求前,或者接受响应后执行
            if (this.rpcHook != null) {
                this.rpcHook.doBeforeRequest(addr, request);
            }
            //将数据流转给抽象类NettyRemotingAbstract
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
            //rpc hook
            if (this.rpcHook != null) {
                this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            }
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

可以看到, 真正发送请求的是invokeSyncImpl方法, 该方法定义在类NettyRemotingAbstract中, 代码如下:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        //相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1
        final int opaque = request.getOpaque();

        try {
            //根据request ID构建ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            //将ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            //刷出数据
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                //消息发送后执行
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            //等待服务器端响应结果
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

从代码中可以看到, 即使是同步调用模式, 在RocketMQ内部依然是采用异步的方式完成. 客户端的流程大体就是如此, 下面介绍服务器端接收到请求后的处理流程.
先看流程图:


服务器端消息处理流程图

先由Netty接收消息, 接着由handler将数据流转给NettyRemotingServer处理. 先看如何初始化一个RemotingServer. 示例代码如下:

public static RemotingServer createRemotingServer() throws InterruptedException {
    NettyServerConfig config = new NettyServerConfig();
    //初始化RemotingServer, 此处的逻辑与RemotingClient大体相当
    RemotingServer remotingServer = new NettyRemotingServer(config);
    //注册一个处理器,根据requestCode, 获取处理器,处理请求
    remotingServer.registerProcessor(0, new NettyRequestProcessor() {
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
            request.setRemark("Hi " + ctx.channel().remoteAddress());
            return request;
        }

        @Override
        public boolean rejectRequest() {
            return false;
        }
    }, Executors.newCachedThreadPool());
    //启动RemotingServer
    remotingServer.start();

    return remotingServer;
}

首先实例化一个NettyRemotingServer对象, 此逻辑与NettyRemotingClient大致相当. 接着, 在remotingServer启动之前注册一个processor用于处理对应requestcode的处理器, 示例中用的是0, 这与remotingClient示例中的code是对应的(RemotingCommand.createRequestCommand(0, requestHeader)), 当remotingServer收到code=0的请求时,会使用这个processor去处理请求. 接着
启动RemotingServer, 这个过程大致就是一个启动netty ServerBootstrap的过程, 代码如下:

public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        new NettyConnetManageHandler(),
                        new NettyServerHandler());
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecuter.start();
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Exception e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

整个启动过程与RemotingClient类似, 使用的handler也类似, 区别在于RemotingServerz. 服务器端以后添加的handler是NettyServerHandler(客户端用的是NettyClientHandler). 服务器端接收到请求后, 对消息的处理逻辑在NettyRemotingAbstract中(此处省略了一大波netty框架接收到消息后的数据流转过程), 如下:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //根据RemotingCommand中的code获取processor和ExecutorService
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //rpc hook
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    //processor处理请求
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //rpc hook
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //封装requestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            //想线程池提交requestTask
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        //构建response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

服务器端的流程大体如此.
将客户端和服务器端联合起来的流程图如下:

同步调用整体流程图

异步调用和单项调用的原理与同步调用大致相当, 此处不再重复介绍.

其他部分的处理过程

前文中讲到, 每次有消息需要发送, 就会生成resposneFuture用于接收消息回应, 但是如果始终没有收到回应, Map(scanResponseTable)中的responseFuture就会堆积.
这个时候就需要一个线程来专门做Map的清理回收, 即前文提到的定时扫描responseTable的任务, 这个线程会1s调用一次来检查所有的responseFuture, 判断是否有效, 是否已经得到返回, 并进行相应的处理. 代码如下:

//类NettyRemotingAbstract
public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();

        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            PLOG.warn("remove timeout request, " + rep);
        }
    }

    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            PLOG.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

总结

消息队列的网络通信模块总的来说并不复杂, 比较关键的几个部分就是协议格式的设计, 维护request ID和responseFuture的对应关系, 超时处理等几个方面.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,817评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,329评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,354评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,498评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,600评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,829评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,979评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,722评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,189评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,519评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,654评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,940评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,762评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,993评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,382评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,543评论 2 349

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,787评论 25 707
  • 【日精进:小煜妈打卡第173天】-20171125 今天和小叔小姑等两三家人一起回老家,一来是“常回家看看...
    b79ddab78458阅读 40评论 0 0
  • 悠悠孤月轮,寥寥星河浪。 日日守如一,千年犹在傍。 今夜短相逢,喜鹊连桥唱。 明朝久离别,唯愿人无恙。 若无两心知...
    君怀璧阅读 954评论 37 52
  • 听陕北当地居民说,他们修窑不打地基,直接就在原土上砌,顶多打个灰土已经算好的了,不会存在坍塌问题。 于是,我们就照...
    刘冰杰阅读 2,366评论 0 0