dubbo的异步调用和异步处理

https://gitbook.cn/books/5d72668ad9c2de3db4e8ecd3/index.html
微信扫码

Apache Dubbo 是阿里巴巴开源的高性能可扩展分布式 RPC 框架,在 Dubbo 2.7.0 版本其服务消费端异步调用实现中引入了 JDK8 中的 CompletableFuture 类实现了真正意义上的异步调用;服务提供端则为了避免不同服务共用同一个 Dubbo 内部线程池造成相互影响,提供了异步处理能力,从而实现了全链路异步。

  • dubbo中将调用端称为异步调用,而服务端称为异步执行,异步执行无异于
    本 Chat 内容如下:

  • Dubbo 提供的异步调用与异步处理模型

  • Dubbo 2.7.0 版本前服务消费端如何使用异步调用,及其缺点

  • Dubbo 2.7.0 版本后服务消费端如何使用异步调用

  • 服务提供端如何基于定义 CompletableFuture 签名的接口实现异步执行

  • 服务提供端如何使用 AsyncContext 实现异步执行

Dubbo 提供的服务消费端异步调用

正如 Dubbo 官网所说 dubbo 从 2.7.0 版本开始支持所有异步编程接口以 CompletableFuture 为基础,以便解决 2.7.0 之前版本异步调用的不便与功能缺失。

异步调用实现是基于 NIO 的非阻塞能力实现并行调用,服务消费端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小,如下图是 Dubbo 异步调用链路概要流程图图
在这里插入图片描述
  • 如上图,步骤 1 当服务消费端发起 RPC 调用时候使用的用户线程,用户线程首先使用步骤 2 创建了一个 Future 对象,然后步骤 3 会把请求转换为 IO 线程来执行,步骤 3 为异步过程,所以会马上返回,然后用户线程使用步骤 4 把其创建的 Future 对象设置到 RpcContext 中,其后用户线程就返回了。

  • 然后步骤 5 用户线程可以在某个时间点从 RpcContext 中获取设置的 Futrue 对象,并且使用步骤 6 来等待调用结果。

  • 步骤 7 当服务提供方返回结果后,调用方线程模型中的线程池中线程则会把结果使用步骤 8 写入到 Future,这时候用户线程就可以得到远程调用结果了。

上图中实线条箭头代表同步调用,虚线箭头表示异步调用。

Dubbo 2.7.0 版本前的异步调用

2.7.0 之前的异步调用能力比较弱,比如使用下面方式进行异步调用:

//1
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
...
//2\. 设置为异步
referenceConfig.setAsync(true);

//3\. 直接返回 null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));

//4.等待结果
java.util.concurrent.Future<String> future = RpcContext.getContext().getFuture();
System.out.println(future.get());

如上代码 2 设置调用为异步方式,设置为异步后,代码 3 直接调用 sayHello 方法会马上返回 null,如果要想获取远程调用的真正结果,需要使用代码 4 获取 future 对象,并且调用 future 的 get 系列方法来获取真正结果。

上面讲解的基于从返回的 future 调用 get()方法方式实现异步缺点是当业务线程调用 get()方法后业务线程会被阻塞,这不是我们想要的,所以 dubbo 提供了在 future 对象上设置回调函数的方式,让我们实现真正的异步调用。比如 APiAsyncConsumerForCallBack 类中:

// 14
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
...
// 15\. 设置为异步
referenceConfig.setAsync(true);

// 16\. 直接返回 null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));

// 17.异步执行回调函数
((FutureAdapter) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {

   //返回响应结果
    @Override
    public void done(Object response) {
        System.out.println("result:" + response);
    }
   //出现异常
    @Override
    public void caught(Throwable exception) {
        System.out.println("error:" + exception.getLocalizedMessage());
    }
});

如上代码可知这种方式当业务线程获取了 future 对象后,在其上设置了回调函数后马上就会返回,然后等服务提供端把响应结果写回调用方后,调用方的线程模型中的线程池中线程会把结果写入 future 对象后,回调回调函数,可知这个过程中是不需要业务线程干预的,实现了真正的异步调用。

上面我们介绍了 2.7.0 前提供的异步调用方式,Future 方式只支持阻塞式的 get()接口获取结果。虽然通过获取内置的 ResponseFuture 接口,可以设置回调。但获取 ResponseFuture 的 API 使用不便,并且无法满足让多个 Future 协同工作的场景,功能比较单一。

Dubbo 2.7.0 版本提供的异步调用

下面我们使用 Dubbo 2.7.0 版本提供的基于 CompletableFuture 的异步调用:

// 1
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
...
// 2\. 设置为异步
referenceConfig.setAsync(true);

// 3\. 直接返回 null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));

// 4.异步执行回调
CompletableFuture<String> future = RpcContext.getContext().getCompletableFuture();
future.whenComplete((v, t) -> {
    if (null != t) {
        t.printStackTrace();
    } else {
        System.out.println(v);
    }

});

如上代码 4,可以直接获取到 CompletableFuture,然后设置回调,基于 CompletableFuture 已有的能力,我们可以对 CompletableFuture 对象进行一系列的操作,以及可以让多个请求的 CompletableFuture 对象之间进行运算(比如合并两个 CompletableFuture 对象的结果为一个 CompletableFuture 对象等等)。

下面我们看看如何基于 CompletableFuture 的能力,来组合多个 Future 实现(多次 rpc 调用结果进行聚合),我们看下面代码:

public class APiAsyncConsumerForCompletableFuture3 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1.创建服务引用对象实例
        ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
        // 2.设置应用程序信息
        referenceConfig.setApplication(new ApplicationConfig("first-dubbo-consumer"));
        // 3.设置服务注册中心
        referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));

        // 4.设置服务接口和超时时间
        referenceConfig.setInterface(GreetingService.class);
        referenceConfig.setTimeout(5000);

        // 5.设置服务分组与版本
        referenceConfig.setVersion("1.0.0");
        referenceConfig.setGroup("dubbo");

        // 6\. 设置为异步
        referenceConfig.setAsync(true);

        // 7.引用服务
        GreetingService greetingService = referenceConfig.get();

        // 8.异步执行,并设置回调
        System.out.println(greetingService.sayHello("hello"));
        CompletableFuture<String> future1 = RpcContext.getContext().getCompletableFuture();

        // 9.异步执行,并设置回调
        System.out.println(greetingService.sayHello("jiaduo"));
        CompletableFuture<String> future2 = RpcContext.getContext().getCompletableFuture();

        //10.组合两个 future
        future1.thenCombine(future2, (x,y)->x+y).whenComplete((v, t) -> {
            if (t != null) {
                t.printStackTrace();
            } else {
                System.out.println(Thread.currentThread().getName() + " " + v);
            }

        });

        // 11\. 挂起线程
        Thread.currentThread().join();
    }
}

如上代码 8 发起了一次异步调用并且从上下文中获取了 future1,如上代码 9 发起了一次异步调用并且从上下文中获取了 future2,代码 10 则使用函数 thenCombine 组合 future1 与 future2,然后设置一个回调函数,意在等两次异步调用结果都产生后,基于两者的结果作为回调函数的参数,然后执行回调函数。

Dubbo 提供的服务提供端异步处理

在 Provider 端非异步执行时候,其对调用方发来的请求的处理是在 Dubbo 内部线程模型的线程池中的线程来执行的,在 dubbo 中服务提供方提供的所有的服务接口都是使用这一个线程池来执行的,所以当一个服务执行比较耗时时候,可能会占用线程池中很多线程,这可能就会导致其他服务的处理收到影响。

Provider 端异步执行则将服务的处理逻辑从 Dubbo 内部线程池切换到业务自定义线程,避免 Dubbo 线程池中线程被过度占用,有助于避免不同服务间的互相影响。

但是需要注意 provider 端异步执行对节省资源和提升 RPC 响应性能是没有效果的,这时是因为如果服务处理比较耗时,虽然不是使用 Dubbo 框架内部线程处理,但是还是需要业务自己的线程来处理,另外副作用还有会新增一次线程上下文切换(从 dubbo 内部线程池线程切换到业务线程),模型如下图
在这里插入图片描述

如上图图中 Provider 端在同步提供服务时候是使用 Dubbo 内部线程池中线程来进行处理的,在异步执行时候则是使用业务自己设置的线程来从 dubbo 内部线程池中线程接收请求进行处理。

基于定义 CompletableFuture 签名的接口实现异步执行

基于定义 CompletableFuture 签名的接口实现异步执行需要服务提供端的服务方法返回值类型为 CompletableFuture,如下 GrettingServiceAsyncImpl 中服务提供端实现了该方式的异步执行:

public class GrettingServiceAsyncImpl implements GrettingServiceAsync {

    // 1.创建业务自定义线程池
    private final ThreadPoolExecutor bizThreadpool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
            new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    // 2.创建服务处理接口,返回值为 CompletableFuture
    @Override
    public CompletableFuture<String> sayHello(String name) {

        // 2.1 为 supplyAsync 提供自定义线程池 bizThreadpool,避免使用 JDK 公用线程池(ForkJoinPool.commonPool())
        // 使用 CompletableFuture.supplyAsync 让服务处理异步化进行处理
        // 保存当前线程的上下文
        RpcContext context = RpcContext.getContext();

        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("async return ");
            return "Hello " + name + " " + context.getAttachment("company");
        }, bizThreadpool);
    }
}

如上可知基于定义 CompletableFuture 签名的接口实现异步执行需要接口方法返回值为 CompletableFuture,并且方法内部使用 CompletableFuture.supplyAsync 让本来该 Dubbo 内部线程线程处理的服务,转换为由业务自定义线程池中线程来处理,CompletableFuture.supplyAsync 方法会马上返回一个 CompletableFuture 对象(所以 dubbo 内部线程池线程会得到及时释放),传递的业务函数则由业务线程池 bizThreadpool 执行。

需要注意的是调用 sayHello 方法的线程是 Dubbo 线程模型线程池中线程,而业务处理是 bizThreadpool 中线程处理,所以代码 2.1 保存了 Rpc 上下文对象,以便在业务处理线程中使用。

使用 AsyncContext 实现异步执行

使用 AsyncContext 实现异步执行需要在服务提供端的服务方法内使用 RpcContext.startAsync()显示开启异步,如下服务提供端服务实现 GrettingServiceAsyncContextImpl 代码如下:

public class GrettingServiceAsyncContextImpl implements GrettingServiceRpcContext {

    // 1.创建业务自定义线程池
    private final ThreadPoolExecutor bizThreadpool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
            new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    // 2.创建服务处理接口,返回值为 CompletableFuture
    @Override
    public String sayHello(String name) {

        // 2.1 开启异步
        final AsyncContext asyncContext = RpcContext.startAsync();
        bizThreadpool.execute(() -> {
            // 2.2 如果要使用上下文,则必须要放在第一句执行
            asyncContext.signalContextSwitch();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 2.3 写回响应
            asyncContext.write("Hello " + name + " " + RpcContext.getContext().getAttachment("company"));
        });

        return null;
    }
}

如上代码 2.1 调用 RpcContext.startAsync()开启服务异步执行,然后返回一个 asyncContext,然后把服务处理任务提交到业务线程池后方法就直接返回了 null。

异步任务内首先执行代码 2.2 切换任务的上下文,然后休眠 500ms 充当任务执行,最后代码 2.3 把任务执行结果写入到异步上下文,可知其实现是参考了 Servlet3.0 的异步执行。

这里由于具体执行业务处理的逻辑不在 sayHello 方法所在的 dubbo 内部线程池线程,所以不会被阻塞。

为了探究其原理我们先看 RpcContext.startAsync()方法:

public static AsyncContext startAsync() throws IllegalStateException {
    //2.1.1 获取当前线程的上下文对象
    RpcContext currentContext = getContext();
    //2.1.2 为当前线程的上下文创建 AsyncContextImpl 实现
    if (currentContext.asyncContext == null) {
        currentContext.asyncContext = new AsyncContextImpl();
    }
    //2.2.3 启动异步上下文,并返回
    currentContext.asyncContext.start();
    return currentContext.asyncContext;
}

如上代码主要作用是为当前调用线程关联的 rpc 上下文对象关联 AsyncContextImpl,AsyncContextImpl 构造函数如下:

   public AsyncContextImpl() {
        this.storedContext = RpcContext.getContext();
        this.storedServerContext = RpcContext.getServerContext();
    }

可知其把当前线程上下文对象保存到了 AsyncContextImpl 内部(这是因为 ThreadLocal 变量不能跨线程访问,可以参考《Java 并发编程之美》一书)

AsyncContextImpl 创建完毕后会被启动,其中 AsyncContextImpl 的 start 方法为:

public void start() {
    if (this.started.compareAndSet(false, true)) {
        this.future = new CompletableFuture<>();
    }
}

如上代码可知是为 AsyncContextImpl 内的 future 对象创建一 CompletableFuture 对象,这里 started 是原子性 boolean 变量,是为了避免重复创建 CompletableFuture。

下面我们看 AsyncContextImpl 的,signalContextSwitch 方法,该方法是为了让 AsyncContextImpl 内保存的上下文信息传递到业务线程池线程中(也就是业务线程池中线程可以通过 RpcContext 来访问):

public void signalContextSwitch() {
        RpcContext.restoreContext(storedContext);
        RpcContext.restoreServerContext(storedServerContext);
    }

需要注意 signalContextSwitch 方法需要在业务线程中第一句来执行,以避免后面的业务处理使用 RpcContext 获取上下文信息时候出错。

下面我们在看 AsyncContextImpl 的 write 方法:

public void write(Object value) {
    if (isAsyncStarted() && stop()) {
        //异常
        if (value instanceof Throwable) {
            Throwable bizExe = (Throwable) value;
            future.completeExceptionally(bizExe);
        } else {
        //服务处理结果
            future.complete(value);
        }
    } else {
        throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");
    }
}

如上代码当业务线程中服务处理完毕后,会把执行结果写入到在 start 方法创建的 CompletableFuture 对象内。

总结:当 dubbo 的线程模型中的线程池线程执行 sayHello()方法时候,方法内通过 RpcContext.startAsync()创建了一个 AsyncContextImpl 实例,然后调用其 start()方法创建了一个 CompletableFuture 对象;然后 sayHello()方法把业务处理任务添加到线程池后,直接返回 null;返回 null 后,结合上节 AbstractProxyInvoker 的 invoke()方法内代码 11.1 也返回了 null,然后代码 11.2 判断用 RpcContext.startAsync()开启了异步执行,所以使用((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture()获取了 AsyncContextImpl 内的 future 对象。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容