RxJS 用法总结
本博客只总结了常用的部分RxJS方法, RxJS可能会让你对响应式编程产生新的理解。RxJS 囊括了很多数据流处理方法,比如:scan、concat、map、debounceTime、filter等等,下面就着重介绍作者认为常用的一些操作符,以及demo:
组合系列
1. concat
注意:第一个observable订阅完毕后,才会订阅第二个,如果第一个完不成,或者延时完成第二个也要一直等下去。
import { concat } from 'RxJS/operators';
import { of } from 'RxJS';
// 发出 1,2,3
const sourceOne = of(1, 2, 3);
// 发出 4,5,6
const sourceTwo = of(4, 5, 6);
// 先发出 sourceOne 的值,当完成时订阅 sourceTwo
const example = sourceOne.pipe(concat(sourceTwo));
const subscribe = example.subscribe(val =>
console.log(val)
);
// 输出: 1,2,3,4,5,6
2. merge
注意:此方法为合并多个observable为一个,然后订阅
import { mapTo } from 'RxJS/operators';
import { interval, merge } from 'RxJS';
// 每2.5秒发出值
const first = interval(2500);
// 每2秒发出值
const second = interval(2000);
// 每1.5秒发出值
const third = interval(1500);
// 每1秒发出值
const fourth = interval(1000);
// 从一个 observable 中发出输出值
const example = merge(
first.pipe(mapTo('FIRST!')),
second.pipe(mapTo('SECOND!')),
third.pipe(mapTo('THIRD')),
fourth.pipe(mapTo('FOURTH'))
);
const subscribe = example.subscribe(val => console.log(val));
// 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
3. race
注意:此方法为接收第一个返回的observable的值,其他的丢弃
import { mapTo } from 'RxJS/operators';
import { interval } from 'RxJS/observable/interval';
import { race } from 'RxJS/observable/race';
// 接收第一个发出值的 observable
const example = race(
// 每1.5秒发出值
interval(1500),
// 每1秒发出值
interval(1000).pipe(mapTo('1s won!')),
// 每2秒发出值
interval(2000),
// 每2.5秒发出值
interval(2500)
);
const subscribe = example.subscribe(val => console.log(val));
// 输出: "1s won!"
4. startWith
注意:给定的第一个值
import { startWith } from 'RxJS/operators';
import { of } from 'RxJS';
// 发出 (1,2,3)
const source = of(1, 2, 3);
// 从0开始
const example = source.pipe(startWith(0));
const subscribe = example.subscribe(val => console.log(val));
// 输出: 0,1,2,3
5. zip
注意: zip操作符会订阅所有内部 observables,然后等待每个发出一个值。一旦发生这种情况,将发出具有相应索引的所有值。这会持续进行,直到至少一个内部 observable 完成。
import { delay } from 'RxJS/operators';
import { of, zip } from 'RxJS';
const sourceOne = of('Hello');
const sourceTwo = of('World!');
const sourceThree = of('Goodbye');
const sourceFour = of('World!');
// 一直等到所有 observables 都发出一个值,才将所有值作为数组发出
const example = zip(
sourceOne,
sourceTwo.pipe(delay(1000)),
sourceThree.pipe(delay(2000)),
sourceFour.pipe(delay(3000))
);
const subscribe = example.subscribe(val => console.log(val));
// 输出: ["Hello", "World!", "Goodbye", "World!"]
条件系列
1. defaultIfEmpty
注意:如果在完成时没有发出任何通知,那么发出给定的值
import { defaultIfEmpty } from 'RxJS/operators';
import { of } from 'RxJS';
// 当源 observable 为空时,发出 'Observable.of() Empty!',否则发出源的任意值
const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty!'));
const subscribe = exampleOne.subscribe(val => console.log(val));
// 输出: 'Observable.of() Empty!'
2. every
注意:如果完成时所有的值都能通过断言,那么发出 true,否则发出 false 。
import { every } from 'RxJS/operators';
import { of } from 'RxJS';
// 发出5个值
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(
// 每个值都是偶数吗?
every(val => val % 2 === 0)
);
const subscribe = example.subscribe(val => console.log(val));
// 输出: false
创建系列
1. create
注意:使用给定的订阅函数来创建 observable 。
import { Observable } from 'RxJS';
/*
创建在订阅函数中发出 'Hello' 和 'World' 的 observable 。
*/
const hello = Observable.create(function(observer) {
observer.next('Hello');
observer.next('World');
});
const subscribe = hello.subscribe(val => console.log(val));
// 输出: 'Hello'...'World'
2. empty
注意: 立即完成的空的observable 。
import { empty } from 'RxJS';
const subscribe = empty().subscribe({
next: () => console.log('Next'),
complete: () => console.log('Complete!')
});
// 输出: 'Complete!'
3. from
注意:将数组、promise 或迭代器转换成 observable
1. 数组转化
import { from } from 'RxJS';
// 将数组作为值的序列发出
const arraySource = from([1, 2, 3, 4, 5]);
// 输出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
2. promise转化
import { from } from 'RxJS';
// 发出 promise 的结果
const promiseSource = from(new Promise(resolve => resolve('Hello World!')));
// 输出: 'Hello World'
const subscribe = promiseSource.subscribe(val => console.log(val));
4. interval
注意:基于给定时间间隔发出数字序列。
import { interval } from 'RxJS';
// 每1秒发出数字序列中的值
const source = interval(1000);
const subscribe = source.subscribe(val => console.log(val));
// 输出数字: 0,1,2,3,4,5....
5. of
注意:按顺序发出任意数量的值。
import { of } from 'RxJS';
// 依次发出提供的任意数量的值
const source = of(1, 2, 3, 4, 5);
const subscribe = source.subscribe(val => console.log(val));
// 输出: 1,2,3,4,5
6. throw
注意: 在订阅上发出错误
import { throwError } from 'RxJS';
// 在订阅上使用指定值来发出错误
const source = throwError('This is an error!');
const subscribe = source.subscribe({
next: val => console.log(val),
complete: () => console.log('Complete!'),
error: val => console.log(`Error: ${val}`)
});
// 输出: 'Error: This is an error!'
7. timer
注意:第二个参数时选传参数,传了之后,会按照第二个参数的毫秒数不停歇发送数据,如果不传,则只会发生一次
import { timer } from 'RxJS';
/*
timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
然后每2秒发出序列值
*/
const source = timer(1000, 2000);
// 输出: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));
错误处理系列
1. catchError
注意: 优雅地处理 observable 序列中的错误
import { throwError, of } from 'RxJS';
import { catchError } from 'RxJS/operators';
// 发出错误
const source = throwError('This is an error!');
// 优雅地处理错误,并返回带有错误信息的 observable
const example = source.pipe(catchError(val => of(`I caught: ${val}`)));
const subscribe = example.subscribe(val => console.log(val));
// 输出: 'I caught: This is an error'
2. retry/ retryWhen
注意:如果发生错误,以指定次数重试 observable 序列 / 当发生错误时,基于自定义的标准来重试 observable 序列。
1. retry
import { interval, of, throwError } from 'RxJS';
import { mergeMap, retry } from 'RxJS/operators';
// 每1秒发出值
const source = interval(1000);
const example = source.pipe(
mergeMap(val => {
// 抛出错误以进行演示
if (val > 5) {
return throwError('Error!');
}
return of(val);
}),
// 出错的话可以重试2次
retry(2)
);
const subscribe = example.subscribe({
next: val => console.log(val),
error: val => console.log(`${val}: Retried 2 times then quit!`)
});
/*
输出:
0..1..2..3..4..5..
0..1..2..3..4..5.. 重试的第一次
0..1..2..3..4..5.. 重试的第二次
"Error!: Retried 2 times then quit!"
*/
2. retryWhen
当发生错误时,基于自定义的标准来重试 observable 序列。
import { timer, interval } from 'RxJS';
import { map, tap, retryWhen, delayWhen } from 'RxJS/operators';
// 每1秒发出值
const source = interval(1000);
const example = source.pipe(
map(val => {
if (val > 5) {
// 错误将由 retryWhen 接收
throw val;
}
return val;
}),
retryWhen(errors =>
errors.pipe(
// 输出错误信息
tap(val => console.log(`Value ${val} was too high!`)),
// 5秒后重启
delayWhen(val => timer(val * 1000))
)
)
);
const subscribe = example.subscribe(val => console.log(val));
/*
输出:
0
1
2
3
4
5
"Value 6 was too high!"
--等待5秒后然后重复此过程
*/
多播系列
1. share
注意: 在多个订阅者间共享源 observable 。
import { timer } from 'RxJS';
import { tap, mapTo, share } from 'RxJS/operators';
// 1秒后发出值
const source = timer(1000);
// 输出副作用,然后发出结果
const example = source.pipe(
tap(() => console.log('***SIDE EFFECT***')),
mapTo('***RESULT***')
);
const subscribe = example.subscribe(val => console.log(val));
const subscribeTwo = example.subscribe(val => console.log(val));
/*
***不共享的话,副作用会执行两次***
输出:
"***SIDE EFFECT***"
"***RESULT***"
"***SIDE EFFECT***"
"***RESULT***"
*/
// 在多个订阅者间共享 observable
const sharedExample = example.pipe(share());
const subscribeThree = sharedExample.subscribe(val => console.log(val));
const subscribeFour = sharedExample.subscribe(val => console.log(val));
/*
***共享的话,副作用只执行一次***
输出:
"***SIDE EFFECT***"
"***RESULT***"
"***RESULT***"
*/
2. shareReplay
注意:共享源 observable 并重放指定次数的发出。
为什么使用 shareReplay?
通常啊,当有副作用或繁重的计算时,你不希望在多个订阅者之间重复执行时,会使用 shareReplay 。 当你知道流的后来订阅者也需要访问之前发出的值,shareReplay 在这种场景下也是有价值的。 这种在订阅过程中重放值的能力是区分 share 和 shareReplay 的关键。
import { Subject, ReplaySubject } from 'RxJS';
import { pluck, share, shareReplay, tap } from 'RxJS/operators';
// 使用 subject 模拟 url 的变化
const routeEnd = new Subject<{data: any, url: string}>();
// 提取 url 并与后来订阅者共享
const lastUrl = routeEnd.pipe(
tap(_ => console.log('executed')),
pluck('url'),
// 默认为所有值,因此我们将其设置为仅保留并重播最后一个值
shareReplay(1)
);
// 起始订阅者是必须的
const initialSubscriber = lastUrl.subscribe(console.log)
// 模拟路由变化
// 输出: 'executed', 'my-path'
routeEnd.next({data: {}, url: 'my-path'});
// 输出: 'my-path'
const lateSubscriber = lastUrl.subscribe(console.log);
过滤系列
1. debounceTime
注意:舍弃掉在两次输出之间小于指定时间的发出值
import { fromEvent, timer } from 'RxJS';
import { debounceTime, map } from 'RxJS/operators';
const input = document.getElementById('example');
// 对于每次键盘敲击,都将映射成当前输入值
const example = fromEvent(input, 'keyup').pipe(map(i => i.currentTarget.value));
// 在两次键盘敲击之间等待0.5秒方才发出当前值,
// 并丢弃这0.5秒内的所有其他值
const debouncedInput = example.pipe(debounceTime(500));
// 输出值
const subscribe = debouncedInput.subscribe(val => {
console.log(`Debounced Input: ${val}`);
});
2. distinctUntilChanged
注意:只有当当前值与之前最后一个值不同时才将其发出。
import { from } from 'RxJS';
import { distinctUntilChanged } from 'RxJS/operators';
// 基于最新发出的值进行比较,只输出不同的值
const myArrayWithDuplicatesInARow = from([1, 1, 2, 2, 3, 1, 2, 3]);
const distinctSub = myArrayWithDuplicatesInARow
.pipe(distinctUntilChanged())
.subscribe(val => console.log('DISTINCT SUB:', val));
// 输出: 1,2,3,1,2,3
const nonDistinctSub = myArrayWithDuplicatesInARow
.subscribe(val => console.log('NON DISTINCT SUB:', val));
// 输出 : 1,1,2,2,3,1,2,3
3. filter
注意: 发出符合给定条件的值。
import { from } from 'RxJS';
import { filter } from 'RxJS/operators';
// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 过滤掉奇数
const example = source.pipe(filter(num => num % 2 === 0));
// 输出: "Even number: 2", "Even number: 4"
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`));
4. take
注意:在完成前发出N个值(N由参数决定)。
为什么使用 take?
当只对开头的一组值感兴趣时,你想要的便是 take 操作符。也许你想看看当用户第一次进入页面时,用户首先点击的是什么,你想要订阅点击事件并只取首个值。举例来说,你想要观看赛跑,但其实你只对首先冲过终点的人感兴趣。此操作符很清晰明了,你想要取开头n个值。
import { of } from 'RxJS';
import { take } from 'RxJS/operators';
// 发出 1,2,3,4,5
const source = of(1, 2, 3, 4, 5);
// 取第一个发出的值然后完成
const example = source.pipe(take(1));
// 输出: 1
const subscribe = example.subscribe(val => console.log(val));
5. takeUntil
注意:发出值,直到提供的 observable 发出值,它便完成。
import { interval, timer } from 'RxJS';
import { takeUntil } from 'RxJS/operators';
// 每1秒发出值
const source = interval(1000);
// 5秒后发出值
const timer$ = timer(5000);
// 当5秒后 timer 发出值时, source 则完成
const example = source.pipe(takeUntil(timer$));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
转换系列
1. bufferTime
注意:收集发出的值,直到经过了提供的时间才将其作为数组发出。
import { interval } from 'RxJS';
import { bufferTime } from 'RxJS/operators';
// 创建每500毫秒发出值的 observable
const source = interval(500);
// 2秒后,将缓冲值作为数组发出
const example = source.pipe(bufferTime(2000));
// 打印值到控制台
// 输出: [0,1,2]...[3,4,5,6]
const subscribe = example.subscribe(val =>
console.log('Buffered with Time:', val)
);
2. concatMap
注意:将值映射成内部 observable,并按顺序订阅和发出
注意 concatMap 和 mergeMap 之间的区别。 因为 concatMap 之前前一个内部 observable 完成后才会订阅下一个, source 中延迟 2000ms 值会先发出。 对比的话, mergeMap 会立即订阅所有内部 observables, 延迟少的 observable (1000ms) 会先发出值,然后才是 2000ms 的 observable 。
import { of } from 'RxJS';
import { concatMap, delay, mergeMap } from 'RxJS/operators';
// 发出延迟值
const source = of(2000, 1000);
// 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
const example = source.pipe(
concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
// 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
const subscribe = example.subscribe(val =>
console.log(`With concatMap: ${val}`)
);
// 展示 concatMap 和 mergeMap 之间的区别
const mergeMapExample = source
.pipe(
// 只是为了确保 meregeMap 的日志晚于 concatMap 示例
delay(5000),
mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
)
.subscribe(val => console.log(`With mergeMap: ${val}`));
3. map
注意:对源 observable 的每个值应用投射函数
import { from } from 'RxJS';
import { map } from 'RxJS/operators';
// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 每个数字加10
const example = source.pipe(map(val => val + 10));
// 输出: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));
4. mergeMap / flatMap ( flatMap 是 mergeMap 的别名)
注意:映射成 observable 并发出值。 使用场景,比如调用多个接口以后,将几个接口值进行合并等操作
import { of } from 'RxJS';
import { mergeMap } from 'RxJS/operators';
// 发出 'Hello'
const source = of('Hello');
// 映射成 observable 并将其打平
const example = source.pipe(mergeMap(val => of(`${val} World!`)));
// 输出: 'Hello World!'
const subscribe = example.subscribe(val => console.log(val));
5. scan
注意:随着时间的推移进行归并。用于几个接口值的累加
import { of } from 'RxJS';
import { scan } from 'RxJS/operators';
const source = of(1, 2, 3);
// 基础的 scan 示例,从0开始,随着时间的推移计算总数
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
// 输出累加值
// 输出: 1,3,6
const subscribe = example.subscribe(val => console.log(val));
6. switchMap
注意:映射成 observable,完成前一个内部 observable,发出值。用于短时间多次重复调用一个接口,但是只读取最后一次的场景,比如实时搜索框搜索。
import { interval, fromEvent } from 'RxJS';
import { switchMap, mapTo } from 'RxJS/operators';
// 发出每次点击
const source = fromEvent(document, 'click');
// 如果3秒内发生了另一次点击,则消息不会被发出
const example = source.pipe(
switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
);
// (点击)...3s...'Hello I made it!'...(点击)...2s(点击)...
const subscribe = example.subscribe(val => console.log(val));
工具系列
1. do / tap
注意:透明地执行操作或副作用,比如打印日志。
import { of } from 'RxJS';
import { tap, map } from 'RxJS/operators';
const source = of(1, 2, 3, 4, 5);
// 使用 tap 透明地打印 source 中的值
const example = source.pipe(
tap(val => console.log(`BEFORE MAP: ${val}`)),
map(val => val + 10),
tap(val => console.log(`AFTER MAP: ${val}`))
);
// 'tap' 并不转换值
const subscribe = example.subscribe(val => console.log(val));
// 输出: 11...12...13...14...15
2. delay
注意:根据给定时间延迟发出值。
import { of, merge } from 'RxJS';
import { mapTo, delay } from 'RxJS/operators';
// 发出一项
const example = of(null);
// 每延迟一次输出便增加1秒延迟时间
const message = merge(
example.pipe(mapTo('Hello')),
example.pipe(
mapTo('World!'),
delay(1000)
),
example.pipe(
mapTo('Goodbye'),
delay(2000)
),
example.pipe(
mapTo('World!'),
delay(3000)
)
);
// 输出: 'Hello'...'World!'...'Goodbye'...'World!'
const subscribe = message.subscribe(val => console.log(val));
3. toPromise
注意:将 observable 转换成 promise 。
// 返回基础的 observable
const sample = val => Rx.Observable.of(val).delay(5000);
// 将基础的 observable 转换成 promise
const example = sample('First Example')
.toPromise()
// 输出: 'First Example'
.then(result => {
console.log('From Promise:', result);
});
如果您想全面学习RxJS,请前往RxJS中文网
本文作者原创,仅供学习交流使用,转载需注明出处。