概念
RxJS
是 Reactive Extensions for JavaScript
的缩写,起源于 Reactive Extensions
,是一个基于可观测数据流 Stream
结合观察者模式和迭代器模式的一种异步编程的应用库。RxJS
是 Reactive
Extensions
在 JavaScript
上的实现。
注意!它跟React
没啥关系,笔者最初眼花把它看成了React.js
的缩写(耻辱啊!!!)
对于陌生的技术而言,我们一般的思路莫过于,打开百度(google),搜索,然后查看官方文档,或者从零散的博客当中,去找寻能够理解这项技术的信息。但在很多时候,仅从一些只言片语中,的确也很难真正了解到一门技术的来龙去脉。
本文将从学习的角度来解析这项技术具备的价值以及能给我们现有项目中带来的好处。
背景
从开发者角度来看,对于任何一项技术而言,我们经常会去谈论的,莫过于以下几点:
- 应用场景?
- 如何落地?
- 上手难易程度如何?
- 为什么需要它?它解决了什么问题?
针对以上问题,我们可以由浅入深的来刨析一下RxJS
的相关理念。
应用场景?
假设我们有这样一个需求:
我们上传一个大文件之后,需要实时监听他的进度,并且待进度进行到 100 的时候停止监听。
对于一般的做法我们可以采用短轮询的方式来实现,在对于异步请求的封装的时候,如果我们采用Promise
的方式,那么我们一般的做法就可以采用编写一个用于轮询的方法,获取返回值进行处理,如果进度没有完成则延迟一定时间再次调用该方法,同时在出现错误的时候需要捕获错误并处理。
显然,这样的处理方式无疑在一定程度上给开发者带来了一定开发和维护成本,因为这个过程更像是我们在观察一个事件,这个事件会多次触发并让我感知到,不仅如此还要具备取消订阅的能力,Promise
在处理这种事情时的方式其实并不友好,而RxJS
对于异步数据流的管理就更加符合这种范式。
引用尤大的话:
我个人倾向于在适合 Rx
的地方用 Rx
,但是不强求 Rx for everything
。比较合适的例子就是比如多个服务端实时消息流,通过 Rx
进行高阶处理,最后到 view
层就是很清晰的一个 Observable
,但是 view
层本身处理用户事件依然可以沿用现有的范式。
如何落地?
针对现有项目来说,如何与实际结合并保证原有项目的稳定性也的确是我们应该优先考虑的问题,毕竟任何一项技术如果无法落地实践,那么必然给我们带来的收益是比较有限的。
这里如果你是一名使用Angular
的开发者,或许你应该知道Angular
中深度集成了Rxjs
,只要你使用Angular
框架,你就不可避免的会接触到 RxJs 相关的知识。
在一些需要对事件进行更为精确控制的场景下,比如我们想要监听点击事件 (click event),但点击三次之后不再监听。
那么这个时候引入RxJS
进行功能开发是十分便利而有效的,让我们能省去对事件的监听并且记录点击的状态,以及需要处理取消监听的一些逻辑上的心理负担。
你也可以选择为你的大型项目引入RxJS
进行数据流的统一管理规范,当然也不要给本不适合RxJS
理念的场景强加使用,这样实际带来的效果可能并不明显。
上手难易程度如何?
如果你是一名具备一定开发经验的JavaScript
开发者,那么几分钟或许你就能将RxJS
应用到一些简单的实践中了。
为什么需要它?它解决了什么问题?
如果你是一名使用JavaScript
的开发者,在面对众多的事件处理,以及复杂的数据解析转化时,是否常常容易写出十分低效的代码或者是臃肿的判断以及大量脏逻辑语句?
不仅如此,在JavaScript
的世界里,就众多处理异步事件的场景中来看,“麻烦” 两个字似乎经常容易被提起,我们可以先从JS
的异步事件的处理方式发展史中来细细品味RxJS
带来的价值。
回调函数时代(callback)
使用场景:
- 事件回调
-
Ajax
请求 Node API
-
setTimeout
、setInterval
等异步事件回调
在上述场景中,我们最开始的处理方式就是在函数调用时传入一个回调函数,在同步或者异步事件完成之后,执行该回调函数。可以说在大部分简单场景下,采用回调函数的写法无疑是很方便的,比如我们熟知的几个高阶函数:
forEach
map
filter
[1, 2, 3].forEach(function (item, index) {
console.log(item, index);
})
他们的使用方式只需要我们传入一个回调函数即可完成对一组数据的批量处理,很方便也很清晰明了。
但在一些复杂业务的处理中,我们如果仍然秉持不抛弃不放弃的想法顽强的使用回调函数的方式就可能会出现下面的情况:
fs.readFile('a.txt', 'utf-8', function(err, data) {
fs.readFile('b.txt', 'utf-8', function(err, data1) {
fs.readFile('c.txt', 'utf-8', function(err, data2) {
})
})
})
当然作为编写者来说,你可能觉得说这个很清晰啊,没啥不好的。但是如果再复杂点呢,如果调用的函数都不一样呢,如果每一个回调里面的内容都十分复杂呢。短期内自己可能清楚为什么这么写,目的是什么,但是过了一个月、三个月、一年后,你确定在众多业务代码中你还能找回当初的本心吗?
你会不会迫不及待的查找提交记录,这是哪个憨批写的,跟shit
......,卧槽怎么是我写的。
这时候,面对众多开发者苦不堪言的回调地域
,终于还是有人出来造福人类了......
Promise 时代
Promise
最初是由社区提出(毕竟作为每天与奇奇怪怪的业务代码打交道的我们来说,一直用回调顶不住了啊),后来官方正式在ES6
中将其加入语言标准,并进行了统一规范,让我们能够原生就能new
一个Promise
。
就优势而言,Promise
带来了与回调函数不一样的编码方式,它采用链式调用,将数据一层一层往后抛,并且能够进行统一的异常捕获,不像使用回调函数就直接炸了,还得在众多的代码中一个个try catch
。
话不多说,看码!
function readData(filePath) {
return new Promise((resolve, reject) => {
fs.readFile(filePath, 'utf-8', (err, data) => {
if (err) reject(err);
resolve(data);
})
});
}
readData('a.txt').then(res => {
return readData('b.txt');
}).then(res => {
return readData('c.txt');
}).then(res => {
return readData('d.txt');
}).catch(err => {
console.log(err);
})
对比一下,这种写法会不会就更加符合我们正常的思维逻辑了,这种顺序下,让人看上去十分舒畅,也更利于代码的维护。
优点:
状态改变就不会再变,任何时候都能得到相同的结果
将异步事件的处理流程化,写法更方便
缺点:
无法取消
错误无法被
try catch
(但是可以使用.catch
方式)当处于
pending
状态时无法得知现在处在什么阶段
虽然Promise
的出现在一定程度上提高了我们处理异步事件的效率,但是在需要与一些同步事件的进行混合处理时往往我们还需要面临一些并不太友好的代码迁移,我们需要把原本放置在外层的代码移到Promise
的内部才能保证某异步事件完成之后再进行继续执行。
Generator 函数
ES6
新引入了 Generator
函数,可以通过 yield
关键字,把函数的执行流挂起,为改变执行流程提供了可能,从而为异步编程提供解决方案。形式上也是一个普通函数,但有几个显著的特征:
function
关键字与函数名之间有一个星号 "*" (推荐紧挨着function
关键字)函数体内使用
yield· 表达式,定义不同的内部状态 (可以有多个
yield`)直接调用
Generator
函数并不会执行,也不会返回运行结果,而是返回一个遍历器对象(Iterator Object
)依次调用遍历器对象的
next
方法,遍历Generator
函数内部的每一个状态
function* read(){
let a= yield '666';
console.log(a);
let b = yield 'ass';
console.log(b);
return 2
}
let it = read();
console.log(it.next());
console.log(it.next());
console.log(it.next());
console.log(it.next());
这种模式的写法我们可以自由的控制函数的执行机制,在需要的时候再让函数执行,但是对于日常项目中来说,这种写法也是不够友好的,无法给与使用者最直观的感受。
async / await
相信在经过许多面试题的洗礼后,大家或多或少应该也知道这玩意其实就是一个语法糖,内部就是把Generator
函数与自动执行器co
进行了结合,让我们能以同步的方式编写异步代码,十分畅快。
有一说一,这玩意着实好用,要不是要考虑兼容性,真就想大面积使用这种方式。
再来看看用它编写的代码有多快乐:
async readFileData() {
const data = await Promise.all([
'异步事件一',
'异步事件二',
'异步事件三'
]);
console.log(data);
}
直接把它当作同步方式来写,完全不要考虑把一堆代码复制粘贴的一个其他异步函数内部,属实简洁明了。
RxJS
它在使用方式上,跟Promise
有点像,但在能力上比Promise
强大多了,不仅仅能够以流的形式对数据进行控制,还内置许许多多的内置工具方法让我们能十分方便的处理各种数据层面的操作,让我们的代码如丝一般顺滑。
优势:
- 代码量的大幅度减少
- 代码可读性的提高
- 很好的处理异步
- 事件管理、调度引擎
- 十分丰富的操作符
- 声明式的编程风格
function readData(filePath) {
return new Observable((observer) => {
fs.readFile(filePath, 'utf-8', (err, data) => {
if (err) observer.error(err);
observer.next(data);
})
});
}
Rx.Observable
.forkJoin(readData('a.txt'), readData('b.txt'), readData('c.txt'))
.subscribe(data => console.log(data));
这里展示的仅仅是RxJS
能表达能量的冰山一角,对于这种场景的处理办法还有多种方式。RxJS
擅长处理异步数据流,而且具有丰富的库函数。对于RxJS
而言,他能将任意的Dom
事件,或者是Promise
转换成observables
。
前置知识点
在正式进入RxJS
的世界之前,我们首先需要明确和了解几个概念:
- 响应式编程(
Reactive Programming
) - 流(
Stream
) - 观察者模式(发布订阅)
- 迭代器模式
响应式编程(Reactive Programming)
响应式编程(Reactive Programming
),它是一种基于事件的模型。在上面的异步编程模式中,我们描述了两种获得上一个任务执行结果的方式,一个就是主动轮训,我们把它称为 Proactive
方式。另一个就是被动接收反馈,我们称为 Reactive
。简单来说,在 Reactive
方式中,上一个任务的结果的反馈就是一个事件,这个事件的到来将会触发下一个任务的执行。
响应式编程的思路大概如下:你可以用包括 Click
和 Hover
事件在内的任何东西创建 Data stream
(也称 “流”,后续章节详述)。Stream
廉价且常见,任何东西都可以是一个 Stream
:变量、用户输入、属性、Cache
、数据结构等等。举个例子,想像一下你的 Twitter feed
就像是 Click events
那样的 Data stream
,你可以监听它并相应的作出响应。
结合实际,如果你使用过Vue
,必然能够第一时间想到,Vue
的设计理念不也是一种响应式编程范式么,我们在编写代码的过程中,只需要关注数据的变化,不必手动去操作视图改变,这种Dom
层的修改将随着相关数据的改变而自动改变并重新渲染。
流(Stream
)
流作为概念应该是语言无关的。文件IO
流,Unix
系统标准输入输出流,标准错误流 (stdin
, stdout
, stderr
),还有一开始提到的 TCP
流,还有一些 Web
后台技术(如Nodejs
)对HTTP
请求 / 响应流的抽象,都可以见到流的概念。
作为响应式编程的核心,流的本质是一个按时间顺序排列的进行中事件的序列集合。
对于一流或多个流来说,我们可以对他们进行转化,合并等操作,生成一个新的流,在这个过程中,流是不可改变的,也就是只会在原来的基础返回一个新的stream
。
观察者模式
在众多设计模式中,观察者模式可以说是在很多场景下都有着比较明显的作用。
观察者模式是一种行为设计模式, 允许你定义一种订阅机制, 可在对象事件发生时通知多个 “观察” 该对象的其他对象。
用实际的例子来理解,就比如你订了一个银行卡余额变化短信通知的服务,那么这个时候,每次只要你转账或者是购买商品在使用这张银行卡消费之后,银行的系统就会给你推送一条短信,通知你消费了多少多少钱,这种其实就是一种观察者模式,又称发布 - 订阅模式。
在这个过程中,银行卡余额就是被观察的对象,而用户就是观察者。
优点:
- 降低了目标与观察者之间的耦合关系,两者之间是抽象耦合关系。
- 符合依赖倒置原则。
- 目标与观察者之间建立了一套触发机制。
- 支持广播通信
不足:
- 目标与观察者之间的依赖关系并没有完全解除,而且有可能出现循环引用。
- 当观察者对象很多时,通知的发布会花费很多时间,影响程序的效率。
迭代器模式
迭代器(Iterator
)模式又叫游标(Sursor
)模式,在面向对象编程里,迭代器模式是一种设计模式,是一种最简单也最常见的设计模式。迭代器模式可以把迭代的过程从从业务逻辑中分离出来,它可以让用户透过特定的接口巡访容器中的每一个元素而不用了解底层的实现。
const iterable = [1, 2, 3];
const iterator = iterable[Symbol.iterator]();
iterator.next();
iterator.next();
iterator.next();
iterator.next();
作为前端开发者来说,我们最常遇到的部署了iterator
接口的数据结构不乏有:Map
、Set
、Array
、类数组等等,我们在使用他们的过程中,均能使用同一个接口访问每个元素就是运用了迭代器模式。
Iterator
作用:
- 为各种数据结构,提供一个统一的、简便的访问接口;
- 使得数据结构的成员能够按某种次序排列;
- 为新的遍历语法
for...of
实现循环遍历
在许多文章中,有人会喜欢把迭代器和遍历器混在一起进行概念解析,其实他们表达的含义是一致的,或者可以说(迭代器等于遍历器)。
Observable
表示一个概念,这个概念是一个可调用的未来值或事件的集合。它能被多个observer
订阅,每个订阅关系相互独立、互不影响。
举个栗子:
假设你订阅了一个博客或者是推送文章的服务号(微信公众号之类的),之后只要公众号更新了新的内容,那么该公众号就会把新的文章推送给你,在这段关系中,这个公众号就是一个Observable
,用来产生数据的数据源。
相信看完上面的描述,你应该对Observable
是个什么东西有了一定的了解了,那么这就好办了,下面我们来看看在RxJS
中如何创建一个Observable
。
const Rx = require('rxjs/Rx')
const myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
我们可以调用Observable.create
方法来创建一个Observable
,这个方法接受一个函数作为参数,这个函数叫做 producer
函数, 用来生成 Observable
的值。这个函数的入参是 observer
,在函数内部通过调用 observer.next()
便可生成有一系列值的一个 Observable
。
我们先不应理会observer
是个什么东西,从创建一个Observable
的方式来看,其实也就是调用一个API
的事,十分简单,这样一个简单的Observable
对象就创建出来了。
Observer
一个回调函数的集合,它知道如何去监听由Observable
提供的值。Observer
在信号流中是一个观察者(哨兵)的角色,它负责观察任务执行的状态并向流中发射信号。
这里我们简单实现一下内部的构造:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
}
在RxJS
中,Observer
是可选的。在next
、error
和 complete
处理逻辑部分缺失的情况下,Observable
仍然能正常运行,为包含的特定通知类型的处理逻辑会被自动忽略。
比如我们可以这样定义:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
}
}
它依旧是可以正常的运行。
那么它又是怎么来配合我们在实际战斗中使用的呢:
const myObservable = Rx.Observable.create((observer) => {
observer.next('111')
setTimeout(() => {
observer.next('777')
}, 3000)
})
myObservable.subscribe((text) => console.log(text));
这里直接使用subscribe
方法让一个observer
订阅一个Observable
,我们可以看看这个subscribe
的函数定义来看看怎么实现订阅的:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
源码是用ts
写的,代码即文档,十分清晰,这里笔者给大家解读一下,我们从入参来看,从左至右依次是next
、error
,complete
,且是可选的,我们可以自己选择性的传入相关回调,从这里也就印证了我们上面所说next
、error
和 complete
处理逻辑部分缺失的情况下仍可以正常运行,因为他们都是可选的。
Subscription 与 Subject
Subscription
Subscription
就是表示Observable
的执行,可以被清理。这个对象最常用的方法就是unsubscribe
方法,它不需要任何参数,只是用来清理由Subscription
占用的资源。同时,它还有add
方法可以使我们取消多个订阅。
const myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
const subscription = myObservable.subscribe(x => console.log(x));
subscription.unsubscribe();
Subject (主体)
它是一个代理对象,既是一个 Observable
又是一个 Observer
,它可以同时接受 Observable
发射出的数据,也可以向订阅了它的 observer
发射数据,同时,Subject
会对内部的 observers
清单进行多播 (multicast
)
Subjects
是将任意 Observable
执行共享给多个观察者的唯一方式
这个时候眼尖的读者会发现,这里产生了一个新概念——多播。
- 那么多播又是什么呢?
- 有了多播是不是还有单播?
- 他们的区别又是什么呢?
接下来就让笔者给大家好好分析这两个概念吧。
单播
普通的Observable
是单播的,那么什么是单播呢?
单播的意思是,每个普通的 Observables
实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables
被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。
const Rx = require('rxjs/Rx')
const source = Rx.Observable.interval(1000).take(3);
source.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
source.subscribe((value) => console.log('B ' + value))
}, 1000)
看到陌生的调用不要慌,后面会进行详细解析,这里的source
你可以理解为就是一个每隔一秒发送一个从 0 开始递增整数的Observable
就行了,且只会发送三次(take
操作符其实也就是限定拿多少个数就不在发送数据了。)。
从这里我们可以看出两个不同观察者订阅了同一个源(source
),一个是直接订阅,另一个延时一秒之后再订阅。
从打印的结果来看,A
从 0 开始每隔一秒打印一个递增的数,而B
延时了一秒,然后再从 0 开始打印,由此可见,A
与B
的执行是完全分开的,也就是每次订阅都创建了一个新的实例。
在许多场景下,我们可能会希望B
能够不从最初始开始接受数据,而是接受在订阅的那一刻开始接受当前正在发送的数据,这就需要用到多播能力了。
多播
那么如果实现多播能力呢,也就是实现我们不论什么时候订阅只会接收到实时的数据的功能。
可能这个时候会有小伙伴跳出来了,直接给个中间人来订阅这个源,然后将数据转发给A
和B
不就行了?
const source = Rx.Observable.interval(1000).take(3);
const subject = {
observers: [],
subscribe(target) {
this.observers.push(target);
},
next: function(value) {
this.observers.forEach((next) => next(value))
}
}
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
先分析一下代码,A
和B
的订阅和单播里代码并无差别,唯一变化的是他们订阅的对象由source
变成了subject
,然后再看看这个subject
包含了什么,这里做了一些简化,移除了error
、complete
这样的处理函数,只保留了next
,然后内部含有一个observers
数组,这里包含了所有的订阅者,暴露一个subscribe
用于观察者对其进行订阅。
在使用过程中,让这个中间商subject
来订阅source
,这样便做到了统一管理,以及保证数据的实时性,因为本质上对于source
来说只有一个订阅者。
这里主要是方便理解,简易实现了RxJS
中的Subject
的实例,这里的中间人可以直接换成RxJS
的Subject
类实例,效果是一样的
const source = Rx.Observable.interval(1000).take(3);
const subject = new Rx.Subject();
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
同样先来看看打印的结果是否符合预期,首先A
的打印结果并无变化,B
首次打印的数字现在是从 1 开始了,也就当前正在传输的数据,这下满足了我们需要获取实时数据的需求了。
不同于单播订阅者总是需要从头开始获取数据,多播模式能够保证数据的实时性。
除了以上这些,RxJS
还提供了Subject
的三个变体:
BehaviorSubject
ReplaySubject
AsyncSubject
BehaviorSubject
BehaviorSubject
是一种在有新的订阅时会额外发出最近一次发出的值的Subject
。
同样我们结合现实场景来进行理解,假设有我们需要使用它来维护一个状态,在它变化之后给所有重新订阅的人都能发送一个当前状态的数据,这就好比我们要实现一个计算属性,我们只关心该计算属性最终的状态,而不关心过程中变化的数,那么又该怎么处理呢?
我们知道普通的Subject
只会在当前有新数据的时候发送当前的数据,而发送完毕之后就不会再发送已发送过的数据,那么这个时候我们就可以引入BehaviorSubject
来进行终态维护了,因为订阅了该对象的观察者在订阅的同时能够收到该对象发送的最近一次的值,这样就能满足我们上述的需求了。
然后再结合代码来分析这种Subject
应用的场景:
const subject = new Rx.Subject();
subject.subscribe((value) => console.log('A:' + value))
subject.next(1);
subject.next(2);
setTimeout(() => {
subject.subscribe((value) => console.log('B:' + value));
}, 1000)
首先演示的是采用普通Subject
来作为订阅的对象,然后观察者A
在实例对象subject
调用next
发送新的值之前订阅的,然后观察者是延时一秒之后订阅的,所以A
接受数据正常,那么这个时候由于B
在数据发送的时候还没订阅,所以它并没有收到数据。
那么我们再来看看采用BehaviorSubject
实现的效果:
const subject = new Rx.BehaviorSubject(0);
subject.subscribe((value: number) => console.log('A:' + value))
subject.next(1);
subject.next(2);
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
}, 1000)
同样从打印的结果来看,与普通Subject
的区别在于,在订阅的同时源对象就发送了最近一次改变的值(如果没改变则发送初始值),这个时候我们的B
也如愿获取到了最新的状态。
这里在实例化BehaviorSubject
的时候需要传入一个初始值。
ReplaySubject
在理解了BehaviorSubject
之后再来理解ReplaySubject
就比较轻松了,ReplaySubject
会保存所有值,然后回放给新的订阅者,同时它提供了入参用于控制重放值的数量(默认重放所有)。
什么?还不理解?看码:
const subject = new Rx.ReplaySubject(2);
subject.next(0);
subject.next(1);
subject.next(2);
subject.subscribe((value: number) => console.log('A:' + value))
subject.next(3);
subject.next(4);
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
}, 1000)
我们先从构造函数传参来看,BehaviorSubject
与ReplaySubject
都需要传入一个参数,对BehaviorSubject
来说是初始值,而对于ReplaySubject
来说就是重放先前多少次的值,如果不传入重放次数,那么它将重放所有发射过的值。
从结果上看,如果你不传入确定的重放次数,那么实现的效果与之前介绍的单播效果几乎没有差别。
所以我们再分析代码可以知道在订阅的那一刻,观察者们就能收到源对象前多少次发送的值。
AsyncSubject
AsyncSubject
只有当 Observable
执行完成时 (执行complete()
),它才会将执行的最后一个值发送给观察者,如果因异常而终止,AsyncSubject
将不会释放任何数据,但是会向Observer
传递一个异常通知。
AsyncSubject
一般用的比较少,更多的还是使用前面三种。
const subject = new Rx.AsyncSubject();
subject.next(1);
subject.subscribe(res => {
console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res => {
console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res => {
console.log('C:' + res);
});
subject.complete();
subject.next(4);
从打印结果来看其实已经很好理解了,也就是说对于所有的观察者们来说,源对象只会在所有数据发送完毕也就是调用complete
方法之后才会把最后一个数据返回给观察者们。
这就好比小说里经常有的,当你要放技能的时候,先要打一套起手式,打完之后才会放出你的大招。
Cold-Observables 与 Hot-Observables
Cold Observables
Cold Observables
只有被 observers
订阅的时候,才会开始产生值。是单播的,有多少个订阅就会生成多少个订阅实例,每个订阅都是从第一个产生的值开始接收值,所以每个订阅接收到的值都是一样的。
如果大家想要参考Cold Observables
相关代码,直接看前面的单播示例就行了。
正如单播描述的能力,不管观察者们什么时候开始订阅,源对象都会从初始值开始把所有的数都发给该观察者。
Hot Observables
Hot Observables
不管有没有被订阅都会产生值。是多播的,多个订阅共享同一个实例,是从订阅开始接受到值,每个订阅接收到的值是不同的,取决于它们是从什么时候开始订阅。
这里有几种场景,我们可以逐一分析一下便于理解:
“加热”
首先可以忽略代码中出现的陌生的函数,后面会细说。
const source = Rx.Observable.of(1, 2).publish();
source.connect();
source.subscribe((value) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value) => console.log('B:' + value));
}, 1000);
这里首先用Rx
的操作符of
创建了一个Observable
,并且后面跟上了一个publish
函数,在创建完之后调用connect
函数进行开始数据发送。
最终代码的执行结果就是没有任何数据打印出来,分析一下原因其实也比较好理解,由于开启数据发送的时候还没有订阅,并且这是一个Hot Observables
,它是不会理会你是否有没有订阅它,开启之后就会直接发送数据,所以A
和B
都没有接收到数据。
当然你这里如果把connect
方法放到最后,那么最终的结果就是A
接收到了,B
还是接不到,因为A
在开启发数据之前就订阅了,而B
还要等一秒。
更直观的场景
正如上述多播所描述的,其实我们更多想看到的现象是能够A
和B
两个观察者能够都有接收到数据,然后观察数据的差别,这样会方便理解。
这里直接换一个发射源:
const source = Rx.Observable.interval(1000).take(3).publish();
source.subscribe((value: number) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value: number) => console.log('B:' + value));
}, 3000);
source.connect();
这里我们利用interval
配合take
操作符每秒发射一个递增的数,最多三个,然后这个时候的打印结果就更清晰了,A
正常接收到了三个数,B
三秒之后才订阅,所以只接收到了最后一个数 2,这种方式就是上述多播所描述的并无一二。
两者对比
-
Cold Observables
:举个栗子会比较好理解一点:比如我们上 B 站看番,更新了新番,我们不论什么时候去看,都能从头开始看到完整的剧集,与其他人看不看毫无关联,互不干扰。 -
Hot Observables
:这就好比我们上 B 站看直播,直播开始之后就直接开始播放了,不管是否有没有订阅者,也就是说如果你没有一开始就订阅它,那么你过一段时候后再去看,是不知道前面直播的内容的。
上述代码中出现的操作符解析
在创建Hot Observables
时我们用到了publish
与connect
函数的结合,其实调用了publish
操作符之后返回的结果是一个ConnectableObservable
,然后该对象上提供了connect
方法让我们控制发送数据的时间。
-
publish
:这个操作符把正常的Observable
(Cold Observables
)转换成ConnectableObservable
。 -
ConnectableObservable
:ConnectableObservable
是多播的共享Observable
,可以同时被多个observers
共享订阅,它是Hot Observables
。ConnectableObservable
是订阅者和真正的源头Observables
(上面例子中的interval
,每隔一秒发送一个值,就是源头Observables
)的中间人,ConnectableObservable
从源头Observables
接收到值然后再把值转发给订阅者。 -
connect()
:ConnectableObservable
并不会主动发送值,它有个connect
方法,通过调用connect
方法,可以启动共享ConnectableObservable
发送值。当我们调用ConnectableObservable.prototype.connect
方法,不管有没有被订阅,都会发送值。订阅者共享同一个实例,订阅者接收到的值取决于它们何时开始订阅。
其实这种手动控制的方式还挺麻烦的,有没有什么更加方便的操作方式呢,比如监听到有订阅者订阅了才开始发送数据,一旦所有订阅者都取消了,就停止发送数据?其实也是有的,让我们看看引用计数(refCount
):
引用计数
这里主要用到了publish
结合refCount
实现一个 “自动挡” 的效果。
const source = Rx.Observable.interval(1000).take(3).publish().refCount();
setTimeout(() => {
source.subscribe(data => { console.log("A:" + data) });
setTimeout(() => {
source.subscribe(data => { console.log("B:" + data) });
}, 1000);
}, 2000);
我们透过结果看本质,能够很轻松的发现,只有当A
订阅的时候才开始发送数据(A
拿到的数据是从 0 开始的),并且当B
订阅时,也是只能获取到当前发送的数据,而不能获取到之前的数据。
不仅如此,这种 “自动挡” 当所有订阅者都取消订阅的时候它就会停止再发送数据了。
Schedulers(调度器)
用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout
或 requestAnimationFrame
或其他。
- 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
- 调度器是执行上下文。 它表示在何时何地执行任务 (举例来说,立即的,或另一种回调函数机制 (比如
setTimeout
或process.nextTick
),或动画帧)。 - 调度器有一个 (虚拟的) 时钟。 调度器功能通过它的
getter
方法now()
提供了 “时间” 的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
学到这相信大家也已经或多或少对RxJS
有一定了解了,不知道大家有没有发现一个疑问,前面所展示的代码示例中有同步也有异步,而笔者却没有显示的控制他们的执行,他们的这套执行机制到底是什么呢?
其实他们的内部的调度就是靠的Schedulers
来控制数据发送的时机,许多操作符会预设不同的Scheduler
,所以我们不需要进行特殊处理他们就能良好的进行同步或异步运行。
const source = Rx.Observable.create(function (observer: any) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('订阅前');
source.observeOn(Rx.Scheduler.async)
.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('订阅后');
从打印结果上来看,数据的发送时机的确已经由同步变成了异步,如果不进行调度方式修改,那么 “订阅后” 的打印应该是在数据发送完毕之后才会执行的。
看完示例之后我们再来研究这个调度器能做哪几种调度:
queue
asap
async
animationFrame
queue
将每个下一个任务放在队列中,而不是立即执行
queue
延迟使用调度程序时,其行为与 async
调度程序相同。
当没有延迟使用时,它将同步安排给定的任务 - 在安排好任务后立即执行。但是,当递归调用时(即在已调度的任务内部),将使用队列调度程序调度另一个任务,而不是立即执行,该任务将被放入队列并等待当前任务完成。
这意味着,当您使用 queue
调度程序执行任务时,您确定它会在该调度程序调度的其他任何任务开始之前结束。
这个同步与我们平常理解的同步可能不太一样,笔者当时也都困惑了一会。
还是用一个官方的例子来讲解这种调度方式是怎么理解吧:
import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() => {
queueScheduler.schedule(() => console.log('second'));
console.log('first');
});
我们无需关注陌生的函数调用,我们这里着重于看这种调度方式与平常的同步调度的区别。
首先我们调用queueScheduler
的schedule
方法开始执行,然后函数内部又同样再以同样的方式调用(这里也可以改成递归,不过这里用这个示例去理解可能会好一点),并且传入一个函数,打印second
。
然后继续看下面的语句,一个普通的console.log('first')
,然后我们再来看看打印结果:
是不是有点神奇,如果没看明白为啥的,可以再回头看看前面queue
对于递归执行的处理方式。也就是说如果递归调用,它内部会维护一个队列,然后等待先加入队列的任务先执行完成(也就是上面的console.log('first')
执行完才会执行console.log('second')
,因为console.log('second')
这个任务是后加入该队列的)。
asap
内部基于Promise
实现(Node
端采用process.nextTick
),他会使用可用的最快的异步传输机制,如果不支持Promise
或process.nextTick
或者Web Worker
的 MessageChannel
也可能会调用setTimeout
方式进行调度。
async
与asap
方式很像,只不过内部采用setInterval
进行调度,大多用于基于时间的操作符。
animationFrame
从名字看其实相信大家已经就能略知一二了,内部基于requestAnimationFrame
来实现调度,所以执行的时机将与window.requestAnimationFrame
保持一致,适用于需要频繁渲染或操作动画的场景。
Operators
Operator 概念
采用函数式编程风格的纯函数 (pure function
),使用像 map
、filter
、concat
、flatMap
等这样的操作符来处理集合。也正因为他的纯函数定义,所以我们可以知道调用任意的操作符时都不会改变已存在的Observable
实例,而是会在原有的基础上返回一个新的Observable
。
尽管 RxJS
的根基是 Observable
,但最有用的还是它的操作符。操作符是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元。