https://gitbook.cn/books/5d72668ad9c2de3db4e8ecd3/index.html
微信扫码
Apache Dubbo 是阿里巴巴开源的高性能可扩展分布式 RPC 框架,在 Dubbo 2.7.0 版本其服务消费端异步调用实现中引入了 JDK8 中的 CompletableFuture 类实现了真正意义上的异步调用;服务提供端则为了避免不同服务共用同一个 Dubbo 内部线程池造成相互影响,提供了异步处理能力,从而实现了全链路异步。
dubbo中将调用端称为异步调用,而服务端称为异步执行,异步执行无异于
本 Chat 内容如下:Dubbo 提供的异步调用与异步处理模型
Dubbo 2.7.0 版本前服务消费端如何使用异步调用,及其缺点
Dubbo 2.7.0 版本后服务消费端如何使用异步调用
服务提供端如何基于定义 CompletableFuture 签名的接口实现异步执行
服务提供端如何使用 AsyncContext 实现异步执行
Dubbo 提供的服务消费端异步调用
正如 Dubbo 官网所说 dubbo 从 2.7.0 版本开始支持所有异步编程接口以 CompletableFuture 为基础,以便解决 2.7.0 之前版本异步调用的不便与功能缺失。
异步调用实现是基于 NIO 的非阻塞能力实现并行调用,服务消费端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小,如下图是 Dubbo 异步调用链路概要流程图图如上图,步骤 1 当服务消费端发起 RPC 调用时候使用的用户线程,用户线程首先使用步骤 2 创建了一个 Future 对象,然后步骤 3 会把请求转换为 IO 线程来执行,步骤 3 为异步过程,所以会马上返回,然后用户线程使用步骤 4 把其创建的 Future 对象设置到 RpcContext 中,其后用户线程就返回了。
然后步骤 5 用户线程可以在某个时间点从 RpcContext 中获取设置的 Futrue 对象,并且使用步骤 6 来等待调用结果。
步骤 7 当服务提供方返回结果后,调用方线程模型中的线程池中线程则会把结果使用步骤 8 写入到 Future,这时候用户线程就可以得到远程调用结果了。
上图中实线条箭头代表同步调用,虚线箭头表示异步调用。
Dubbo 2.7.0 版本前的异步调用
2.7.0 之前的异步调用能力比较弱,比如使用下面方式进行异步调用:
//1
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
...
//2\. 设置为异步
referenceConfig.setAsync(true);
//3\. 直接返回 null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));
//4.等待结果
java.util.concurrent.Future<String> future = RpcContext.getContext().getFuture();
System.out.println(future.get());
如上代码 2 设置调用为异步方式,设置为异步后,代码 3 直接调用 sayHello 方法会马上返回 null,如果要想获取远程调用的真正结果,需要使用代码 4 获取 future 对象,并且调用 future 的 get 系列方法来获取真正结果。
上面讲解的基于从返回的 future 调用 get()方法方式实现异步缺点是当业务线程调用 get()方法后业务线程会被阻塞,这不是我们想要的,所以 dubbo 提供了在 future 对象上设置回调函数的方式,让我们实现真正的异步调用。比如 APiAsyncConsumerForCallBack 类中:
// 14
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
...
// 15\. 设置为异步
referenceConfig.setAsync(true);
// 16\. 直接返回 null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));
// 17.异步执行回调函数
((FutureAdapter) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
//返回响应结果
@Override
public void done(Object response) {
System.out.println("result:" + response);
}
//出现异常
@Override
public void caught(Throwable exception) {
System.out.println("error:" + exception.getLocalizedMessage());
}
});
如上代码可知这种方式当业务线程获取了 future 对象后,在其上设置了回调函数后马上就会返回,然后等服务提供端把响应结果写回调用方后,调用方的线程模型中的线程池中线程会把结果写入 future 对象后,回调回调函数,可知这个过程中是不需要业务线程干预的,实现了真正的异步调用。
上面我们介绍了 2.7.0 前提供的异步调用方式,Future 方式只支持阻塞式的 get()接口获取结果。虽然通过获取内置的 ResponseFuture 接口,可以设置回调。但获取 ResponseFuture 的 API 使用不便,并且无法满足让多个 Future 协同工作的场景,功能比较单一。
Dubbo 2.7.0 版本提供的异步调用
下面我们使用 Dubbo 2.7.0 版本提供的基于 CompletableFuture 的异步调用:
// 1
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
...
// 2\. 设置为异步
referenceConfig.setAsync(true);
// 3\. 直接返回 null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));
// 4.异步执行回调
CompletableFuture<String> future = RpcContext.getContext().getCompletableFuture();
future.whenComplete((v, t) -> {
if (null != t) {
t.printStackTrace();
} else {
System.out.println(v);
}
});
如上代码 4,可以直接获取到 CompletableFuture,然后设置回调,基于 CompletableFuture 已有的能力,我们可以对 CompletableFuture 对象进行一系列的操作,以及可以让多个请求的 CompletableFuture 对象之间进行运算(比如合并两个 CompletableFuture 对象的结果为一个 CompletableFuture 对象等等)。
下面我们看看如何基于 CompletableFuture 的能力,来组合多个 Future 实现(多次 rpc 调用结果进行聚合),我们看下面代码:
public class APiAsyncConsumerForCompletableFuture3 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 1.创建服务引用对象实例
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
// 2.设置应用程序信息
referenceConfig.setApplication(new ApplicationConfig("first-dubbo-consumer"));
// 3.设置服务注册中心
referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
// 4.设置服务接口和超时时间
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setTimeout(5000);
// 5.设置服务分组与版本
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
// 6\. 设置为异步
referenceConfig.setAsync(true);
// 7.引用服务
GreetingService greetingService = referenceConfig.get();
// 8.异步执行,并设置回调
System.out.println(greetingService.sayHello("hello"));
CompletableFuture<String> future1 = RpcContext.getContext().getCompletableFuture();
// 9.异步执行,并设置回调
System.out.println(greetingService.sayHello("jiaduo"));
CompletableFuture<String> future2 = RpcContext.getContext().getCompletableFuture();
//10.组合两个 future
future1.thenCombine(future2, (x,y)->x+y).whenComplete((v, t) -> {
if (t != null) {
t.printStackTrace();
} else {
System.out.println(Thread.currentThread().getName() + " " + v);
}
});
// 11\. 挂起线程
Thread.currentThread().join();
}
}
如上代码 8 发起了一次异步调用并且从上下文中获取了 future1,如上代码 9 发起了一次异步调用并且从上下文中获取了 future2,代码 10 则使用函数 thenCombine 组合 future1 与 future2,然后设置一个回调函数,意在等两次异步调用结果都产生后,基于两者的结果作为回调函数的参数,然后执行回调函数。
Dubbo 提供的服务提供端异步处理
在 Provider 端非异步执行时候,其对调用方发来的请求的处理是在 Dubbo 内部线程模型的线程池中的线程来执行的,在 dubbo 中服务提供方提供的所有的服务接口都是使用这一个线程池来执行的,所以当一个服务执行比较耗时时候,可能会占用线程池中很多线程,这可能就会导致其他服务的处理收到影响。
Provider 端异步执行则将服务的处理逻辑从 Dubbo 内部线程池切换到业务自定义线程,避免 Dubbo 线程池中线程被过度占用,有助于避免不同服务间的互相影响。
但是需要注意 provider 端异步执行对节省资源和提升 RPC 响应性能是没有效果的,这时是因为如果服务处理比较耗时,虽然不是使用 Dubbo 框架内部线程处理,但是还是需要业务自己的线程来处理,另外副作用还有会新增一次线程上下文切换(从 dubbo 内部线程池线程切换到业务线程),模型如下图如上图图中 Provider 端在同步提供服务时候是使用 Dubbo 内部线程池中线程来进行处理的,在异步执行时候则是使用业务自己设置的线程来从 dubbo 内部线程池中线程接收请求进行处理。
基于定义 CompletableFuture 签名的接口实现异步执行
基于定义 CompletableFuture 签名的接口实现异步执行需要服务提供端的服务方法返回值类型为 CompletableFuture,如下 GrettingServiceAsyncImpl 中服务提供端实现了该方式的异步执行:
public class GrettingServiceAsyncImpl implements GrettingServiceAsync {
// 1.创建业务自定义线程池
private final ThreadPoolExecutor bizThreadpool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"),
new ThreadPoolExecutor.CallerRunsPolicy());
// 2.创建服务处理接口,返回值为 CompletableFuture
@Override
public CompletableFuture<String> sayHello(String name) {
// 2.1 为 supplyAsync 提供自定义线程池 bizThreadpool,避免使用 JDK 公用线程池(ForkJoinPool.commonPool())
// 使用 CompletableFuture.supplyAsync 让服务处理异步化进行处理
// 保存当前线程的上下文
RpcContext context = RpcContext.getContext();
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("async return ");
return "Hello " + name + " " + context.getAttachment("company");
}, bizThreadpool);
}
}
如上可知基于定义 CompletableFuture 签名的接口实现异步执行需要接口方法返回值为 CompletableFuture,并且方法内部使用 CompletableFuture.supplyAsync 让本来该 Dubbo 内部线程线程处理的服务,转换为由业务自定义线程池中线程来处理,CompletableFuture.supplyAsync 方法会马上返回一个 CompletableFuture 对象(所以 dubbo 内部线程池线程会得到及时释放),传递的业务函数则由业务线程池 bizThreadpool 执行。
需要注意的是调用 sayHello 方法的线程是 Dubbo 线程模型线程池中线程,而业务处理是 bizThreadpool 中线程处理,所以代码 2.1 保存了 Rpc 上下文对象,以便在业务处理线程中使用。
使用 AsyncContext 实现异步执行
使用 AsyncContext 实现异步执行需要在服务提供端的服务方法内使用 RpcContext.startAsync()显示开启异步,如下服务提供端服务实现 GrettingServiceAsyncContextImpl 代码如下:
public class GrettingServiceAsyncContextImpl implements GrettingServiceRpcContext {
// 1.创建业务自定义线程池
private final ThreadPoolExecutor bizThreadpool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"),
new ThreadPoolExecutor.CallerRunsPolicy());
// 2.创建服务处理接口,返回值为 CompletableFuture
@Override
public String sayHello(String name) {
// 2.1 开启异步
final AsyncContext asyncContext = RpcContext.startAsync();
bizThreadpool.execute(() -> {
// 2.2 如果要使用上下文,则必须要放在第一句执行
asyncContext.signalContextSwitch();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 2.3 写回响应
asyncContext.write("Hello " + name + " " + RpcContext.getContext().getAttachment("company"));
});
return null;
}
}
如上代码 2.1 调用 RpcContext.startAsync()开启服务异步执行,然后返回一个 asyncContext,然后把服务处理任务提交到业务线程池后方法就直接返回了 null。
异步任务内首先执行代码 2.2 切换任务的上下文,然后休眠 500ms 充当任务执行,最后代码 2.3 把任务执行结果写入到异步上下文,可知其实现是参考了 Servlet3.0 的异步执行。
这里由于具体执行业务处理的逻辑不在 sayHello 方法所在的 dubbo 内部线程池线程,所以不会被阻塞。
为了探究其原理我们先看 RpcContext.startAsync()方法:
public static AsyncContext startAsync() throws IllegalStateException {
//2.1.1 获取当前线程的上下文对象
RpcContext currentContext = getContext();
//2.1.2 为当前线程的上下文创建 AsyncContextImpl 实现
if (currentContext.asyncContext == null) {
currentContext.asyncContext = new AsyncContextImpl();
}
//2.2.3 启动异步上下文,并返回
currentContext.asyncContext.start();
return currentContext.asyncContext;
}
如上代码主要作用是为当前调用线程关联的 rpc 上下文对象关联 AsyncContextImpl,AsyncContextImpl 构造函数如下:
public AsyncContextImpl() {
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}
可知其把当前线程上下文对象保存到了 AsyncContextImpl 内部(这是因为 ThreadLocal 变量不能跨线程访问,可以参考《Java 并发编程之美》一书)
AsyncContextImpl 创建完毕后会被启动,其中 AsyncContextImpl 的 start 方法为:
public void start() {
if (this.started.compareAndSet(false, true)) {
this.future = new CompletableFuture<>();
}
}
如上代码可知是为 AsyncContextImpl 内的 future 对象创建一 CompletableFuture 对象,这里 started 是原子性 boolean 变量,是为了避免重复创建 CompletableFuture。
下面我们看 AsyncContextImpl 的,signalContextSwitch 方法,该方法是为了让 AsyncContextImpl 内保存的上下文信息传递到业务线程池线程中(也就是业务线程池中线程可以通过 RpcContext 来访问):
public void signalContextSwitch() {
RpcContext.restoreContext(storedContext);
RpcContext.restoreServerContext(storedServerContext);
}
需要注意 signalContextSwitch 方法需要在业务线程中第一句来执行,以避免后面的业务处理使用 RpcContext 获取上下文信息时候出错。
下面我们在看 AsyncContextImpl 的 write 方法:
public void write(Object value) {
if (isAsyncStarted() && stop()) {
//异常
if (value instanceof Throwable) {
Throwable bizExe = (Throwable) value;
future.completeExceptionally(bizExe);
} else {
//服务处理结果
future.complete(value);
}
} else {
throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");
}
}
如上代码当业务线程中服务处理完毕后,会把执行结果写入到在 start 方法创建的 CompletableFuture 对象内。
总结:当 dubbo 的线程模型中的线程池线程执行 sayHello()方法时候,方法内通过 RpcContext.startAsync()创建了一个 AsyncContextImpl 实例,然后调用其 start()方法创建了一个 CompletableFuture 对象;然后 sayHello()方法把业务处理任务添加到线程池后,直接返回 null;返回 null 后,结合上节 AbstractProxyInvoker 的 invoke()方法内代码 11.1 也返回了 null,然后代码 11.2 判断用 RpcContext.startAsync()开启了异步执行,所以使用((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture()获取了 AsyncContextImpl 内的 future 对象。