创建Observable:
Rx.Observable.create 是 Observable 构造函数的别名,它接收一个参数:subscribe 函数。
例子:每秒推送一个“Hi”
Rx.Observable.create(function subscribe(observer){
var i=setInterval((observer)=>{
observer.next("Hi")
},1000);
});
在上面的示例中,subscribe 函数是用来描述 Observable 最重要的一块。
订阅Observable:
示例中的 Observable 对象 observable 可以订阅,像这样:
observable.subscribe(x => console.log(x));
observable.subscribe 和 Observable.create(function subscribe(observer) {...}) 中的 subscribe 有着同样的名字,这并不是一个巧合。在库的实现中,这两个subscribe是不同的,但在实际使用中,你可以认为在概念上它们是等同的。
这个例子表明订阅行为在Observable 的多个观察者之间是不共享的。当使用一个观察者调用 observable.subscribe 时,Observable.create(function subscribe(observer) {...}) 中的 subscribe 函数就会为这个观察者运行一次。对 observable.subscribe 的每次调用都会触发针对给定观察者的独立设置。(每个已订阅的观察者都拥有 Observable 的独立执行)
以上简单的理解为:
订阅Observable的时候,需要传入一个observer。observer定义了自身的next、complete、error方法。observer的next方法会获得Observable的next方法推送过来的值,并按照自己的定义处理他们。当一个observer订阅Observable的时候,不会影响另外一个observer。(每个已订阅的观察者都拥有 Observable 的独立执行)
也就是说:只有Observable才可以被subscribe。只有observer才可以subscribe Observable。
订阅 Observable 像是调用函数, 并提供接收数据的回调函数。
取消订阅:
Observable.subscribe()返回一个Subscription对象。调用其unsubscribe()方法取消订阅。实现类似如下:
var observable = Rx.Observable.create(function subscribe(observer) {
// 追踪 interval 资源
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
// 提供取消和清理 interval 资源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
//这里返回一个函数声明
var subscription=observable.subscribe((x)=>console.log(x));
//执行取消订阅
subscription.unsubscribe();
Observer (观察者)
简单的讲,观察者是用来消费(处理)Observable 推送出来的值。
它是一个包含了3个方法的对象。每个方法分别对应一个Observable发送通知的类型。
如果你只提供了一个方法,但是没有将它附加到观察者对象上,像这样:
observable.subscribe(x => console.log('Observer got a next value: ' + x));
在 observable.subscribe 内部,它会创建一个观察者对象并使用第一个回调函数参数作为 next 的处理方法。
如果你没有提供某个回调函数,Observable 的执行也会正常运行,只是某些通知类型会被忽略,因为观察者中没有相对应的回调函数。
Subscription (订阅)
subscription通常只有一个unsubscribe()方法,这个函数用来释放资源或去取消 Observable 执行。
Subscription 还可以合在一起,这样一个 Subscription 调用 unsubscribe() 方法,可能会有多个 Subscription 取消订阅 。你可以通过把一个 Subscription 添加到另一个上面来做这件事:
subscription1.add(subscription2);//添加子订阅
subscription1.unsubscribe();// 2个都会取消订阅。
subscription1.remove(subscription2); //取消添加子订阅
Subject (主题)
什么是 Subject? - RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。
Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。
每个 Subject 都是 Observable 。
在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。
- 每个 Subject 都是观察者。
Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式。
多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。
多播的 Observables
- 多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。(共享一个Observable )
在底层,这就是 multicast 操作符的工作原理:观察者订阅一个基础的 Subject,然后 Subject 订阅源 Observable 。
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅
执行结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
multicast 操作符返回一个 Observable,它看起来和普通的 Observable 没什么区别,但当订阅时就像是 Subject 。multicast 返回的是 ConnectableObservable,它只是一个有 connect() 方法的 Observable 。
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// 在底层使用了 `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// 在底层使用了 `source.subscribe(subject)`:
multicasted.connect();
connect() 方法十分重要,它决定了何时启动共享的 Observable 执行。因为 connect() 方法在底层执行了 source.subscribe(subject),所以它返回的是 Subscription,你可以取消订阅以取消共享的 Observable 执行。
Subject的三种变体类型:
- BehaviorSubject :主要用来表示随着时间的流逝,值会改变的情况。比如生日是Subject,而生日就是BehaviorSubject。
- ReplaySubject :当有新的订阅者订阅时,可以指定回放的值得个数,或者以 window time (以毫秒为单位) 规定范围内发生的值。
- AsyncSubject : 当被观察者的值推送完成时,才将最后一个值发送给观察者。类似于last ()操作符。