同异步开发模式

同异步开发模式

标签(空格分隔): 同步异步


在soa服务调用中,一般一个请求会分配一个线程对请求进行处理,在IO操作比较多并且采用同步模式的情况下,假如有大量请求,那么大部分的线程都会处于IO等待状态,会导致系统的吞吐率下降。在很多情况下,使用异步的调用方式,可以立即返回初步的结果,延迟返回最终结果数据,在这个过程可以释放占用的线程等资源,避免阻塞,提高响应速率和系统的吞吐率。


1、同步异步、阻塞非阻塞

之前一直以为同步/异步阻塞/非阻塞是同一个概念,后面了解到是不一样的概念之后又很想弄清两者之间的区别,一直在找能将两者划清的一道线。但其实这两者的联系是十分紧密的,同步的往往伴随的是阻塞,异步的往往是非阻塞的。但是阻塞和非阻塞一般来说是只针对IO操作而言的。

同步:发出一个请求之后,线程需要等到结果回来才去处理其他事情

异步:发出一个请求后,等待的过程可以释放这个线程,等到结果回来了,继续处理剩下的事情

阻塞:进程处理一个任务时,需要等待IO操作处理完成后才能继续执行

非阻塞:进程处理一个任务时,不需要等待IO操作处理完成,就能继续后续的操作,隔断时间再来询问之前的操作是否完成。这样的过程其实也叫轮询。

阻塞和非阻塞应该是一种状态,针对单个线程或者进程而言,是否需要等待IO操作处理完了才能做其他操作。而同步异步更多的是一种通信模式,一般来说,异步都是通过回调来完成,调用方发出请求后,中间没有阻塞的状态,直接去处理其他任务,被调用方处理完请求后主动通知调用方并返回结果。

这篇文章的介绍写的很好 http://blog.csdn.net/historyasamirror/article/details/5778378


2、Java的异步编程方式

2.1、Future

JDK 5引入了Future模式,可以用来进行异步调用。Future接口有五个方法

public interface Future<V> { 

    boolean cancel(boolean mayInterruptIfRunning); //取消任务的执行
    boolean isCancelled();  //任务是否已经取消
    boolean isDone();      //任务是否已经完成
    V get() throws InterruptedException, ExecutionException;   //等待任务结束 ,获取结果
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; //在timeout时间内等待任务结束 ,获取结果,如果超时抛出异常
}

Future虽然可以实现异步执行,但是在调用future.get()时线程会阻塞,直到拿到结果,这时又变成同步操作。
不阻塞的情况则需要循环地调用future.isDone()判断future是否完成,再去get结果,这样会浪费CPU的资源。

2.2、CompletableFuture

CompletableFuture实现了非阻塞的异步调用。我们可以在CompletableFuture上注册一个completion事件,CompletableFuture执行完成后会触发这个事件的调用,这样就让执行免收阻塞之苦。提供这一功能的方法有

  • thenApply()(针对返回值为其他类型的函数)
  • thenAccept()(针对返回值为void的函数)
  • whenComplete()(针对接受一个值和一个Throwable,并返回void的函数)等。

3、SOA服务客户端与服务端通信模式

客户端的同异步模式和服务端的同异步模式其实是无关联的。


cliSer.png

客户端如果使用同步模式,发出请求后,线程会一直等待,直到服务端返回结果。
客户端如果使用异步模式,发出请求后,先拿到返回的CompletableFuture,等服务端返回结果后,会执行thenApply中的回调方法。
这两种方式,都是要等到服务端有结果了客户端才继续执行或者通过回调的方式处理请求结果,因此客户端的调用模式和服务端应该是无关联的。

服务端使用异步

if (serviceDef.isAsync) {//异步

    SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
    CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);
    
    future.whenComplete((realResult, ex) -> {//任务执行完成后
        TransactionContext.Factory.setCurrentInstance(context);
        processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
        onExit(ctx, getPrevChain(ctx));
    });
} else {//同步

    SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
    RESP result = (RESP) syncFunction.apply(iface, args);
    processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
    onExit(ctx, getPrevChain(ctx));
}

上面的代码是服务端请求的处理,分了同异步,我一开始的时候想不通,既然客户端使用了异步,服务端使用了异步还有意义吗?后来我就去问了我的老师江湖人称老王,其实是因为我只站在了客户端的角度思考,服务端使用异步,服务本身的性能也会提高,那对于客户端肯定是有益处的。另外,服务端使用异步也是要看业务场景的,举个例子,客户端发送请求过来服务端后,服务端需要进行处理的逻辑是:使用爬虫从其他网页爬取数据,再对这些数据进行解析。由于爬虫需要进行网络连接,可能出现网络不好一直在等待的情况,但是解析数据其实是很快的,这个时候不使用异步就会出现大部分时间一直在等待而线程资源又没有释放的情况,就会影响系统的吞吐率。因此,在服务端使用异步模式在某些场景下其实意义是很大的。

3.1、客户端的通信模式分析(以服务化SOA框架dapeng为例)

调用dapeng服务的客户端通过服务的api去调用。dapeng是通过thrift的IDL去定义服务接口的,并且根据IDL自动生成相应的api,api中包含了同步和异步的客户端代码,因此客户端可以通过同步模式来调用也可以通过异步模式来调用。

假定是在电商系统下订单这种场景下,当用户下了订单之后,我们可能需要去做很多的操作,比如扣减用户的优惠券,减库存,扣减用户的钱包等等其他服务,假设这三个操作,每个操作分别需要100ms,200ms,300ms,如果使用同步,那么执行这些所有的操作是串行的,执行总时间可能需要100ms+200ms+300ms=600ms,假如通过异步,那么每次执行完都会立即返回一个CompletableFuture,所有操作的执行其实可以当作是并行的,那么全部执行完只需要300ms,也就是说取决于耗时最长的那个服务。

具体我们结合dapeng的代码分析

dapeng的通信是基于netty的,我们启动客户端时会将所有ChannelHandler注册到ChannelPipeline中,服务端返回的字节流最终会去到回调处理的方法中。

handler:

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds), //超时设置
                new SoaDecoder(),        //粘包拆包处理
                new SoaIdleHandler(),    //心跳处理
                new SoaClientHandler(callBack) //回调处理
        );
    }
});

同步调用:可以看到下面的代码,我们会将future插入RequestQueue队列中,以seqid为key,请求发送到服务端之后,我们调用future的get()方法,这时线程会阻塞在这里,直到服务端返回结果,callback方法中把结果设回future。这种情况下调用每个服务是按顺序走的,一次调用结束后才能继续调用下一个服务,因此耗时是调用每一次服务的时间之和。

public ByteBuf send(Channel channel, int seqid, ByteBuf request, long timeout) throws SoaException {

    //means that this channel is not idle and would not managered by IdleConnectionManager
    IdleConnectionManager.remove(channel);

    CompletableFuture<ByteBuf> future = new CompletableFuture<>();

    RequestQueue.put(seqid, future);

    try {
        channel.writeAndFlush(request); //将请求发送至服务端
        ByteBuf respByteBuf = future.get(timeout, TimeUnit.MILLISECONDS);
        return respByteBuf;
    } catch (TimeoutException e) {
        LOGGER.error("请求超时,seqid:"+seqid);
        throw new SoaException(SoaCode.TimeOut.getCode(), SoaCode.TimeOut.getMsg());
    } catch (Throwable e) {
        throw new SoaException(SoaCode.UnKnown, e.getMessage() == null ? SoaCode.UnKnown.getMsg() : e.getMessage());
    } finally {
        RequestQueue.remove(seqid);
    }

}

callback:服务端返回的结果最终会来到这个方法,我们拿到seqid后从RequestQueue获取对应的future,然后将结果写进future。

private SoaClientHandler.CallBack callBack = msg -> {
    // length(4) stx(1) version(...) protocol(1) seqid(4) header(...) body(...) etx(1)
    int readerIndex = msg.readerIndex();
    msg.skipBytes(7); // length4 + stx1 + version1 + protocol1
    int seqid = msg.readInt();   //拿到seqid
    msg.readerIndex(readerIndex);

    CompletableFuture<ByteBuf> future = RequestQueue.remove(seqid);
    if (future != null) {
        future.complete(msg); //将结果写进future
    } else {
        LOGGER.error("返回结果超时,siqid为:" + seqid);
        msg.release();
    }
};

异步调用:这里同样是将future写进RequestQueue队列中,然后发送请求到服务端,这个时候我们直接返回future,因此主线程不会阻塞。最后当结果返回的时候,回调方法里同样会将结果写进future,最后调用方可以在CompletableFuture中的thenApply等方法对结果进行处理。因此异步调用下会立即返回future,马上可以执行其他操作,结合上面的例子,调用十个服务,每个调用耗时100ms,用异步执行总耗时也是大约100ms。

public CompletableFuture<ByteBuf> sendAsync(Channel channel, int seqid, ByteBuf request, long timeout) throws Exception {

    IdleConnectionManager.remove(channel);

    CompletableFuture<ByteBuf> future = new CompletableFuture<>();

    RequestQueue.putAsync(seqid, future, timeout);

    channel.writeAndFlush(request);

    return future;
}

3.2、服务端的通信模式分析(以服务化SOA框架dapeng为例)

dapeng 根据IDL生成api的时候,会生成两个接口,一个同步一个异步,写服务实现时需要根据应用场景决定使用同步还是异步,使用同步则继承同步的接口,使用异步则继承异步的接口。服务端会根据继承的接口判断使用同步还是异步。

我们以在浏览器登陆微信的场景为例,微信登陆的时候需要扫描二维码,假设用户扫码之后我们需要调用一个服务,去加载某些信息,当用户扫描成功后返回这些信息,有个定义这个处理的接口接口,那么我们定义好IDL之后会生成一个同步和一个异步的接口,异步的会继承AsyncService,这样框架处理的时候可以以这个为根据判断是同步还是异步。

public interface LoadInfoService {  
    void loadInfo() throws com.github.dapeng.core.SoaException;
}

public interface LoadInfoServiceAsync extends AsyncService {      

    Future<String> loadInfo() throws SoaException;
}

      

那么我们在写服务端的时候需要去实现这个接口,我们采用异步的模式,就需要implements LoadInfoServiceAsync。

public class LoadInfoServiceImpl implements LoadInfoServiceAsync {

    @Override
    public Future<String> loadInfo() throws SoaException {

        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
            try {
                String info = processLoading();  //业务处理
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return info;
        });
        return result;
    }
    
}

这样我们就实现了异步服务,那么dapeng框架在处理请求的时候就会判断这个服务是同步还是异步实现,然后采取不同的处理方法,主要代码如下:

if (serviceDef.isAsync) {
    SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
    CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args); //调用服务实现的方法
    future.whenComplete((realResult, ex) -> {
        TransactionContext.Factory.setCurrentInstance(context);
        processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
        onExit(ctx, getPrevChain(ctx));
    });
} else {
    SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
    RESP result = (RESP) syncFunction.apply(iface, args); //调用服务实现的方法
    processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
    onExit(ctx, getPrevChain(ctx));
}

回到上面说到的场景,假设我们采用同步实现,由于我们不知道用户什么时候扫描,因此扫描完后我们才能去调用服务,并且等待服务返回,这个时候假如服务需要等到的时间很长,那么用户体验就会很差。另外,业务线程池的数量是有限的,如果每次服务调用时间很长,那么使用同步每次调用都占用一个业务线程,对服务性能就会造成很大的影响。

4、同步开发模式存在的问题

对于存在大量计算或者IO操作等需要等待应答的情况,一个连接的并发请求数非常有限,只有等到有结果之后才能进行下一个请求,而一个连接池的连接数是有限的。尤其是在多个服务系统间的调用时,如果流程阻塞在某个系统上,那么整体系统的性能都会下降。

5、异步开发模式适用场景

但并不是说异步模式一定优越于同步模式,以下场景适合使用异步的开发模式

  • 不涉及共享资源,或对共享资源只读,即非互斥操作

  • 没有时序上的严格关系

  • 常用于IO操作等耗时操作

  • 不影响主线程逻辑

参考https://juejin.im/post/593e455861ff4b006c9bf6e0

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容

  • 前端开发面试题 <a name='preface'>前言</a> 只看问题点这里 看全部问题和答案点这里 本文由我...
    自you是敏感词阅读 754评论 0 3
  • 转自: //www.greatytc.com/p/486b0965c296 http://www.jia...
    demop阅读 3,874评论 1 21
  • 本文为苍云法尔原创作品,转载请注明出处。 ❶ 那一世 宫院深深,锦缎绫罗 谁意气风发江山在握 谁勤精绣画贤良淑德 ...
    苍云法尔阅读 445评论 1 0
  • 我一直认为,束缚我的是黑暗,谁曾想,束缚我的竟是自己最熟悉的一张网,当我向着自己认定为方向前行到最后一段时,突...
    各自流阅读 505评论 0 1
  • 一张脸是一丘山壑,一双脚踩的是荒泽,一条山路尽是坎坷,是他,就是他!用一身硬朗的骨骼,撑起一方郊田野舍,他让田园不...
    柳尘微阅读 195评论 7 9