Dubbo 服务引用过程(二)

在上一章中讲到了dubbo引用过程的第一步,在zk上注册和订阅。本章将接着上一章继续讲解之后的服务引用的步骤:

记得在上一章中我们看到了这段代码:

    if (enabled) {
        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
    }
    //InvokerDelegete中包装的Invoker实际上是DubboInvoker
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker,这里的url对应的地址和接口否是provider信息,parameters是providerUrl和consumerUrl的汇总信息
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
    
    /**
     * 此代码负责NIO框架Client创建及初始化,和提供者初始化相似,消费者的
     * 初始化也基本上围绕着Client、Handler、Channel对象的创建与不断装饰
     * 的过程,不同的是消费者底层是与提供者Server建立连接,此过程在remote
     * 层下完成,remote层可分为消息交换层与网路传输层即Exchanger与
     * Transporter层,还有,我们在说提供者初始化时,说过同个JVM中相同协议
     * 的服务共享一个Server,同样在消费者初始化时,引用同一个提供者的所有
     * 服务可以共享一个Client进行通信,这也就实现了Server-Client在同一个
     * 通道中进行通信,实现长连接的高效通信,但是在服务请求数据量比较大时
     * 或请求数比较多时,可以设置每服务每连接或每服务多连接可以提高通信效
     * 率,具体是通过消费者方connections=2设置连接数。所有消费者端Client
     * 有两种,一种是共享型Client,一种是创建型Client,当然共享型Client
     * 属于创建型Client一部分,下面具体说说这两种Client创建的细节,也是服
     * 务引用的重要细节
     */
    private ExchangeClient[] getClients(URL url){
        //是否共享连接
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        //如果connections不配置,则共享连接,否则每服务每连接
        if (connections == 0){
            service_share_connect = true;
            connections = 1;
        }
        //有多少个连接就创建多少个Client
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect){ //如果是共享连接的话(客户端只创建一个Client)
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url); //这个初始化Client很重要
            }
        }
        return clients;
    }
    
    /**
     * 获取共享连接
     * 此为创建共享型Client,共享型Client是指消费者引用同一
     * 提供者的服务时,使用同一个Client来提高通信效率
     */
    private ExchangeClient getSharedClient(URL url){
        String key = url.getAddress(); //消费者的地址
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if ( client != null ){
            if ( !client.isClosed()){
                //如果这个共享client不等于null并且这个client没有关闭的话
                //将client内部的引用计数加1
                client.incrementAndGetCount();
                return client;
            } else {
                //虽然这个client不为空,client确实关闭状态的
                //于是要移除这个被关闭的client
                referenceClientMap.remove(key);
            }
        }
        //这个client内部已经建立了与服务端的特定端口号的连接
        ExchangeClient exchagneclient = initClient(url);
        //将client设置为幽灵Client,作用目前还不明确
        client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        return client; 
    }
    
    /**
     * 创建客户端新连接.
     */
    private ExchangeClient initClient(URL url) {
        //Client类型设置
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
        boolean compatible = (version != null && version.startsWith("1.0."));
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        // BIO存在严重性能问题,暂时不允许使用
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }
        ExchangeClient client ;
        try {
            //延迟加载
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
                client = new LazyConnectExchangeClient(url ,requestHandler);
            } else {
                //在这里我们又看到这个Exchangers的门面了,这个要十分注意,因为在这之前讲解服务启动的时候有讲过这个,这个跟上次说的门面十分类似。
                client = Exchangers.connect(url ,requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url
                    + "): " + e.getMessage(), e);
        }
        return client;
    }

上面提到过在包装的过程中,会初始化与服务端的连接,这一步是在NettyClient中完成的,实际上client = Exchangers.connect(url ,requestHandler);最后返回的实际类型也就是nettyClient,在NettyClient的构造函数中直接完成了与provider链接的建立,具体如下:

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
        //wrapChannelHandler(url, handler)操作即返回了后面提到的MultiMessageHandler
        super(url, wrapChannelHandler(url, handler));
    }
    
    //super(url, wrapChannelHandler(url, handler))
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        //见面我简化了代码,只写了核心的逻辑
        super(url, handler);
        doOpen();
        connect();
    }
    
    //doOpen和connect都是调用子类NettyClient的方法
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // 设置全局的链接配置
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        //nettyHandler持有了NettyClient,NettyClient持有了上面说的多层Handler,最终还是委托到持有的Handler来处理
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }
    
    
    // 初始化或者重新覆盖channel
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        //直接建立与provider的链接,建立链接之后就可以调用channel.write发送信息到provider
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try{
            //链接建立的超时时间默认为3s
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
            
            if (ret && future.isSuccess()) {
                Channel newChannel = future.getChannel();
                newChannel.setInterestOps(Channel.OP_READ_WRITE);
                try {
                    // 关闭旧的连接,因为一个NettyClient就对应一个链接,所以当创建新的链接的时候就代表要移除旧的链接
                    Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                //完成链接替换
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        NettyClient.this.channel = newChannel;
                    }
                }
            } else if (future.getCause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
            } else {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            }
        }finally{
            if (! isConnected()) {
                future.cancel();
            }
        }
    }

上面的过程就是建立链接的过程了,但是在这个过程中有个重要的地方就是多层Handler的包装,因为装饰者模式的层级比较多,所以我们还是采用一贯的措施:层层分解。
包装的层级依次是:

  • requestHandler
  • HeaderExchangeHandler 包装了requestHandler
  • DecodeHandler 包装了HeaderExchangeHandler
  • AllChannelHandler 包装了DecodeHandler
  • HeartbeatHandler 包装了AllChannelHandler
  • MultiMessageHandler 包装了HeartbeatHandler

最终调用分顺序是又下至上开始调用的。

    //MultiMessageHandler
    public void received(Channel channel, Object message) throws RemotingException {
        //dubbo可以将多个返回信息包装到一个信息里去返回,这时候就就用到MultiMessageHandler了。这么做的目的的是节省传输量
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage)message;
            for(Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
    
    //HeartbeatHandler,判断收到的信息类型是否是心跳类型,如果是心跳类型的话就不会触发后面的handler,直接在此结束请求。
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            // oneWay代表不需要返回数据的请求,twoWay代表需要返回数据的请求
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(Response.HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if(logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                        + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                        + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            //直接结束请求
            return;
        }
        //客户端只要知道服务端活这就可以了,不需要其他操作
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                    new StringBuilder(32)
                        .append("Receive heartbeat response in thread ")
                        .append(Thread.currentThread().getName())
                        .toString());
            }
            return;
        }
        handler.received(channel, message);
    }
    
    //AllChannelHandler就是将所有的逻辑都通过线程池来调度,其余的就是直接调用内部handler的received方法
    
    //DecodeHandler 处理编码和解码的逻辑
    
    //HeaderExchangeHandler 主要处理请求的接收,回调(此时不会再对requestHandler进行调用)
    
    

在这里我们只是粗略的了解到这些Handler的层级以及各自的关系,但其实在发起请求调用的时候首先进行的是Invoker的invoke调用,这些Handler只是处理请求结果的一些过程逻辑,也就是consumer端收到provider的返回结果之后的处理逻辑。
下面我们要看一下作为consumer是如何把请求发送出去的:
因为我们看到的接口类其实都是代理,所以真正的请求开始也是从代理开始的。

    //创建代理的时候指定了处理的InvokerInvocationHandler
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    
    //InvocationHandler的具体实现,在调用toString,hashCode和equals的时候会直接进行调用,不再产生远程的请求
    public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        //invoke返回的结果是Result类型,recreate表示如果有结果的话就返回真正的调用结果,有异常的话就返回异常信息
        //我们在外面看到的Invocation信息都源自这里,主要包含了方法和参数信息
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
    //抛开集群和mock内容来看的话这里的Invoker可以理解为DubboInvoker,所以我们来看看DubboInvoker的invoke操作:
    //首先是进入到AbstractInvoker的invoke操作:
    public Result invoke(Invocation inv) throws RpcException {
        if(destroyed) {
            throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() 
                                            + " use dubbo version " + Version.getVersion()
                                            + " is DESTROYED, can not be invoked any more!");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        //这里的this就是DubboInvoker本身
        invocation.setInvoker(this);
        //设置attachment参数,设置完之后就可以在服务端取到具体的参数值,很方便服务端和客户端进行通信
        if (attachment != null && attachment.size() > 0) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
            invocation.addAttachmentsIfAbsent(context);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        //如果是异步的话需要设置一下InvocationId,方便在回调的时候找到对应的数据
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        
        
        try {
            return doInvoke(invocation);
            //将异常信息包装到RpcResult中,至于怎么处理这些异常信息,要看后面的逻辑了,原则上是RpcResult包含了调用的所有返回信息,包括异常信息
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return new RpcResult(e);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return new RpcResult(te);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return new RpcResult(e);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }
}

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        
        ExchangeClient currentClient;
        //同一个客户端只跟一个服务端建立一个长链接,也可能建立多个,如果是多个长链接的话就默认随机取一个
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            //是否是异步
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            //是否是单工
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            //oneWay形式最简单,发出消息之后就不用管其他的事情了
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            //异步调用的话在发出请求之后后面还要取到结果,后面有机会可以讲一下具体如何取
            //本质上无论同步还是异步,在服务端都是异步的,调用方在调用发出之后就收到ResponseFuture对象,只不过异步的操作方式不会马上调用get方法获取结果,同步的方法会立马调用get获取数据
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {//同步非单工
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        }
        // 捕捉异常的步骤忽略掉
    }
    
    //上面的request就是调用到了HeaderExchangeChannel.request
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        //创建request并设置req的属性,在生成的时候已经在req的内部创建了一个独有的ID
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        //request=RpcInvocation
        req.setData(request);
        //创建默认的Future
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            //继续调用channel的send,这里的channel其实就是NettyClient了,客户端的调用到这里其实已经结束了。接下来就是等待返回结果
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
    //feature的ID非常重要,和request,response的ID保持一致,方便从返回的结果Response中找到对应的future
    public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 放到全局的future容器中,以后方便根据ID寻找对应的feature
        FUTURES.put(id, this);
        //道理同上
        CHANNELS.put(id, channel);
    }
    
    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();


基本上到这里客户端调用的逻辑都讲到了,涉及到一些细节方面还没有进行梳理,但是大致上我们已经对于服务的调用有了一个轮廓性的认识,后面会逐篇展开其他细节的内容,希望在所有Dubbo内容都解释完的时候能有一个回归性的总结,把一个调用从开始到结束经过的所有过程都讲一下。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容