RxJS Schedulers 调度器

Rxjs调度器

本文采用 RxjsV5.5 版本,这个版本和先前版本最大不同之处是import方式,以及引入了 pipe 操作符来替代链式操作,在此不赘述。

今天主要是来将调度器scheduler,这个通常用的比较少,但是了解它还是有作用的。
顾名思义,调度器可以理解为调度Observable,Observer,根据 官方scheduler文档 可以知道它有以下几个作用:

  • 调度器是一种数据结构。它可以依据优先级和其它一些配置知道如何来存储队列任务(queue tasks)
  • 调度器可以充当执行环境。这表示任务什么时候什么地方执行,是立即执行,还是在回调函数中执行(使用setTimeout或者setInterval, 又或者 animation frame)
  • 调度器拥有一个虚拟时钟。它通过getter方法 now() 提供了time的概念。任务会根据调度安排,在特定的时间执行。

Marco && microtask 宏任务和微任务的概念

setTimeout | setInterval 都归属于宏任务,而 promises 属于微任务。

宏任务:

  • setTimeout
  • setInterval
  • setImmediate
  • I/O
  • UI rendering

微任务:

  • process.nextTick
  • Promise
  • MutationObserver
const log = console.log;
const macro = v => setTimeout(() => log(v));
const micro = v => Promise.resolve().then(() => log(v));

log(1);
macro(2);
micro(3);
log(4);

// 执行结果
1 4 3 2

可以看出宏任务和微任务都属于异步操作,但是Promise会优于setTimeout先执行。这是因为,在每一次事件循环中,宏任务只会提取一个执行,而微任务会一直提取,直到微任务队列为空为止。换句话说就是 事件队列完成之后,会执行微任务,最后才执行宏任务

更多关于微任务和宏任务的区别:

setTimeout(() => log(1));
setTimeout(() => log(2), 0);
log(3);

// 因为第一个setTimeout没有设置delay,它将先于第二个setTimeout进入事件队列
// 运行结果
3
1
2

可能会好奇为什么讲这?这是因为下面调度就会用到微任务和宏任务的概念

Rxjs操作符与对应的调度器

  • 默认同步的操作符,比如:of | from | range,它们默认的调度器为 queue
  • 默认异步的操作符,比如:timer | interval,它们默认的调度器为 async, 内部使用 setInterval

改变默认调度器

像一些操作符最后一个参数可以为一个Scheduler,我们可以通过传参的形式来改变默认的调度器类型,比如下列操作符

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

示例:改变 of 默认的调度方式

// RxJSV5.5+版本的引入方式
import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';

const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);

// 输出结果
2
1

// 如果使用of默认的调度 queue
of(1).subscribe(val => log(val));
log(2);
// 输出结果
1
2

subscribeOn && observeOn

先说 subscribeOn,它的作用

  • 改变源(source observables)的执行时机
  • 只能用一次

示例

// 同步版本
const log = console.log;
let a$ = Rx.Observable.create(observer => {
  setTimeout(() => observer.next(1));        // A
  setTimeout(() => observer.next(2));        // B
  setTimeout(() => observer.next(3));        // C
  setTimeout(() => observer.complete());     // D
})

let subscription = a$.subscribe({
  next: v => log(v),                         // E
  complete: () => log('完成')                // F
})

# 它的执行顺序为
A - E
B - E
C - E
D - F

// 异步实现版本
// 使用 'subscribeOn'
import { async } from 'rxjs/scheduler/async';
const log = console.log;
let a$ = Rx.Observable.create(observer => {
  setTimeout(() => observer.next(1));        // A
  setTimeout(() => observer.next(2));        // B
  setTimeout(() => observer.next(3));        // C
  setTimeout(() => observer.complete());     // D
})

let subscription = a$.subscribeOn(async)        // 使用异步调度
  .subscribe({
    next: v => log(v),                       // E
    complete: () => log('完成')              // F
  })

# 现在它的执行顺序为
A - B - C - D
E - E - E - F

另外示例:

import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';

const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);

// 可以使用 subscribeOn 等同写为
of(1)
  .subscribeOn(async)
  .subscribe(val => log(val));

log(2);

observeOn 它的作用:

  • 改变通知的 Notifications 执行时机,即Observabls中的Next, Error, Complete函数
  • 能够用于每个操作符的前面,即可以多次使用
var observable = Rx.Observable.create(function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
});

console.log('before subscribe');
observable.observeOn(Rx.Scheduler.async) // 设置为 async
.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
console.log('after subscribe');

// 执行结果
"before subscribe"
"after subscribe"
1
2
3
"complete"

调度 Schedulers

总共有4中调度:

类型 执行类型 内部调用
queue Sync同步的方式 scheduler.schedule(task, delay) scheduler.flush()
asap Async(异步微任务) Promise.resolve().then(() => task)
async Async(异步宏任务) id = setInterval(task, delay) clearInterval(id)
animationFrame Async id = requestAnimationFrame(task) cancelAnimationFrame(id)

queue

特点:

  • 同步执行
  • 任务按顺序执行
  • 当前任务结束后才执行下一个任务
  • 性能优于事件队列

示例

import { queue } from 'rxjs/scheduler/queue';

const log = console.log;
queue.schedule(() => log(1));
log(2);
queue.schedule(() => log(3));

// 执行结果
1
2
3


// 注意这种情况 使用回调
queue.schedule(() => {
  queue.schedule(() => log(1));
  log(2);
  queue.schedule(() => log(3));
});

// 执行结果
2
1
3

asap (as soon as possible)

特点:

  • 异步执行(微任务)
  • 任务在next tick之前执行,即比宏任务先执行
  • 内部实现使用 promise
  • 性能优于事件队列

示例

import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';

const log = console.log;
setTimeout(() => log(1));          // 异步 宏任务
asap.schedule(() => log(2));       // 异步 微任务
queue.schedule(() => log(3));      // 同步

// 执行结果
3
2
1

async

特点:

  • 异步执行(宏任务)
  • 内部实现使用 setInterval
  • 使用事件队列,性能比上面的方式要差

示例

import { async } from 'rxjs/scheduler/async';
import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';

const log = console.log;

async.schedule(() => log(1));        // 异步 宏任务
asap.schedule(() => log(2));       // 异步 微任务
queue.schedule(() => log(3));      // 同步

// 执行结果
3
2
1

取消任务 cancelling tasks

使用 AsyncSchedulerAsyncAction用来创建一个异步调用,async.schedule() 方法会返回一个 subcription

import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';

const log = console.log;

const s = new AsyncScheduler(AsyncAction); // 创建一个async示例
const DELAY = 0;
let subscription;
subscription = s.schedule((v) => log(v), DELAY, 1);  // 异步调度A
s.schedule((v) => log(v), DELAY, 2); // 异步调度B
log(3);
subscription.unsubscribe();  // 取消异步调度A

// 结果 并没有A的值,因为它被取消了
3
2

内部时钟 now() 方法

上面谈调度器特性时,谈到了虚拟时钟

import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';

const log = console.log;

const s = new AsyncScheduler(AsyncAction); // 创建一个async示例
const DELAY = 2000;
const start = Date.now();

s.schedule((v) => log(v), DELAY, 1);  // 异步调度A
s.schedule((v) => log(v), DELAY, 2); // 异步调度B

// s.now() 使用调度内部时钟
s.schedule(() => log(`${s.now() - start}ms`), DELAY)
log(3);

// 结果
3
2
1
2002ms        // 这个时间会根据机器的执行速度而定

animationFrame 动画调度

特点:

  • 异步执行
  • 内部实现使用 requestAnimationFrame
  • 适用于 DEVICE FRAME RATE
  • 没有激活时很慢
  • 平衡 CPU/GPU 负载

60FPS = 1000 / 60ms

使用 setInterval 的问题

  • 忽略 DEVICE FRAME RATE
  • 会一直运行,很耗电
  • 不会考虑 CPU/GPU 负载
let token;
const paintFrame = () => {
  // 动画
  token = setInterval(paintFrame, 1000/60);
}
paintFrame();

setTimeout(() => clearInterval(token), 2000); // 2s后清除动画

使用 requestFrameAnimation 代替

let token;
const paintFrame = (timestamp) => {
  // 动画
  token = requestFrameAnimation(paintFrame);
}
paintFrame();

setTimeout(() => cancelFrameAnimation(token), 2000); // 2s后清除动画

使用动画调度示例

import { animationFrame } from 'rxjs/scheduler/animationFrame';


const DELAY = 0;
let state = { angle: 0 };
const div = document.querySelector('.circle');

let subscription;

// 放上去之后就停止动画
div.addEventListener('mouseover', () => {
  if (!subscription) return;
  subscription.unsubscribe();
});

// 离开之后开始动画
div.addEventListener('mouseout', () => {
  subscription = animationFrame.schedule(work, DELAY);
});

// 动画函数
const work = () => {
  let { angle } = state;
  state = { angle: ++angle%360 };
  div.style.transform = `rotate(${angle}deg)`;
  subscription = animationFrame.schedule(work, DELAY);
}

subscription = animationFrame.schedule(work, DELAY);

VirtualTime Scheduler 虚拟时间调度

上面虽然列举了调度的4种类型,下面的虚拟时间其实也是一种调度

特点:

  • 同步执行
  • 通过delay延迟将所有的动作进行排队(Queues all actions sorting by delay)
  • 需要手动执行 使用flush()

示例

import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';

const log = console.log;


const s = new VirtualTimeScheduler(VirtualAction); // 创建一个scheduler
const start = Date.now();

// tasks are sorted by delay
s.schedule(v => log(v), 2000, 2);  // 2000ms的delay
s.schedule(v => log(v), 50, 2);    // 50ms的delay

s.flush();                         // 手动执行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`);  // 虚拟时钟
log(`Execution: ${Date.now() - start}ms`);  // 程序执行时间

// 结果
1
2
3
VirtualTimeScheduler: 2000ms
Execution: 6ms

示例2:

import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';
import { interval } from 'rxjs/observable/interval';

const log = console.log;


const s = new VirtualTimeScheduler(VirtualAction); // 创建一个scheduler
const start = Date.now();

// 3600 * 1000表示 1小时
// take(24) 表示 24小时 即每个小时运行一次
interval(3600 * 1000, s).pipe(take(24))
  .subscribe(v => log(v))

s.flush();                         // 手动执行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`);  // 虚拟时钟
log(`Execution: ${Date.now() - start}ms`);  // 程序执行时间

// 结果
0
1
2
// ...
23
VirtualTimeScheduler: 86400000ms (1天)
Execution: 25ms

总结

调度器使用到的场景不多,但是了解它,对任务的调度可以跟细粒的进行控制。动画调度用的比较多一点,虚拟时钟调度对定时任务比较有用。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,638评论 18 139
  • GCD调度队列是执行任务的强大工具。调度队列允许您相对于调度者异步或者同步的执行任意代码块。您能够使用调度队列来执...
    坤坤同学阅读 6,668评论 1 3
  • 当和底层系统交互时,必须花费大量时间为任务做好准备。调用内核或者其他系统层需要切换上下文,这也是比在进程内部调用昂...
    坤坤同学阅读 1,766评论 0 16
  • ———-正文开始———- 最近发现有不少介绍JS单线程运行机制的文章,但是发现很多都仅仅是介绍某一部分的知识,而且...
    流动码文阅读 2,192评论 7 32
  • 夜晚醒来,有的时候其实就是为了缓解焦虑,看一下时间。又或者起夜上个厕所。 在用了手电,自发电式手电、手电等所有能提...
    someday阅读 452评论 0 0