RPC之美团pigeon源码分析(三)调用方服务监听和调用

在此之前我们理清了pigeon服务方的初始化、注册和消息处理逻辑,本篇我们来看看pigeon调用方的实现。

第一部分我们先看看服务调用的实现。
服务调用示例:

@RestController
@RequestMapping("/common")
public class CommonController {

    @Autowired
    private CommonService commonService;

    @RequestMapping(value = "/hello")
    @ResponseBody
    public String hello(@RequestParam("name") String name) {
        System.out.println("enter hello");
        return commonService.hello(name);
    }
}

CommonService 就是服务方发布的服务接口,可以看到在调用方只需要引入相应服务的api jar包,就可以像调用本地方法一样调用对应的服务接口,这也是大部分RPC框架的实现效果。
CommonService 通过@Autowired注解在spring容器中找到对应的bean,我们来看看相应的bean配置

    <bean id="commonService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init">
        <!-- 服务全局唯一的标识url,默认是服务接口类名,必须设置 -->
        <property name="url" value="http://service.dianping.com/rpcserver/commonService_1.0.0" />
        <!-- 接口名称,必须设置 -->
        <property name="interfaceName" value="com.study.rpcserver.api.CommonService" />
        <!-- 超时时间,毫秒,默认5000,建议自己设置 -->
        <property name="timeout" value="2000" />
        <!-- 序列化,hessian/fst/protostuff,默认hessian,可不设置-->
        <property name="serialize" value="hessian" />
        <!-- 调用方式,sync/future/callback/oneway,默认sync,可不设置 -->
        <property name="callType" value="sync" />
        <!-- 失败策略,快速失败failfast/失败转移failover/失败忽略failsafe/并发取最快返回forking,默认failfast,可不设置 -->
        <property name="cluster" value="failfast" />
        <!-- 是否超时重试,默认false,可不设置 -->
        <property name="timeoutRetry" value="false" />
        <!-- 重试次数,默认1,可不设置 -->
        <property name="retries" value="1" />
    </bean>

ReferenceBean继承了spring的FactoryBean接口,来处理复杂bean的生成,通过getObject()方法来返回对应bean实例。接下来我们就以ReferenceBean为入口来切入pigeon调用方的实现思路。

    public void init() throws Exception {
        if (StringUtils.isBlank(interfaceName)) {
            throw new IllegalArgumentException("invalid interface:" + interfaceName);
        }
        this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());
        //服务调用相关的配置信息,就是我们对每一个接口服务在xml文件中的配置
        InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,
                this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster,
                this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
        invokerConfig.setClassLoader(classLoader);
        invokerConfig.setSecret(secret);
        invokerConfig.setRegionPolicy(regionPolicy);

        if (!CollectionUtils.isEmpty(methods)) {
            Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();
            invokerConfig.setMethods(methodMap);
            for (InvokerMethodConfig method : methods) {
                methodMap.put(method.getName(), method);
            }
        }

        checkMock(); // 降级配置检查
        invokerConfig.setMock(mock);
        checkRemoteAppkey();
        invokerConfig.setRemoteAppKey(remoteAppKey);
        //生成接口的代理对象
        this.obj = ServiceFactory.getService(invokerConfig);
        configLoadBalance(invokerConfig);
    }
    //FactoryBean返回的bean实例
    public Object getObject() {
        return this.obj;
    }

ServiceFactory.getService(invokerConfig);根据配置的interfaceName生成一个java代理对象

    private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();

    public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
        return serviceProxy.getProxy(invokerConfig);
    }

跟踪代码,进入AbstractServiceProxy.getProxy方法,核心代码如下:

    protected final static Map<InvokerConfig<?>, Object> services = new ConcurrentHashMap<InvokerConfig<?>, Object>();
    @Override
    public <T> T getProxy(InvokerConfig<T> invokerConfig) {
        //InvokerConfig实现了自定义equals和hashCode方法
        service = services.get(invokerConfig);
        if (service == null) {
            synchronized (interner.intern(invokerConfig)) {
                service = services.get(invokerConfig);
                if (service == null) {
                    //此处执行调用方的一些初始化逻辑,包括InvokerProcessHandlerFactory.init();初始化调用方Filter责任链等
                    InvokerBootStrap.startup();
                    //生成代理对象
                    service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
                    try {
                        //获取服务信息,创建Client实例
                        ClientManager.getInstance().registerClients(invokerConfig);
                    } catch (Throwable t) {
                        logger.warn("error while trying to setup service client:" + invokerConfig, t);
                    }
                    services.put(invokerConfig, service);
                }
        }
        return (T) service;
    }

AbstractSerializer.proxyRequest使用我们熟悉的JDK动态代理来生成服务接口的代理对象

    @Override
    public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
        return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
                new Class[] { invokerConfig.getServiceInterface() }, new ServiceInvocationProxy(invokerConfig,
                        InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
    }
        //InvokerProcessHandlerFactory.selectInvocationHandler获取调用方请求责任链
        public static void init() {
        if (!isInitialized) {
            if (Constants.MONITOR_ENABLE) {
                registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
            }
            registerBizProcessFilter(new TraceFilter());
            registerBizProcessFilter(new DegradationFilter());
                        //关于ClusterInvokeFilter后文详细介绍
            registerBizProcessFilter(new ClusterInvokeFilter());
            registerBizProcessFilter(new GatewayInvokeFilter());
            registerBizProcessFilter(new ContextPrepareInvokeFilter());
            registerBizProcessFilter(new SecurityFilter());
                        //远程调用
            registerBizProcessFilter(new RemoteCallInvokeFilter());
            bizInvocationHandler = createInvocationHandler(bizProcessFilters);
            isInitialized = true;
        }
    }

    public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
        return bizInvocationHandler;
    }

ServiceInvocationProxy继承了java.lang.reflect.InvocationHandler接口,invoke实现逻辑如下:

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
                //代理对象的非服务方法调用走原有逻辑
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(handler, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return handler.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return handler.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return handler.equals(args[0]);
        }
                //服务接口执行责任链处理逻辑
        return extractResult(handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)),
                method.getReturnType());
    }

同服务端责任链的分析一样,我们首先重点看下RemoteCallInvokeFilter的处理逻辑,核心代码如下:

    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        Client client = invocationContext.getClient();
        InvocationRequest request = invocationContext.getRequest();
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();    
        。。。
        //以同步调用场景分析下远程调用逻辑
        CallbackFuture future = new CallbackFuture();
        response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
        invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
        if (response == null) {
            response = future.getResponse(request.getTimeout());
        }        
        return response;
    }    

    public static InvocationResponse sendRequest(Client client, InvocationRequest request, Callback callback) {
        InvocationResponse response = response = client.write(request);
        return response;
    }

client.write(request);最终调用NettyClient或HttpInvokerClient的doWrite方法发送请求消息体。
至此我们理清了服务调用的逻辑,简单来说就是通过JDK动态代理来生成服务方接口对应的实例对象,在方法执行逻辑中调用远程服务。

但对于每一个服务接口,调用方是如何知道远程服务的访问地址的呢?以及新注册或者下线的服务地址,调用方如何得到即时通知?
接下来进入本篇第二部分,远程调用Client的初始化和调用方对服务信息的心跳监听。
以请求责任链的ClusterInvokeFilter为入口:

    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
                //失败策略cluster可配,默认为快速失败failfast
        Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
        if (cluster == null) {
            throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
        }
        return cluster.invoke(handler, invocationContext);
    }

跟踪代码进入FailfastCluster.invoke方法,核心代码如下:

    private ClientManager clientManager = ClientManager.getInstance();
    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        //构造请求消息对象
        InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
        //是否超时重试
        boolean timeoutRetry = invokerConfig.isTimeoutRetry();
        //重试次数
        int retry = invokerConfig.getRetries(invocationContext.getMethodName());
        //关于重试和重试次数的逻辑在此不做过多说明,只摘取主干代码
        //获取远程客户端
        Client remoteClient = clientManager.getClient(invokerConfig, request, null);
        //就是在这里设置的RemoteCallInvokeFilter中用到的客户端Client
        invocationContext.setClient(remoteClient);   
        try {
            //向后执行责任链
            return handler.handle(invocationContext);
        } catch (NetworkException e) {
            remoteClient = clientManager.getClient(invokerConfig, request, null);
            invocationContext.setClient(remoteClient);
            return handler.handle(invocationContext);
        }     
    }

ClientManager 为单例模式,我们看看内部实现

        //私有构造函数 
    private ClientManager() {
        this.providerAvailableListener = new ProviderAvailableListener();
        this.clusterListener = new DefaultClusterListener(providerAvailableListener);
        this.clusterListenerManager.addListener(this.clusterListener);
        providerAvailableThreadPool.execute(this.providerAvailableListener);
        RegistryEventListener.addListener(providerChangeListener);
        RegistryEventListener.addListener(registryConnectionListener);
        RegistryEventListener.addListener(groupChangeListener);
        registerThreadPool.getExecutor().allowCoreThreadTimeOut(true);
    }

    private RouteManager routerManager = DefaultRouteManager.INSTANCE;

    public Client getClient(InvokerConfig<?> invokerConfig, InvocationRequest request, List<Client> excludeClients) {
                //根据全局唯一标识url获取Client集合
        List<Client> clientList = clusterListener.getClientList(invokerConfig);
        List<Client> clientsToRoute = new ArrayList<Client>(clientList);
        if (excludeClients != null) {
            clientsToRoute.removeAll(excludeClients);
        }
                //根据负载均衡策略选取有效的Client
                //此处细节比较多,感兴趣的朋友可以自行细致浏览下源码,限于篇幅不一一讲解了
        return routerManager.route(clientsToRoute, invokerConfig, request);
    }

距离目标越来越近了,我们继续跟踪代码DefaultClusterListener的实现

    private ConcurrentHashMap<String, List<Client>> serviceClients = new ConcurrentHashMap<String, List<Client>>();

    public List<Client> getClientList(InvokerConfig<?> invokerConfig) {
        //根据url获取对应的Client集合
        List<Client> clientList = this.serviceClients.get(invokerConfig.getUrl());
        return clientList;
    }

问题来了,serviceClients是在什么时候创建的Client实例呢?
我们回顾下AbstractServiceProxy.getProxy中的一段逻辑:

                    try {
                        ClientManager.getInstance().registerClients(invokerConfig);
                    } catch (Throwable t) {
                        logger.warn("error while trying to setup service client:" + invokerConfig, t);
                    }

从异常信息我们可以清晰的看到,这里就是创建service client的入口,最终调用到DefaultClusterListener.addConnect添加Client映射关系到serviceClients。调用链路比较长,在此简单贴一下线程调用栈:


image.png

至此我们理清了Client的创建,接下来我们看看调用方的心跳监听。
我们直接连接注册中心zookeeper的相关类CuratorClient,用的是curator-framework-2.7.1.jar,这个ZK客户端功能很强大,可以非常方便的对具体的zk节点添加listener回调。

    private boolean newCuratorClient() throws InterruptedException {
                //根据zk地址创建zkClient
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address)
                .sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                .retryPolicy(new MyRetryPolicy(retries, retryInterval)).build();
                //监听连接状态,掉线重连
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                logger.info("zookeeper state changed to " + newState);
                if (newState == ConnectionState.RECONNECTED) {
                    RegistryEventListener.connectionReconnected();
                }
                monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), "");
            }
        });
                //监听change事件!!!
        client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);
        client.start();
        boolean isConnected = client.getZookeeperClient().blockUntilConnectedOrTimedOut();
        CuratorFramework oldClient = this.client;
        this.client = client;
        close(oldClient);
        return isConnected;
    }

CuratorEventListener继承org.apache.curator.framework.api.CuratorListener,看下事件处理逻辑

    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
        WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent());
                //过滤不敢兴趣的EventType
        if (event == null
                || (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged
                        && event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) {
            return;
        }
        try {
                        //解析节点路径并分类
            PathInfo pathInfo = parsePath(event.getPath());
                        
            if (pathInfo.type == ADDRESS) {//服务地址  
                addressChanged(pathInfo);
            } else if (pathInfo.type == WEIGHT) {//权重
                weightChanged(pathInfo);
            } else if (pathInfo.type == APP) {
                appChanged(pathInfo);
            } else if (pathInfo.type == VERSION) {
                versionChanged(pathInfo);
            } else if (pathInfo.type == PROTOCOL) {
                protocolChanged(pathInfo);
            } else if (pathInfo.type == HOST_CONFIG) {
                registryConfigChanged(pathInfo);
            }
        } catch (Throwable e) {
            logger.error("Error in ZookeeperWatcher.process()", e);
            return;
        }
    }
    /*
     * 1. Get newest value from ZK and watch again 2. Determine if changed
     * against cache 3. notify if changed 4. pay attention to group fallback
     * notification
     */
    private void addressChanged(PathInfo pathInfo) throws Exception {
        if (shouldNotify(pathInfo)) {
            String hosts = client.get(pathInfo.path);
            logger.info("Service address changed, path " + pathInfo.path + " value " + hosts);
            List<String[]> hostDetail = Utils.getServiceIpPortList(hosts);
            serviceChangeListener.onServiceHostChange(pathInfo.serviceName, hostDetail);
        }
        // Watch again
        client.watch(pathInfo.path);
    }

addressChanged难得加了注释,判断是否需要回调,回调。

本篇到此结束,内容较多,希望能对大家有所助益。

转载请备注原文链接。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容