Dubbo 服务引用过程(二)



    if (enabled) {
        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker,这里的url对应的地址和接口否是provider信息,parameters是providerUrl和consumerUrl的汇总信息
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        return invoker;
     * 此代码负责NIO框架Client创建及初始化,和提供者初始化相似,消费者的
     * 初始化也基本上围绕着Client、Handler、Channel对象的创建与不断装饰
     * 的过程,不同的是消费者底层是与提供者Server建立连接,此过程在remote
     * 层下完成,remote层可分为消息交换层与网路传输层即Exchanger与
     * Transporter层,还有,我们在说提供者初始化时,说过同个JVM中相同协议
     * 的服务共享一个Server,同样在消费者初始化时,引用同一个提供者的所有
     * 服务可以共享一个Client进行通信,这也就实现了Server-Client在同一个
     * 通道中进行通信,实现长连接的高效通信,但是在服务请求数据量比较大时
     * 或请求数比较多时,可以设置每服务每连接或每服务多连接可以提高通信效
     * 率,具体是通过消费者方connections=2设置连接数。所有消费者端Client
     * 有两种,一种是共享型Client,一种是创建型Client,当然共享型Client
     * 属于创建型Client一部分,下面具体说说这两种Client创建的细节,也是服
     * 务引用的重要细节
    private ExchangeClient[] getClients(URL url){
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        if (connections == 0){
            service_share_connect = true;
            connections = 1;
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect){ //如果是共享连接的话(客户端只创建一个Client)
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url); //这个初始化Client很重要
        return clients;
     * 获取共享连接
     * 此为创建共享型Client,共享型Client是指消费者引用同一
     * 提供者的服务时,使用同一个Client来提高通信效率
    private ExchangeClient getSharedClient(URL url){
        String key = url.getAddress(); //消费者的地址
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if ( client != null ){
            if ( !client.isClosed()){
                return client;
            } else {
        ExchangeClient exchagneclient = initClient(url);
        client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
        referenceClientMap.put(key, client);
        return client; 
     * 创建客户端新连接.
    private ExchangeClient initClient(URL url) {
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
        boolean compatible = (version != null && version.startsWith("1.0."));
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        // BIO存在严重性能问题,暂时不允许使用
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        ExchangeClient client ;
        try {
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
                client = new LazyConnectExchangeClient(url ,requestHandler);
            } else {
                client = Exchangers.connect(url ,requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url
                    + "): " + e.getMessage(), e);
        return client;

上面提到过在包装的过程中,会初始化与服务端的连接,这一步是在NettyClient中完成的,实际上client = Exchangers.connect(url ,requestHandler);最后返回的实际类型也就是nettyClient,在NettyClient的构造函数中直接完成了与provider链接的建立,具体如下:

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
        //wrapChannelHandler(url, handler)操作即返回了后面提到的MultiMessageHandler
        super(url, wrapChannelHandler(url, handler));
    //super(url, wrapChannelHandler(url, handler))
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    protected void doOpen() throws Throwable {
        bootstrap = new ClientBootstrap(channelFactory);
        // 设置全局的链接配置
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
    // 初始化或者重新覆盖channel
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
            if (ret && future.isSuccess()) {
                Channel newChannel = future.getChannel();
                try {
                    // 关闭旧的连接,因为一个NettyClient就对应一个链接,所以当创建新的链接的时候就代表要移除旧的链接
                    Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        } finally {
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        } finally {
                            NettyClient.this.channel = null;
                    } else {
                        NettyClient.this.channel = newChannel;
            } else if (future.getCause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
            } else {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            if (! isConnected()) {


  • requestHandler
  • HeaderExchangeHandler 包装了requestHandler
  • DecodeHandler 包装了HeaderExchangeHandler
  • AllChannelHandler 包装了DecodeHandler
  • HeartbeatHandler 包装了AllChannelHandler
  • MultiMessageHandler 包装了HeartbeatHandler


    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage)message;
            for(Object obj : list) {
                handler.received(channel, obj);
        } else {
            handler.received(channel, message);
    public void received(Channel channel, Object message) throws RemotingException {
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            // oneWay代表不需要返回数据的请求,twoWay代表需要返回数据的请求
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if(logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                        + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                        + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                    new StringBuilder(32)
                        .append("Receive heartbeat response in thread ")
        handler.received(channel, message);
    //DecodeHandler 处理编码和解码的逻辑
    //HeaderExchangeHandler 主要处理请求的接收,回调(此时不会再对requestHandler进行调用)


    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    public Result invoke(Invocation inv) throws RpcException {
        if(destroyed) {
            throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() 
                                            + " use dubbo version " + Version.getVersion()
                                            + " is DESTROYED, can not be invoked any more!");
        RpcInvocation invocation = (RpcInvocation) inv;
        if (attachment != null && attachment.size() > 0) {
        Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return new RpcResult(e);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                return new RpcResult(te);
        } catch (RpcException e) {
            if (e.isBiz()) {
                return new RpcResult(e);
            } else {
                throw e;
        } catch (Throwable e) {
            return new RpcResult(e);

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {//同步非单工
                return (Result) currentClient.request(inv, timeout).get();
        // 捕捉异常的步骤忽略掉
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        Request req = new Request();
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        }catch (RemotingException e) {
            throw e;
        return future;
    public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 放到全局的future容器中,以后方便根据ID寻找对应的feature
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();


