同异步开发模式
标签(空格分隔): 同步异步
在soa服务调用中,一般一个请求会分配一个线程对请求进行处理,在IO操作比较多并且采用同步模式的情况下,假如有大量请求,那么大部分的线程都会处于IO等待状态,会导致系统的吞吐率下降。在很多情况下,使用异步的调用方式,可以立即返回初步的结果,延迟返回最终结果数据,在这个过程可以释放占用的线程等资源,避免阻塞,提高响应速率和系统的吞吐率。
1、同步异步、阻塞非阻塞
之前一直以为同步/异步和阻塞/非阻塞是同一个概念,后面了解到是不一样的概念之后又很想弄清两者之间的区别,一直在找能将两者划清的一道线。但其实这两者的联系是十分紧密的,同步的往往伴随的是阻塞,异步的往往是非阻塞的。但是阻塞和非阻塞一般来说是只针对IO操作而言的。
同步:发出一个请求之后,线程需要等到结果回来才去处理其他事情
异步:发出一个请求后,等待的过程可以释放这个线程,等到结果回来了,继续处理剩下的事情
阻塞:进程处理一个任务时,需要等待IO操作处理完成后才能继续执行
非阻塞:进程处理一个任务时,不需要等待IO操作处理完成,就能继续后续的操作,隔断时间再来询问之前的操作是否完成。这样的过程其实也叫轮询。
阻塞和非阻塞应该是一种状态,针对单个线程或者进程而言,是否需要等待IO操作处理完了才能做其他操作。而同步异步更多的是一种通信模式,一般来说,异步都是通过回调来完成,调用方发出请求后,中间没有阻塞的状态,直接去处理其他任务,被调用方处理完请求后主动通知调用方并返回结果。
这篇文章的介绍写的很好 http://blog.csdn.net/historyasamirror/article/details/5778378
2、Java的异步编程方式
2.1、Future
JDK 5引入了Future模式,可以用来进行异步调用。Future接口有五个方法
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); //取消任务的执行
boolean isCancelled(); //任务是否已经取消
boolean isDone(); //任务是否已经完成
V get() throws InterruptedException, ExecutionException; //等待任务结束 ,获取结果
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; //在timeout时间内等待任务结束 ,获取结果,如果超时抛出异常
}
Future虽然可以实现异步执行,但是在调用future.get()时线程会阻塞,直到拿到结果,这时又变成同步操作。
不阻塞的情况则需要循环地调用future.isDone()判断future是否完成,再去get结果,这样会浪费CPU的资源。
2.2、CompletableFuture
CompletableFuture实现了非阻塞的异步调用。我们可以在CompletableFuture上注册一个completion事件,CompletableFuture执行完成后会触发这个事件的调用,这样就让执行免收阻塞之苦。提供这一功能的方法有
- thenApply()(针对返回值为其他类型的函数)
- thenAccept()(针对返回值为void的函数)
- whenComplete()(针对接受一个值和一个Throwable,并返回void的函数)等。
3、SOA服务客户端与服务端通信模式
客户端的同异步模式和服务端的同异步模式其实是无关联的。
客户端如果使用同步模式,发出请求后,线程会一直等待,直到服务端返回结果。
客户端如果使用异步模式,发出请求后,先拿到返回的CompletableFuture,等服务端返回结果后,会执行thenApply中的回调方法。
这两种方式,都是要等到服务端有结果了客户端才继续执行或者通过回调的方式处理请求结果,因此客户端的调用模式和服务端应该是无关联的。
服务端使用异步
if (serviceDef.isAsync) {//异步
SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);
future.whenComplete((realResult, ex) -> {//任务执行完成后
TransactionContext.Factory.setCurrentInstance(context);
processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
onExit(ctx, getPrevChain(ctx));
});
} else {//同步
SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
RESP result = (RESP) syncFunction.apply(iface, args);
processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
onExit(ctx, getPrevChain(ctx));
}
上面的代码是服务端请求的处理,分了同异步,我一开始的时候想不通,既然客户端使用了异步,服务端使用了异步还有意义吗?后来我就去问了我的老师江湖人称老王,其实是因为我只站在了客户端的角度思考,服务端使用异步,服务本身的性能也会提高,那对于客户端肯定是有益处的。另外,服务端使用异步也是要看业务场景的,举个例子,客户端发送请求过来服务端后,服务端需要进行处理的逻辑是:使用爬虫从其他网页爬取数据,再对这些数据进行解析。由于爬虫需要进行网络连接,可能出现网络不好一直在等待的情况,但是解析数据其实是很快的,这个时候不使用异步就会出现大部分时间一直在等待而线程资源又没有释放的情况,就会影响系统的吞吐率。因此,在服务端使用异步模式在某些场景下其实意义是很大的。
3.1、客户端的通信模式分析(以服务化SOA框架dapeng为例)
调用dapeng服务的客户端通过服务的api去调用。dapeng是通过thrift的IDL去定义服务接口的,并且根据IDL自动生成相应的api,api中包含了同步和异步的客户端代码,因此客户端可以通过同步模式来调用也可以通过异步模式来调用。
假定是在电商系统下订单这种场景下,当用户下了订单之后,我们可能需要去做很多的操作,比如扣减用户的优惠券,减库存,扣减用户的钱包等等其他服务,假设这三个操作,每个操作分别需要100ms,200ms,300ms,如果使用同步,那么执行这些所有的操作是串行的,执行总时间可能需要100ms+200ms+300ms=600ms,假如通过异步,那么每次执行完都会立即返回一个CompletableFuture,所有操作的执行其实可以当作是并行的,那么全部执行完只需要300ms,也就是说取决于耗时最长的那个服务。
具体我们结合dapeng的代码分析
dapeng的通信是基于netty的,我们启动客户端时会将所有ChannelHandler注册到ChannelPipeline中,服务端返回的字节流最终会去到回调处理的方法中。
handler:
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds), //超时设置
new SoaDecoder(), //粘包拆包处理
new SoaIdleHandler(), //心跳处理
new SoaClientHandler(callBack) //回调处理
);
}
});
同步调用:可以看到下面的代码,我们会将future插入RequestQueue队列中,以seqid为key,请求发送到服务端之后,我们调用future的get()方法,这时线程会阻塞在这里,直到服务端返回结果,callback方法中把结果设回future。这种情况下调用每个服务是按顺序走的,一次调用结束后才能继续调用下一个服务,因此耗时是调用每一次服务的时间之和。
public ByteBuf send(Channel channel, int seqid, ByteBuf request, long timeout) throws SoaException {
//means that this channel is not idle and would not managered by IdleConnectionManager
IdleConnectionManager.remove(channel);
CompletableFuture<ByteBuf> future = new CompletableFuture<>();
RequestQueue.put(seqid, future);
try {
channel.writeAndFlush(request); //将请求发送至服务端
ByteBuf respByteBuf = future.get(timeout, TimeUnit.MILLISECONDS);
return respByteBuf;
} catch (TimeoutException e) {
LOGGER.error("请求超时,seqid:"+seqid);
throw new SoaException(SoaCode.TimeOut.getCode(), SoaCode.TimeOut.getMsg());
} catch (Throwable e) {
throw new SoaException(SoaCode.UnKnown, e.getMessage() == null ? SoaCode.UnKnown.getMsg() : e.getMessage());
} finally {
RequestQueue.remove(seqid);
}
}
callback:服务端返回的结果最终会来到这个方法,我们拿到seqid后从RequestQueue获取对应的future,然后将结果写进future。
private SoaClientHandler.CallBack callBack = msg -> {
// length(4) stx(1) version(...) protocol(1) seqid(4) header(...) body(...) etx(1)
int readerIndex = msg.readerIndex();
msg.skipBytes(7); // length4 + stx1 + version1 + protocol1
int seqid = msg.readInt(); //拿到seqid
msg.readerIndex(readerIndex);
CompletableFuture<ByteBuf> future = RequestQueue.remove(seqid);
if (future != null) {
future.complete(msg); //将结果写进future
} else {
LOGGER.error("返回结果超时,siqid为:" + seqid);
msg.release();
}
};
异步调用:这里同样是将future写进RequestQueue队列中,然后发送请求到服务端,这个时候我们直接返回future,因此主线程不会阻塞。最后当结果返回的时候,回调方法里同样会将结果写进future,最后调用方可以在CompletableFuture中的thenApply等方法对结果进行处理。因此异步调用下会立即返回future,马上可以执行其他操作,结合上面的例子,调用十个服务,每个调用耗时100ms,用异步执行总耗时也是大约100ms。
public CompletableFuture<ByteBuf> sendAsync(Channel channel, int seqid, ByteBuf request, long timeout) throws Exception {
IdleConnectionManager.remove(channel);
CompletableFuture<ByteBuf> future = new CompletableFuture<>();
RequestQueue.putAsync(seqid, future, timeout);
channel.writeAndFlush(request);
return future;
}
3.2、服务端的通信模式分析(以服务化SOA框架dapeng为例)
dapeng 根据IDL生成api的时候,会生成两个接口,一个同步一个异步,写服务实现时需要根据应用场景决定使用同步还是异步,使用同步则继承同步的接口,使用异步则继承异步的接口。服务端会根据继承的接口判断使用同步还是异步。
我们以在浏览器登陆微信的场景为例,微信登陆的时候需要扫描二维码,假设用户扫码之后我们需要调用一个服务,去加载某些信息,当用户扫描成功后返回这些信息,有个定义这个处理的接口接口,那么我们定义好IDL之后会生成一个同步和一个异步的接口,异步的会继承AsyncService,这样框架处理的时候可以以这个为根据判断是同步还是异步。
public interface LoadInfoService {
void loadInfo() throws com.github.dapeng.core.SoaException;
}
public interface LoadInfoServiceAsync extends AsyncService {
Future<String> loadInfo() throws SoaException;
}
那么我们在写服务端的时候需要去实现这个接口,我们采用异步的模式,就需要implements LoadInfoServiceAsync。
public class LoadInfoServiceImpl implements LoadInfoServiceAsync {
@Override
public Future<String> loadInfo() throws SoaException {
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
try {
String info = processLoading(); //业务处理
} catch (InterruptedException e) {
e.printStackTrace();
}
return info;
});
return result;
}
}
这样我们就实现了异步服务,那么dapeng框架在处理请求的时候就会判断这个服务是同步还是异步实现,然后采取不同的处理方法,主要代码如下:
if (serviceDef.isAsync) {
SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args); //调用服务实现的方法
future.whenComplete((realResult, ex) -> {
TransactionContext.Factory.setCurrentInstance(context);
processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
onExit(ctx, getPrevChain(ctx));
});
} else {
SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
RESP result = (RESP) syncFunction.apply(iface, args); //调用服务实现的方法
processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
onExit(ctx, getPrevChain(ctx));
}
回到上面说到的场景,假设我们采用同步实现,由于我们不知道用户什么时候扫描,因此扫描完后我们才能去调用服务,并且等待服务返回,这个时候假如服务需要等到的时间很长,那么用户体验就会很差。另外,业务线程池的数量是有限的,如果每次服务调用时间很长,那么使用同步每次调用都占用一个业务线程,对服务性能就会造成很大的影响。
4、同步开发模式存在的问题
对于存在大量计算或者IO操作等需要等待应答的情况,一个连接的并发请求数非常有限,只有等到有结果之后才能进行下一个请求,而一个连接池的连接数是有限的。尤其是在多个服务系统间的调用时,如果流程阻塞在某个系统上,那么整体系统的性能都会下降。
5、异步开发模式适用场景
但并不是说异步模式一定优越于同步模式,以下场景适合使用异步的开发模式
不涉及共享资源,或对共享资源只读,即非互斥操作
没有时序上的严格关系
常用于IO操作等耗时操作
不影响主线程逻辑