RxJS 用法(操作符)总结

RxJS 用法总结

本博客只总结了常用的部分RxJS方法, RxJS可能会让你对响应式编程产生新的理解。RxJS 囊括了很多数据流处理方法,比如:scan、concat、map、debounceTime、filter等等,下面就着重介绍作者认为常用的一些操作符,以及demo:

组合系列

1. concat

注意:第一个observable订阅完毕后,才会订阅第二个,如果第一个完不成,或者延时完成第二个也要一直等下去。

import { concat } from 'RxJS/operators';
import { of } from 'RxJS';
    
// 发出 1,2,3
const sourceOne = of(1, 2, 3);
// 发出 4,5,6
const sourceTwo = of(4, 5, 6);
// 先发出 sourceOne 的值,当完成时订阅 sourceTwo
const example = sourceOne.pipe(concat(sourceTwo));
const subscribe = example.subscribe(val =>
  console.log(val)
);
// 输出: 1,2,3,4,5,6

2. merge

注意:此方法为合并多个observable为一个,然后订阅

import { mapTo } from 'RxJS/operators';
import { interval, merge } from 'RxJS';

// 每2.5秒发出值
const first = interval(2500);
// 每2秒发出值
const second = interval(2000);
// 每1.5秒发出值
const third = interval(1500);
// 每1秒发出值
const fourth = interval(1000);

// 从一个 observable 中发出输出值
const example = merge(
  first.pipe(mapTo('FIRST!')),
  second.pipe(mapTo('SECOND!')),
  third.pipe(mapTo('THIRD')),
  fourth.pipe(mapTo('FOURTH'))
);

const subscribe = example.subscribe(val => console.log(val));
// 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"

3. race

注意:此方法为接收第一个返回的observable的值,其他的丢弃

import { mapTo } from 'RxJS/operators';
import { interval } from 'RxJS/observable/interval';
import { race } from 'RxJS/observable/race';

// 接收第一个发出值的 observable
const example = race(
  // 每1.5秒发出值
  interval(1500),
  // 每1秒发出值
  interval(1000).pipe(mapTo('1s won!')),
  // 每2秒发出值
  interval(2000),
  // 每2.5秒发出值
  interval(2500)
);
const subscribe = example.subscribe(val => console.log(val));
// 输出: "1s won!"

4. startWith

注意:给定的第一个值

import { startWith } from 'RxJS/operators';
import { of } from 'RxJS';

// 发出 (1,2,3)
const source = of(1, 2, 3);
// 从0开始
const example = source.pipe(startWith(0));
const subscribe = example.subscribe(val => console.log(val));
// 输出: 0,1,2,3

5. zip

注意: zip操作符会订阅所有内部 observables,然后等待每个发出一个值。一旦发生这种情况,将发出具有相应索引的所有值。这会持续进行,直到至少一个内部 observable 完成。

import { delay } from 'RxJS/operators';
import { of, zip } from 'RxJS';

const sourceOne = of('Hello');
const sourceTwo = of('World!');
const sourceThree = of('Goodbye');
const sourceFour = of('World!');
// 一直等到所有 observables 都发出一个值,才将所有值作为数组发出
const example = zip(
  sourceOne,
  sourceTwo.pipe(delay(1000)),
  sourceThree.pipe(delay(2000)),
  sourceFour.pipe(delay(3000))
);
const subscribe = example.subscribe(val => console.log(val));
// 输出: ["Hello", "World!", "Goodbye", "World!"]

条件系列

1. defaultIfEmpty

注意:如果在完成时没有发出任何通知,那么发出给定的值

import { defaultIfEmpty } from 'RxJS/operators';
import { of } from 'RxJS';

// 当源 observable 为空时,发出 'Observable.of() Empty!',否则发出源的任意值
const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty!'));

const subscribe = exampleOne.subscribe(val => console.log(val));
// 输出: 'Observable.of() Empty!'

2. every

注意:如果完成时所有的值都能通过断言,那么发出 true,否则发出 false 。

import { every } from 'RxJS/operators';
import { of } from 'RxJS';

// 发出5个值
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(
  // 每个值都是偶数吗?
  every(val => val % 2 === 0)
);

const subscribe = example.subscribe(val => console.log(val));
// 输出: false

创建系列

1. create

注意:使用给定的订阅函数来创建 observable 。

import { Observable } from 'RxJS';
/*
  创建在订阅函数中发出 'Hello' 和 'World' 的 observable 。
*/
const hello = Observable.create(function(observer) {
  observer.next('Hello');
  observer.next('World');
});

const subscribe = hello.subscribe(val => console.log(val));
// 输出: 'Hello'...'World'

2. empty

注意: 立即完成的空的observable 。

import { empty } from 'RxJS';

const subscribe = empty().subscribe({
  next: () => console.log('Next'),
  complete: () => console.log('Complete!')
});
// 输出: 'Complete!'

3. from

注意:将数组、promise 或迭代器转换成 observable

1. 数组转化
import { from } from 'RxJS';

// 将数组作为值的序列发出
const arraySource = from([1, 2, 3, 4, 5]);
// 输出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));

2. promise转化
import { from } from 'RxJS';

// 发出 promise 的结果
const promiseSource = from(new Promise(resolve => resolve('Hello World!')));
// 输出: 'Hello World'
const subscribe = promiseSource.subscribe(val => console.log(val));

4. interval

注意:基于给定时间间隔发出数字序列。

import { interval } from 'RxJS';

// 每1秒发出数字序列中的值
const source = interval(1000);

const subscribe = source.subscribe(val => console.log(val));
// 输出数字: 0,1,2,3,4,5....

5. of

注意:按顺序发出任意数量的值。

import { of } from 'RxJS';
// 依次发出提供的任意数量的值
const source = of(1, 2, 3, 4, 5);

const subscribe = source.subscribe(val => console.log(val));
// 输出: 1,2,3,4,5

6. throw

注意: 在订阅上发出错误

import { throwError } from 'RxJS';

// 在订阅上使用指定值来发出错误
const source = throwError('This is an error!');

const subscribe = source.subscribe({
  next: val => console.log(val),
  complete: () => console.log('Complete!'),
  error: val => console.log(`Error: ${val}`)
});
// 输出: 'Error: This is an error!'

7. timer

注意:第二个参数时选传参数,传了之后,会按照第二个参数的毫秒数不停歇发送数据,如果不传,则只会发生一次

import { timer } from 'RxJS';

/*
  timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
  然后每2秒发出序列值
*/
const source = timer(1000, 2000);
// 输出: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));

错误处理系列

1. catchError

注意: 优雅地处理 observable 序列中的错误

import { throwError, of } from 'RxJS';
import { catchError } from 'RxJS/operators';
// 发出错误
const source = throwError('This is an error!');
// 优雅地处理错误,并返回带有错误信息的 observable
const example = source.pipe(catchError(val => of(`I caught: ${val}`)));

const subscribe = example.subscribe(val => console.log(val));
// 输出: 'I caught: This is an error'

2. retry/ retryWhen

注意:如果发生错误,以指定次数重试 observable 序列 / 当发生错误时,基于自定义的标准来重试 observable 序列。

1. retry
import { interval, of, throwError } from 'RxJS';
import { mergeMap, retry } from 'RxJS/operators';

// 每1秒发出值
const source = interval(1000);
const example = source.pipe(
  mergeMap(val => {
    // 抛出错误以进行演示
    if (val > 5) {
      return throwError('Error!');
    }
    return of(val);
  }),
  // 出错的话可以重试2次
  retry(2)
);

const subscribe = example.subscribe({
  next: val => console.log(val),
  error: val => console.log(`${val}: Retried 2 times then quit!`)
});
/*
  输出:
  0..1..2..3..4..5..
  0..1..2..3..4..5..   重试的第一次
  0..1..2..3..4..5..    重试的第二次
  "Error!: Retried 2 times then quit!"
*/

2. retryWhen
当发生错误时,基于自定义的标准来重试 observable 序列。
import { timer, interval } from 'RxJS';
import { map, tap, retryWhen, delayWhen } from 'RxJS/operators';

// 每1秒发出值
const source = interval(1000);
const example = source.pipe(
  map(val => {
    if (val > 5) {
      // 错误将由 retryWhen 接收
      throw val;
    }
    return val;
  }),
  retryWhen(errors =>
    errors.pipe(
      // 输出错误信息
      tap(val => console.log(`Value ${val} was too high!`)),
      // 5秒后重启
      delayWhen(val => timer(val * 1000))
    )
  )
);

const subscribe = example.subscribe(val => console.log(val));
/*
  输出:
  0
  1
  2
  3
  4
  5
  "Value 6 was too high!"
  --等待5秒后然后重复此过程
*/

多播系列

1. share

注意: 在多个订阅者间共享源 observable 。

import { timer } from 'RxJS';
import { tap, mapTo, share } from 'RxJS/operators';

// 1秒后发出值
const source = timer(1000);
// 输出副作用,然后发出结果
const example = source.pipe(
  tap(() => console.log('***SIDE EFFECT***')),
  mapTo('***RESULT***')
);

const subscribe = example.subscribe(val => console.log(val));
const subscribeTwo = example.subscribe(val => console.log(val));
/*
  ***不共享的话,副作用会执行两次***
  输出: 
  "***SIDE EFFECT***"
  "***RESULT***"
  "***SIDE EFFECT***"
  "***RESULT***"
*/

// 在多个订阅者间共享 observable
const sharedExample = example.pipe(share());

const subscribeThree = sharedExample.subscribe(val => console.log(val));
const subscribeFour = sharedExample.subscribe(val => console.log(val));
/*
   ***共享的话,副作用只执行一次***
  输出:
  "***SIDE EFFECT***"
  "***RESULT***"
  "***RESULT***"
*/

2. shareReplay

注意:共享源 observable 并重放指定次数的发出。
为什么使用 shareReplay?
通常啊,当有副作用或繁重的计算时,你不希望在多个订阅者之间重复执行时,会使用 shareReplay 。 当你知道流的后来订阅者也需要访问之前发出的值,shareReplay 在这种场景下也是有价值的。 这种在订阅过程中重放值的能力是区分 share 和 shareReplay 的关键。

import { Subject, ReplaySubject } from 'RxJS';
import { pluck, share, shareReplay, tap } from 'RxJS/operators';

// 使用 subject 模拟 url 的变化
const routeEnd = new Subject<{data: any, url: string}>();
// 提取 url 并与后来订阅者共享
const lastUrl = routeEnd.pipe(
  tap(_ => console.log('executed')),
  pluck('url'),
  // 默认为所有值,因此我们将其设置为仅保留并重播最后一个值
  shareReplay(1)
);
// 起始订阅者是必须的
const initialSubscriber = lastUrl.subscribe(console.log)
// 模拟路由变化
// 输出: 'executed', 'my-path'
routeEnd.next({data: {}, url: 'my-path'});
// 输出: 'my-path'
const lateSubscriber = lastUrl.subscribe(console.log);

过滤系列

1. debounceTime

注意:舍弃掉在两次输出之间小于指定时间的发出值

import { fromEvent, timer } from 'RxJS';
import { debounceTime, map } from 'RxJS/operators';

const input = document.getElementById('example');

// 对于每次键盘敲击,都将映射成当前输入值
const example = fromEvent(input, 'keyup').pipe(map(i => i.currentTarget.value));

// 在两次键盘敲击之间等待0.5秒方才发出当前值,
// 并丢弃这0.5秒内的所有其他值
const debouncedInput = example.pipe(debounceTime(500));

// 输出值
const subscribe = debouncedInput.subscribe(val => {
  console.log(`Debounced Input: ${val}`);
});

2. distinctUntilChanged

注意:只有当当前值与之前最后一个值不同时才将其发出。

import { from } from 'RxJS';
import { distinctUntilChanged } from 'RxJS/operators';

// 基于最新发出的值进行比较,只输出不同的值
const myArrayWithDuplicatesInARow = from([1, 1, 2, 2, 3, 1, 2, 3]);

const distinctSub = myArrayWithDuplicatesInARow
  .pipe(distinctUntilChanged())
  .subscribe(val => console.log('DISTINCT SUB:', val));
  // 输出: 1,2,3,1,2,3

const nonDistinctSub = myArrayWithDuplicatesInARow
  .subscribe(val => console.log('NON DISTINCT SUB:', val));
   // 输出 : 1,1,2,2,3,1,2,3

3. filter

注意: 发出符合给定条件的值。

import { from } from 'RxJS';
import { filter } from 'RxJS/operators';

// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 过滤掉奇数
const example = source.pipe(filter(num => num % 2 === 0));
// 输出: "Even number: 2", "Even number: 4"
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`));

4. take

注意:在完成前发出N个值(N由参数决定)。
为什么使用 take?
当只对开头的一组值感兴趣时,你想要的便是 take 操作符。也许你想看看当用户第一次进入页面时,用户首先点击的是什么,你想要订阅点击事件并只取首个值。举例来说,你想要观看赛跑,但其实你只对首先冲过终点的人感兴趣。此操作符很清晰明了,你想要取开头n个值。

import { of } from 'RxJS';
import { take } from 'RxJS/operators';

// 发出 1,2,3,4,5
const source = of(1, 2, 3, 4, 5);
// 取第一个发出的值然后完成
const example = source.pipe(take(1));
// 输出: 1
const subscribe = example.subscribe(val => console.log(val));

5. takeUntil

注意:发出值,直到提供的 observable 发出值,它便完成。

import { interval, timer } from 'RxJS';
import { takeUntil } from 'RxJS/operators';

// 每1秒发出值
const source = interval(1000);
// 5秒后发出值
const timer$ = timer(5000);
// 当5秒后 timer 发出值时, source 则完成
const example = source.pipe(takeUntil(timer$));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));

转换系列

1. bufferTime

注意:收集发出的值,直到经过了提供的时间才将其作为数组发出。

import { interval } from 'RxJS';
import { bufferTime } from 'RxJS/operators';

// 创建每500毫秒发出值的 observable
const source = interval(500);
// 2秒后,将缓冲值作为数组发出
const example = source.pipe(bufferTime(2000));
// 打印值到控制台
// 输出: [0,1,2]...[3,4,5,6]
const subscribe = example.subscribe(val =>
  console.log('Buffered with Time:', val)
);

2. concatMap

注意:将值映射成内部 observable,并按顺序订阅和发出
注意 concatMap 和 mergeMap 之间的区别。 因为 concatMap 之前前一个内部 observable 完成后才会订阅下一个, source 中延迟 2000ms 值会先发出。 对比的话, mergeMap 会立即订阅所有内部 observables, 延迟少的 observable (1000ms) 会先发出值,然后才是 2000ms 的 observable 。

import { of } from 'RxJS';
import { concatMap, delay, mergeMap } from 'RxJS/operators';

// 发出延迟值
const source = of(2000, 1000);
// 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
const example = source.pipe(
  concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
// 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
const subscribe = example.subscribe(val =>
  console.log(`With concatMap: ${val}`)
);

// 展示 concatMap 和 mergeMap 之间的区别
const mergeMapExample = source
  .pipe(
    // 只是为了确保 meregeMap 的日志晚于 concatMap 示例
    delay(5000),
    mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
  )
  .subscribe(val => console.log(`With mergeMap: ${val}`));

3. map

注意:对源 observable 的每个值应用投射函数

import { from } from 'RxJS';
import { map } from 'RxJS/operators';

// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 每个数字加10
const example = source.pipe(map(val => val + 10));
// 输出: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));

4. mergeMap / flatMap ( flatMap 是 mergeMap 的别名)

注意:映射成 observable 并发出值。 使用场景,比如调用多个接口以后,将几个接口值进行合并等操作

import { of } from 'RxJS';
import { mergeMap } from 'RxJS/operators';

// 发出 'Hello'
const source = of('Hello');
// 映射成 observable 并将其打平
const example = source.pipe(mergeMap(val => of(`${val} World!`)));
// 输出: 'Hello World!'
const subscribe = example.subscribe(val => console.log(val));

5. scan

注意:随着时间的推移进行归并。用于几个接口值的累加

import { of } from 'RxJS';
import { scan } from 'RxJS/operators';

const source = of(1, 2, 3);
// 基础的 scan 示例,从0开始,随着时间的推移计算总数
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
// 输出累加值
// 输出: 1,3,6
const subscribe = example.subscribe(val => console.log(val));

6. switchMap

注意:映射成 observable,完成前一个内部 observable,发出值。用于短时间多次重复调用一个接口,但是只读取最后一次的场景,比如实时搜索框搜索。

import { interval, fromEvent } from 'RxJS';
import { switchMap, mapTo } from 'RxJS/operators';

// 发出每次点击
const source = fromEvent(document, 'click');
// 如果3秒内发生了另一次点击,则消息不会被发出
const example = source.pipe(
  switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
);
// (点击)...3s...'Hello I made it!'...(点击)...2s(点击)...
const subscribe = example.subscribe(val => console.log(val));

工具系列

1. do / tap

注意:透明地执行操作或副作用,比如打印日志。

import { of } from 'RxJS';
import { tap, map } from 'RxJS/operators';

const source = of(1, 2, 3, 4, 5);
// 使用 tap 透明地打印 source 中的值
const example = source.pipe(
  tap(val => console.log(`BEFORE MAP: ${val}`)),
  map(val => val + 10),
  tap(val => console.log(`AFTER MAP: ${val}`))
);

// 'tap' 并不转换值
const subscribe = example.subscribe(val => console.log(val));
// 输出: 11...12...13...14...15

2. delay

注意:根据给定时间延迟发出值。

import { of, merge } from 'RxJS';
import { mapTo, delay } from 'RxJS/operators';

// 发出一项
const example = of(null);
// 每延迟一次输出便增加1秒延迟时间
const message = merge(
  example.pipe(mapTo('Hello')),
  example.pipe(
    mapTo('World!'),
    delay(1000)
  ),
  example.pipe(
    mapTo('Goodbye'),
    delay(2000)
  ),
  example.pipe(
    mapTo('World!'),
    delay(3000)
  )
);
// 输出: 'Hello'...'World!'...'Goodbye'...'World!'
const subscribe = message.subscribe(val => console.log(val));

3. toPromise

注意:将 observable 转换成 promise 。

// 返回基础的 observable
const sample = val => Rx.Observable.of(val).delay(5000);
// 将基础的 observable 转换成 promise
const example = sample('First Example')
  .toPromise()
  // 输出: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

如果您想全面学习RxJS,请前往RxJS中文网

本文作者原创,仅供学习交流使用,转载需注明出处。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,817评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,329评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,354评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,498评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,600评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,829评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,979评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,722评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,189评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,519评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,654评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,940评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,762评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,993评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,382评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,543评论 2 349

推荐阅读更多精彩内容

  • 最近在学习Rxjs,所以借此机会对rxjs知识点进行了一定的整理,用以加深自己对这部分知识的理解和记忆。 简介 R...
    大喵爱读书阅读 60,199评论 5 70
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 626评论 0 1
  • 介绍 RxJS是一个异步编程的库,同时它通过observable序列来实现基于事件的编程。它提供了一个核心的类型:...
    泓荥阅读 16,590评论 0 12
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,186评论 2 8
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 2,993评论 0 21