如何基于Dubbo实现全异步调用链

本文回顾了 2.6.x 版本的异步实现,然后引出了 2.7.0 版本基于 CompletableFuture 的异步编程方式。

基于Dubbo实现全异步编程,是在2.7.0版本中对现有异步方式增强后新引入的功能。本文先是回顾2.6.x及之前版本对异步的支持情况及存在的问题,引出了2.7.0版本基于CompletableFuture做了哪些针对性的增强,通过几个示例详细阐述了增强后的异步编程的使用方式,最后总结了引入异步模式带来的新问题及Dubbo的解决方法。通过阅读这篇文章,可以很容易的基于Dubbo2.7.0+版本实现一个全异步的远程服务调用链路。

从3.0.0版本开始,Dubbo框架提供了对Reactive编程范式的支持,除了编程接口之外,在跨进程的RPC通信中引入了Reactive的语义。如果你所在的环境需要使用Reactive编程范式,或者你的RPC调用需要支持流式传输,Reactive应该会给你带来帮助,具体请参考发布在阿里巴巴中间件公众号上的响应式编程支持相关文章。

注意,你可能并不是总需要Reactive的语义,尤其是在RPC的场景,CompletableFuture本身也能带给你Reactive模式的编程模型,在选择Reactive(RxJava、Reactor之类)而不是理解及使用成本更低的CompletableFuture前,请尝试关注以下问题:

你是请求/响应是一次性传输的还是流式传输的,一个明显特征是你定义的数据类型是 List<String> 还是 Stream<String>
你的RPC请求有没有要求是Cold,即在subscribe后触发,因为CompletableFuture总是hot的
你依赖的编程上下文中是否已经在大量使用Reactive的编程接口
你是否需要Rx框架提供的更丰富的Operator,而这点和1又是密切相关的

2.6.x版本之前的异步方式

在2.6.x及之前的版本提供了一定的异步编程能力,包括Consumer端异步调用、参数回调、事件通知等,在上面的文档链接中有关于使用方式的简单介绍和Demo。

关于参数回调,其本质上是一种服务端的数据推送能力,这是终端应用很常见的一种需求,关于这部分的重构计划,不在本文讨论范围。

但当前的异步方式存在以下问题:

  • Future获取方式不够直接
  • Future接口无法实现自动回调,而自定义ResponseFuture虽支持回调但支持的异步场景有限,如不支持Future间的相互协调或组合等
    不支持Provider端异步
  • 以Consumer端异步使用方式为例:

定义一个普通的同步接口并声明支持异步调用

public interface FooService {
    String findFoo(String name);
}
<dubbo:reference id="fooService" interface="com.alibaba.foo.FooService">
      <dubbo:method name="findFoo" async="true" />
</dubbo:reference>

通过RpcContext获取Future

// 此调用会立即返回null
fooService.findFoo(fooId);
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<Foo> fooFuture = RpcContext.getContext().getFuture();
fooFuture.get();

// 此调用会立即返回null
fooService.findFoo(fooId);
// 拿到Dubbo内置的ResponseFuture并设置回调
ResponseFuture future = ((FutureAdapter)RpcContext.getContext().getFuture()).getFuture();
future.setCallback(new ResponseCallback() {
    @Override
    public void done(Object response) {
        System.out.print(response);
    }

    @Override
    public void caught(Throwable exception) {
        exception.printStackTrace();
    }
});

从这个简单的示例我们可以体会到一些使用中的不便之处:

  1. findFoo的同步接口,不能直接返回代表异步结果的Future,通过RpcContext进一步获取。
  2. Future只支持阻塞式的get()接口获取结果。
  3. 通过获取内置的ResponseFuture接口,可以设置回调。但获取ResponseFuture的API使用不便,且仅支持设置回调其他异步场景均不支持,如多个Future协同工作的场景等。

2.7.0基于CompletableFuture的增强

了解Java中Future演进历史的同学应该知道,Dubbo 2.6.x及之前版本中使用的Future是在java 5中引入的,所以存在以上一些功能设计上的问题,而在java 8中引入的CompletableFuture进一步丰富了Future接口,很好的解决了这些问题。

Dubbo在2.7.0版本已经升级了对Java 8的支持,同时基于CompletableFuture对当前的异步功能进行了增强。

1. 支持直接定义返回CompletableFuture的服务接口。通过这种类型的接口,我们可以更自然的实现Consumer、Provider端的异步编程。

public interface AsyncService {
    CompletableFuture<String> sayHello(String name);
}

2. 如果你不想将接口的返回值定义为Future类型,或者存在定义好的同步类型接口,则可以选择重载原始方法并为新方法定义CompletableFuture类型返回值。

public interface GreetingsService {
    String sayHi(String name);
}
public interface GreetingsService {
    String sayHi(String name);
    // 为了保证方法级服务治理规则依然有效,建议保持方法名不变: sayHi
    // 使用default实现,避免给服务端提供者带来额外实现成本
    // boolean placeHoler只是为了实现重载而增加,只要Java语法规则允许,你可以使用任何方法重载手段
    default CompletableFuture<String> sayHi(String name, boolean placeHolder) {
      return CompletableFuture.completedFuture(sayHello(name));
    }
}

这样,Provider依然可以只实现sayHi方法;而Consumer通过直接调用新增的sayHi重载方法可以拿到一个Future实例。

3. 如果你的原始接口定义是同步的,这时要实现Provider端异步,则可以使用AsyncContext(类似Servlet 3.0里的AsyncContext的编程接口)。

注意:在已有CompletabeFuture返回类型的接口上,不建议再使用AsyncContext,请直接利用CompletableFuture带来的异步能力。

public interface AsyncService {
    String sayHello(String name);
}
public class AsyncServiceImpl implements AsyncService {
    public String sayHello(String name) {
        final AsyncContext asyncContext = RpcContext.startAsync();
        new Thread(() -> {
            asyncContext.write("Hello " + name + ", response from provider.");
        }).start();
        return null;
    }
}

在方法体的开始RpcContext.startAsync()启动异步,并开启新线程异步的执行业务逻辑,在耗时操作完成后通过asyncContext.write将结果写回。

4. RpcContext直接返回CompletableFuture

CompletableFuture<String> f = RpcContext.getContext().getCompletableFuture();

以上所有的增强,是在兼容已有异步编程的基础上进行的,因此基于2.6.x版本编写的异步程序不用做任何改造即可顺利编译通过。

如何实现一个全异步的Dubbo服务调用链。

示例1:CompletableFuture类型接口

CompletableFuture类型的接口既可以用作同步调用,也可以实现Consumer或Provider的异步调用。本示例实现了Consumer和Provider端异步调用,代码参见dubbo-samples-async-original-future。

1.定义接口

public interface AsyncService {
    CompletableFuture<String> sayHello(String name);
}

注意接口的返回类型是CompletableFuture<String>。

2.Provider端

实现

public class AsyncServiceImpl implements AsyncService {
    public CompletableFuture<String> sayHello(String name) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "async response from provider.";
        });
    }
}

可以看到这里通过supplyAsync将业务代码切换到了新的线程执行,因此实现了Provider端异步。

配置

<bean id="asyncService" class="com.alibaba.dubbo.samples.async.impl.AsyncServiceImpl"/>
<dubbo:service interface="com.alibaba.dubbo.samples.async.api.AsyncService" ref="asyncService"/>

配置方式和普通接口是一样的。

3.Consumer端

配置

<dubbo:reference id="asyncService" timeout="10000" interface="com.alibaba.dubbo.samples.async.api.AsyncService"/>

配置方式和普通接口是一样的。

调用远程服务

public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/async-consumer.xml"});
        context.start();
        final AsyncService asyncService = (AsyncService) context.getBean("asyncService");

        CompletableFuture<String> future = asyncService.sayHello("async call request");
        future.whenComplete((v, t) -> {
            if (t != null) {
                t.printStackTrace();
            } else {
                System.out.println("Response: " + v);
            }
        });
        System.out.println("Executed before response return.");
        System.in.read();
    }
CompletableFuture<String> future = asyncService.sayHello("async call request");很自然的返回了Future示例,这样就实现了Consumer端的异步服务调用。

示例2:重载同步接口

这个示例演示了如何在同步接口的基础上,通过增加重载方法实现消费端的异步调用,具体代码参见地址dubbo-samples-async-generated-future

定义接口

@DubboAsync
public interface GreetingsService {
    String sayHi(String name);
}

修改接口,增加重载方法

public interface GreetingsService {
    String sayHi(String name);

    default CompletableFuture<String> sayHi(String name, boolean isAsync) {
      return CompletableFuture.completedFuture(sayHello(name));
    }
}

Provider端

配置

<bean id="greetingsService" class="com.alibaba.dubbo.samples.async.impl.GreetingsServiceImpl"/>
<dubbo:service interface="com.alibaba.dubbo.samples.api.GreetingsService" ref="greetingsService"/>

服务实现

public class GreetingsServiceImpl implements GreetingsService {
    @Override
    public String sayHi(String name) {
        return "hi, " + name;
    }
}

Consumer端

配置

 <dubbo:reference id="greetingsService" interface="com.alibaba.dubbo.samples.api.GreetingsService"/>

调用服务

 public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/async-consumer.xml"});
        context.start();

        GreetingsService greetingsService = (GreetingsService) context.getBean("greetingsService");
        CompletableFuture<String> future = greetingsService.sayHi("async call reqeust", true);
        System.out.println("async call ret :" + future.get());

        System.in.read();
    }

这样,我们就可以直接使用CompletableFuture<String> future = greetingsService.sayHi("async call reqeust", true);,直接返回CompletableFuture。

示例3:使用AsyncContext

本示例演示了如何在同步接口的基础上,通过AsyncContext实现Provider端异步执行,示例代码参见 dubbo-samples-async-provider

之前已经提到过,已经是CompletableFuture签名的接口,要实现Provider端异步没必要再用AsyncContext。

定义接口

public interface AsyncService {
    String sayHello(String name);
}

Provider端,和普通provider端配置完全一致

配置

<bean id="asyncService" class="com.alibaba.dubbo.samples.async.impl.AsyncServiceImpl"/>
<dubbo:service async="true" interface="com.alibaba.dubbo.samples.async.api.AsyncService" ref="asyncService"/>

异步执行实现

public class AsyncServiceImpl implements AsyncService {
    public String sayHello(String name) {
        final AsyncContext asyncContext = RpcContext.startAsync();
        new Thread(() -> {
            asyncContext.signalContextSwitch();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            asyncContext.write("Hello " + name + ", response from provider.");
        }).start();
        return null;
    }
}

Consumer端

配置

<dubbo:reference id="asyncService" interface="com.alibaba.dubbo.samples.async.api.AsyncService"/>

服务调用

 public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/async-consumer.xml"});
        context.start();

        AsyncService asyncService = (AsyncService) context.getBean("asyncService");
        System.out.println(asyncService.sayHello("async call request"));

        System.in.read();
    }

异步引入的新问题

Filter链
以下是一次普通Dubbo调用的完整Filter链(Filter链路图待补充)。

而采用异步调用后,由于异步结果在异步线程中单独执行,所以流经后半段Filter链的Result是空值,当真正的结果返回时已无法被Filter链处理。

为了解决这个问题,2.7.0中为Filter增加了回调接口onResponse。

以下是一个扩展Filter并支持异步Filter链的例子

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class AsyncPostprocessFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        return invoker.invoke(invoker, invocation);
    }

    @Override
    public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
        System.out.println("Filter get the return value: " + result.getValue());
        return result;
    }
}

上下文传递

这里的上下文问题主要是指在提供端异步的场景。

当前我们考虑的上下文主要是指保存在RpcContext中的数据,大多数场景是需要用户在切换业务线程前自己完成Context的传递。

public class AsyncServiceImpl implements AsyncService {
    // 保存当前线程的上下文
    RpcContext context = RpcContext.getContext();
    public CompletableFuture<String> sayHello(String name) {
        return CompletableFuture.supplyAsync(() -> {
            // 设置到新线程中
            RpcContext.setContext(context);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "async response from provider.";
        });
    }
}

不过AsyncContext也提供了signalContextSwitch()的方法来实现方便的Context切换。

public class AsyncServiceImpl implements AsyncService {
    public String sayHello(String name) {
        final AsyncContext asyncContext = RpcContext.startAsync();
        new Thread(() -> {
            asyncContext.signalContextSwitch();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            asyncContext.write("Hello " + name + ", response from provider.");
        }).start();
        return null;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容