Reactor 3-响应式编程-冷序列与热序列

如何使用响应式编程(一)

该篇主要是针对于响应式编程的实践,在一些实际的开发当中,利用具体的例子和应用场景,来加深对响应式编程的理解。
其中可能会涉及到一些思考过程,和方案的选择,以及最佳实践的讨论,希望可以让读者更快的上手并且可以自己开利用Reactor3框架进行实际的开发工作。

在这之前,我们可以先说一下冷热序列的问题。

冷序列与热序列

冷热序列是相对于发布者(Publisher)来说的。一般普通的默认序列我们都认为是冷序列,这些序列在调用subscribe方法之前是不会有任何行动的,那么热序列又是什么呢,别着急,我们慢慢来说,先从一个应用场景来说明什么是热序列。

热序列的使用场景

想象有这样一个需求,需要将后台服务打印的日志,实时的展示到页面上,那么我们应该要怎么来做这个事情呢

这个示例是一个完整的示例,虽然在实际的工作中没有什么特别大的用处,但是重点是要熟悉一下Reactor3的热序列的应用。
该示例运行在2.5.2的springboot上,使用的是webFlux

我们这儿选择使用websocket来实现这个这个功能,先来说一下使用webflux配置WebSocket的方法;

  • 首先我们应该在WebConfig当中创建一个 SimpleUrlHandlerMapping
    @Bean
    public HandlerMapping handlerMapping(LogWebSocketHandler logWebSocketHandler,
                                         FlowWebSocketHandler flowWebSocketHandler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/log", logWebSocketHandler); //如果一个handler不够可以配置多个
        int order = -1; // before annotated controllers
        return new SimpleUrlHandlerMapping(map, order);
    }

LogWebSocketHandler 实现了 WebSocketHandler 接口,如下


public interface WebSocketHandler {
    default List<String> getSubProtocols() {
        return Collections.emptyList();
    }

    Mono<Void> handle(WebSocketSession var1);
}

通过 handle 方法我们可以拿到一个WebSocketSession对象,如下是该方法的实现

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        webSocketSession.getHandshakeInfo();
        Mono<Void> in = webSocketSession       //1
                .receive()
                .doOnNext(webSocketMessage -> {
                    String text = webSocketMessage.getPayloadAsText();
                    LOGGER.info("WebSocket msg:{}", text);
                })
                .then();

        Mono<Void> out = webSocketSession.send(webSocketMessagePublisher(webSocketSession)); //2
        return Mono.zip(in, out).then(); //3
    }

  1. 用来接受客户端发来的数据(输入)
  2. 用来给客户端发送数据(输出)
  3. 这里尽量保证输入和输出分开来处理,因为我们这里不是为了要实现一个请求响应的模式(这说法有点牵强,等找到更好的表达方式的时候,我会回来改掉);这里 zip 我感觉还是挺有意思的,想象一下我们有两个等长的集合或者数组,将这两个集合或者数组 zip 一下的结果应该是什么呢,在一些实际项目开发的场景当中,我门可能会遇到这样的场景,Java JDK当中是没有提供 元组(Tuple) 这个数据结构的,当有些场景需要使用元组的时候,我门只能自己实现一个,那么如何生成元组呢,这时,我门可以实现一个 zip 算法来生成元组。

我门再来看下WebSocketSession的 send 方法的定义

Mono<Void> send(Publisher<WebSocketMessage> var1);

来看,这里需要传入的是一个 WebSocketMessagePublisher,而返回值是一个Mono<Void>,这里是不是长得有点像我们之前说的拼接模式呢,入参是一个上游的序列,而返回的是该上游序列拼接了内部执行逻辑的下游序列之后的后继,听起来有点绕口对不对,没关系,这里不是重点,因为我们是不需要关心send方法内部到底拼接了哪些逻辑的,可能是关于一些网络请求,或者网络协议之类的东西,这些都无所谓,我们最需要关注的是这里的 Publisher(这里斜体加粗体你就知道有多重要了)。

到这里,WebSocket的配置已经结束,下面的重点是如何来实现这个Publisher。

两种实现方案
第一个方案

当第一次遇到这个问题的时候,我的第一反应是维护一组 Session ,每个Session代表一个客户端链接,想要给哪个人发消息,就找到相应的 Session对象并调用发送消息的接口(之前使用SrpingMVC的时候就是这么干的)。很显然这个方案是不可行的,因为这里的 WebSocketSession没什么实际的用处(除了用于获取 HandShakeInfo),我门看send方法,并没有给谁发消息,而是返回了一个 Mono<Voide> ,我们能拿这个返回值怎么办呢,什么都干不了。

第一反应不行,紧接着是第二反应:百度搜索。查询了很久,查到一个解决方案:

List<FluxSink<WebSocketMessage>> fluxSinkList = new ArrayList<>();
Flux<WebSocketMessage> sessionFlux = Flux.create(fluxSinkList::add);

是的,我们现在得到了一个 SessionFlux 这个 Flux 可以作为一个数据发布者,返回给我们的WebSocket框架,然后用暴露出来的 FluxSink#next() 发布新的数据。

这个看起来貌似是个不错的解决方案,但是总觉得有一些别扭,将一个内部对象暴露出来存在别的容器当中,估计还需要考虑内存泄漏的问题。

第二个方案

第二个方案是使用官方文档推荐的方式,使用 Sinks , 先贴一段代码


private final Sinks.Many<String> many = Sinks.many().multicast().onBackpressureBuffer(1000, true);
private final Flux<String> logFlux = many.asFlux();

...
public void outputLog(String log) {
    many.tryEmitNext(log);
}
...
public Flux<String> getLogFlux() {
    return logFlux;
}

Reactor3 API 提供的这种方式用起来还是挺方便的,省了很多事情,在这之前,我还尝试了一种方式,那就是 继承一个 Flux ,将数据发布的逻辑放到实现的内部,也就是说用Flux把业务模块包起来,做成一个大的数据发布者,这种方法很麻烦,需要考虑的事情太多了,不推荐使用,当然如果是需要扩展 Flux 的功能的话,那就无所谓了,不过在日常开发中,热序列的应用场景其实是比较少的,一般一个系统也不会出现几个。

进一步说明冷序列与热序列(重要的事情总是要多说几遍)

之前我们说的执行序列说的是以数据为核心,声明对某些数据执行顺序的封装,也就是说,一个执行序列封装了数据的执行步骤(第一步干什么,第二部干什么,最终转化成什么样子,类似生产线),再来说冷热序列,冷序列是被动执行的,热序列是主动执行的,被动执行的意思就是它的执行是由订阅者来触发,当Subscriber调用subscription的request方法的时候,传递一个数值类型的参数,该参数代表需要请求数据的数量,当publisher收到请求之后就会产出指定数量的数据,然后调用 onNext 方法来发部新数据;主动执行的意思就是在发布者创建对象的那一刻开始,就就已经开始生产新数据了,不管是否有订阅者请求数据,而且订阅者请求数据过程,发布者也不会按照订阅者请求的数量返还相应的数据,而是有数据生成的时候主动推送给订阅者,直到订阅者取消订阅或者发布者完成发布的操作(onComplete)。

上边介绍的推送日志的功能,就是一个典型热序列,我们创建了一个用于推送日志的发布者(Flux),然后将这个发布者当作Websocket请求链接的返回值返回给框架,这个时候,框架当中Websocket请求对象成为了我们的订阅者,当有新日志生成的时候,日志发布者就会将日志发布给WebSocket请求链接,请求链接再发送给客户端,netty里边具体怎么做的没有太细研究,应该是一个Channel等待就绪状态,当有戏数据的时候就变为就绪状态,select的时候直接拿到并发送到操作系统的IO当中(这是nio的执行逻辑,有可能是使用的epoll或者kqueue,具体看你使用的是什么操作系统,这俩是netty自己实现的,没太细看)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容