pigeon源码分析-同步调用和异步调用

pigeon源码分析-同步调用和异步调用

Pigeon是美团点评内部广泛使用的一个分布式服务通信框架(RPC),本文所有分析基于 pigeon 开源版本:RPC framework of DIANPING,具体版本为:2.9.12-SNAPSHOT

概要:先从调用方视角 RPC 框架基本原理,然后介绍动态代理的生成,从这里切入之后再详细介绍同步、异步调用;

RPC就是要在分布式场景下完成这样一个动作: Result result = service.call(args) ,在这个= 等号背后 RPC 框架做了很多事。

RPC 调用在客户端视角下的分解

简单来说可以分成这几步:

  • Client client = findClient(); // 这一步主要是服务治理相关,本文不关注;
  • client.write(request);
  • Object resp = waitResponse();
  • Result result = convert2Result(resp);

pigeon 在客户端是如何生成代理的?

远程调用离不开动态代理,其实 pigeon service 在客户端视角就是生成了一个被调用接口的实现类,通过网络请求来返回数据,这样就好像实现类在本地一样;

生成代理的逻辑:
在 bean 初始化的时候,获取 Proxy 实例,代码如下:

// com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean#init
    public void init() throws Exception {
            // 进行初始化配置
        this.obj = ServiceFactory.getService(invokerConfig);
        // 略
    }

使用 JDK 动态代理,真正执行的逻辑都在 java.lang.reflect.InvocationHandler 中,

可以看到 invoke() 时执行的是 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult,执行的逻辑是一个 FilterChain,成员见 com.dianping.pigeon.remoting.invoker.process.InvokerProcessHandlerFactory#init,每个 InvocationInvokeFilter 的作用基本见名知意,完成一些统计、上下文准备、监控等功能;

终于到了 RemoteCallInvokeFilter,这是执行远程调用最关键的一步,主要代码如下:

//com.dianping.pigeon.remoting.invoker.process.filter.RemoteCallInvokeFilter#invoke

    @Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
            // 略
        try {
            switch (callMethod) {
                case SYNC:
                    // do call
                     break;
                case CALLBACK:
                    // do call
                    break;
                case FUTURE:
                    //do call
                    break;
                case ONEWAY:
                    //do call
                    break;
                default:
                    throw new BadRequestException("Call type[" + callMethod.getName() + "] is not supported!");

            }

            ((DefaultInvokerContext)invocationContext).setResponse(response);
            afterInvoke(invocationContext);
        } catch (Throwable t) {
            afterThrowing(invocationContext, t);
            throw t;
        }

        return response;
    }

可以看到,pigeon 支持四种调用类型:sync, callback, future, oneway

这里主要分析 sync 和 future,分别对应一般意义上的同步、异步调用模式,其他两种是在这两种上做了特殊处理;

SYNC-同步调用

同步调用,主要逻辑:

case SYNC:
    CallbackFuture future = new CallbackFuture();
    response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
    invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
    if (response == null) {
        response = future.getResponse(request.getTimeout());
    }
    break;

先初始化了一个 callbackFuture,之后 sendRequest(),最后获取 response

进一步查看 sendRequest() 实现:

com.dianping.pigeon.remoting.invoker.util.InvokerUtils#sendRequest(com.dianping.pigeon.remoting.invoker.Client, com.dianping.pigeon.remoting.common.domain.InvocationRequest, com.dianping.pigeon.remoting.invoker.concurrent.Callback)
 
 --> com.dianping.pigeon.remoting.invoker.AbstractClient#write
 实际的 doWrite() 有两种实现,HTTP / netty,这里看 NettyClient
 
 --> com.dianping.pigeon.remoting.netty.invoker.NettyClient#doWrite
 
 // 调用 netty client 写请求
     @Override
    public InvocationResponse doWrite(InvocationRequest request) throws NetworkException {
        NettyChannel channel = null;

        try {

            channel = channelPool.selectChannel();

            ChannelFuture future = channel.write0(request);

            afterWrite(request, channel);

            if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE
                    || request.getMessageType() == Constants.MESSAGE_TYPE_HEART) {
                future.addListener(new MessageWriteListener(request, channel));
            }

        } catch (Exception e) {
            throw new NetworkException("[doRequest] remote call failed:" + request, e);
        }
        return null;
    }

仔细看这里永远返回 null,然后再看 sync 调用执行的下一步,就是 response = future.getResponse(request.getTimeout()); , 判断 sendRequest() 返回结果为 null,就从 future 获取结果,这两步就联系起来了;

而 getResponse(timeout) 所做的事情只有一件:while 循环等待到 isDone == true || timeout,这里不详细展示,看代码即可;

拓展一下,这里只做等待,那么返回值从哪里来呢?答案是等待 socket 的另一端也就是 server 回写;

回过头来看,remoteCallInvocationFilter 所做的就是写请求,然后等待接受返回值;

FUTURE-异步调用

Future 调用是 pigeon 提供的标准异步调用模式。使用方式如下:

//调用ServiceA的method1
serviceA.method1("aaa");
//获取ServiceA的method1调用future状态
Future future1OfServiceA = InvokerHelper.getFuture();
//调用ServiceA的method2
serviceA.method2("bbb");
//获取ServiceA的method2调用future状态
Future future2OfServiceA = InvokerHelper.getFuture();

//获取ServiceA的method2调用结果
Object result2OfServiceA = future2OfServiceA.get();
//获取ServiceA的method1调用结果
Object result1OfServiceA = future1OfServiceA.get();

简单来说就是:调用 —> 获取 Future —> 获取结果。

因为每次调用不用阻塞等待结果,而是拿到 Future 实例之后由开发者在合适的时机获取返回值,所以合理使用能提高并发性;

来看下实现:

case FUTURE:
    ServiceFutureImpl futureImpl = new ServiceFutureImpl(invocationContext, request.getTimeout());
    InvokerUtils.sendRequest(client, invocationContext.getRequest(), futureImpl);
    FutureFactory.setFuture(futureImpl);
    response = InvokerUtils.createFutureResponse(futureImpl);
    invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
break;

可以看到只有 sendRequest(),没有等待返回,开发文档中有介绍,Future 调用之后需要开发者自行通过 future.get(timeout); 获取返回值;

FutureFactory.setFuture(futureImpl); 这一步做了什么?

public class FutureFactory {
    private static ThreadLocal<Future<?>> threadFuture = new ThreadLocal<Future<?>>();

    public static void setFuture(Future<?> future) {
        threadFuture.set(future);
    }

其实就是将 Future 实例设置到 ThreadLocal 中,所以每次调用结束之后立即通过 Future future1OfServiceA = InvokerHelper.getFuture(); 获取Future实例,否则如果紧接着进行了下一次future调用,就会因为ThreadLocal 中的 future 被覆盖而无法正确获取返回值;

思考 :返回值的 Future 是什么作用?

答案:其实 pigeon 中的 Future 实现了 JDK 的标准语义,即:A Future represents the result of an asynchronous computation . 代表了一次异步计算的结果。简单来说,就是一个暂时代表结果的占位符。

需要结果的时候就去看看有没有 response 了,有就返回,没有就阻塞到超时时间到;

pigeon 同步调用与异步调用处理有哪些不同

同步、异步,都是先发送完请求,然后等待结果。

SYNC 模式下,立即等待直到拿到结果或者超时,然后将结果(超时情况下则是异常)返回给调用方;

FUTURE 模式下,立即返回个占位符 Future,这里的实现类是:com.dianping.pigeon.remoting.invoker.concurrent.ServiceFutureImpl#ServiceFutureImpl,将真正获取返回值的时机交给调用方控制;

这里也因为 Future 模式实现时使用了 ThreadLocal 暂时存储占位符,所以多次 Future 调用,一定要顺序获取结果;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • 前言 很多朋友对异步编程都处于“听说很强大”的认知状态。鲜有在生产项目中使用它。而使用它的同学,则大多数都停留在知...
    星星在线阅读 2,856评论 2 39
  • “ 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列...
    落羽成霜丶阅读 3,984评论 1 41
  • 接着上节 condition_varible ,本节主要介绍future的内容,练习代码地址。本文参考http:/...
    jorion阅读 14,782评论 1 5
  • 小七 13岁 爱吃甜食 体型却削瘦 身高比同龄人矮5公分。因为中学有早自习,每天要五点起床而小七不会扎...
    小婊七阅读 288评论 2 0