响应式思维 | Ben Lesh

响应式思维 (Thinking Reactively) | Ben Lesh

Ben Lesh 是 RxJS 库的领导者和布道者,提倡使用响应式思维来抽象逻辑和编写程序,现就职于 Google 。而本文则是对他的一篇研报的记录,该研报是在 AngularConnect 会议中汇报的。研报首先从一个实例开始谈起:

实例: Drag & Drop

内容

每次 在目标上按下鼠标(mousedown) ,开始监听 页面上鼠标移动(mousemove) 直到 鼠标弹起(mouseup)

相关概念(基础函数)定义:

const target = document.querySelect('#target')

const targetMouseDown$ = Observable.fromEvent(target, 'mousedown')

const docMouseMove$ = Observable.fromEvent(target, 'mouseover')

const docMouseUp$ = Observable.fromEvent(target, 'mouseup')
  1. 变量后 $ 表示该变量是 Observable 。
  2. Observable 本质是一个函数,后面 Ben 会解释。

分析

  1. 页面上鼠标移动(mousemove)直到鼠标弹起(mouseup) =>
docMouseMove$.takeUntil(docMouseUp$)

操作符 takeUntil 使得 docMouseMove$ 持续推送数据,直到 docMouseUp$ 推送一个通知(数据)后停止。附上 takeUntil 弹珠图:

takeUntil
  1. 每次在目标上按下鼠标(mousedown),开始监听 =>
const dragDrop$ = targetMouseDown$.switchMap(() =>
  docMouseMove$.takeUntil(docMouseUp$)
)

操作符 switchMaptargetMouseDown$ 推送的值传入进内部函数(该例不传入推送值,推送仅仅作为通知使用),然后 切换docMouseMove$ 并压平输出(例中 targtMouseDown$ 仅仅推送一次,内部函数仅执行一次,因此无须压平)。附上 switchMap 弹珠图:

switchMap

小结

解决问题的思路应该是从后向前推导,确认好每个事件流,根据问题组织事件流。

进一步了解响应式思维

流变量和非流变量 (自己臆想的,慎看)

在系统中流变量都是 Observable。

变量顾名思义是变化的,根据广义、狭义相对论可知,变化是针对参照物来说的(提高点 X 格),而非流变量的参照物是整个程序和时间轴。但是如果将参照物改为某一行代码,非流变量随时间是不变的,只是一个占位符。

var c = a + b // 站在这里,发现 c 一直不变。
doSometing(c) // 对非流变量的操作,仅调用一次

流变量则是一种指向流( stream )的标识符。

var c$ = a$.combineLatest(b$, (a, b) => a + b) // 站在这里感受下涓涓细流
c$.subscribe(doSomething) // 对流变量的操作,回调多次

操作符 combineLatest 对任一个 Observable 推送的值,都与其他 Observable 最后值融合。具体融合方法以函数形式给出。惯例附上 combineLatest 弹珠图:

combineLatest

没有了操作符,Observable 就是 。。。

// 名字就是标识符,可以改成 asy.ok(...)
promise.then(successFn, errorFn)

// 名字就是标识符,可以改成 asy.ok(...)
observable.subscribe(nextFn, errorFn, completeFn)

大概 60+ 的操作符,请查看官网

重点:解密 Observable

Observable 内部是什么?

  • 搅乱脑汁的复杂异步
  • 应用于航空科技的算法
  • 黑魔法
  • 独角兽

以上是 Ben 总结的。

Observable 仅仅是一个函数

  1. Observable 有一个名为 observer 的参数:
const myObservable = observer => {}
  1. observer 对象会有几个方法:
const myObservable = observer => {
  let i = 0
  const id = setInterval(() => {
    observer.next(i++) // next 方法
    if (i === 10) observer.complete() // complete 方法
  }, 200)
}
  1. Observable 会返回销毁逻辑:
const myObservable = observer => {
  let i = 0
  const id = setInterval(() => {
    observer.next(i++) // next 方法
    if (i === 10) observer.complete() // complete 方法
  }, 200)
  return () => clearInterval(id) // 用于终止订阅
}
  1. 调用 Observable 函数的同时订阅了你的 observer
const myObservable = (observer) => {
  let i = 0
  const id = setInterval(() => {
    observer.next(i++) // next 方法
    if (i === 10) observer.complete() // complete 方法
  }, 200)
  return () => clearInterval(id) // 用于终止订阅
}

// 订阅你的 observer
const teardown = myObservable({
  next(x) {console.log(x)},
  error(err) {console.error(err)},
  complete() {console.info('done)}
})

// 1 秒后取消订阅
setTimeout(() => {
  teardown()
}, 1000)

操作符也是一个函数

操作符函数吃进一个 Observable 吐出一个 Observable :

const operator = InputObservable => OutputObservable
  1. 将 OutputObservable 变量展开成函数形式:
const operator = (InputObservable) => {
  return (OutObserver) => {...}
}
  1. 操作符是一个函数,她的参数是 Observable 并且输出也是 Observable ,也就是通过 InputObservable 构建 OutputObservable :
const operator = InputObservable => {
  return OutObserver => {
    return InputObservable(InObserver)
  }
}
  1. Observable 就是一个拥有 observer 参数的函数,而 observer 对象的形式是约定好的:
const observer = {
  next: (data) => {...},
  error: (err) => {...},
  complete: () => {...}
}

也可以短方法声明:

const observer = {
  next(data) {...},
  error(err) {...},
  complete() {...}
}
  1. 构建 InObserver 和 OutObserver 之间的映射关系(操作符是 Observable 和 Observer 之间的操作,而此时还没有给出映射函数,所以 InObserver 和 OutObserver 其实现在还没有变化 ):
const operator = InputObservable => {
  return OutObserver => {
    return InputObservable({
      next(data) {
        OutObserver.next(data)
      },
      error(err) {
        OutObserver.error(err)
      },
      complete() {
        OutObserver.complete()
      }
    })
  }
}

不难看出 next(data) { OutObserver.next(data)} 等同于 next = OutObserver.next, errorcomplete 类似,意味着此时 InObserver 等于 OutObserver

  1. 最后添加操作推送数据的映射函数:
const operator = (InputObservable, mapFn) => {
  return OutObserver => {
    return InputObservable({
      next(data) {
        OutObserver.next(mapFn(data))
      },
      error(err) {
        OutObserver.error(err)
      },
      complete() {
        OutObserver.complete()
      }
    })
  }
}

单独分析 next 方法来观察 InObserverOutObserver 的关系:

function InObserver.next(data) {
  let newData = mapFn(data)
  OutObserver.next(newData)
}
  1. 验证。现在把之前创建的 myObservableoperator 应用到程序中:
const source = operator(myObservable, x => x + '!')

const teardown = source({
  next(data) {
    console.log(data)
  },
  error(err) {
    consol.log(err)
  },
  complete() {
    console.log('done')
  }
})

// 4 秒后取消订阅
setTimeout(() => {
  teardown()
}, 4000)

输出的结果:

0!
1!
2!
...

这里的结果有问题,因为收到 complete 推送后,并没有取消订阅,因此上面代码设置了显式的取消订阅过程。Ben 在另外的研报中详细介绍了使用 safeObserver 解决上述问题。

  1. 酷,来几个串行操作。
const source = operator(operator(myObservable, x => x + '!'), x => x + '?')

const teardown = source({
  next(data) {
    console.log(data)
  },
  error(err) {
    consol.log(err)
  },
  complete() {
    console.log('done')
  }
})

// 4 秒后取消订阅
setTimeout(() => {
  teardown()
}, 4000)

太繁琐了,想想就头痛:

const source = operator(operator(opserator(observable, mapFn), mapFn), mapFn)
  1. 把 Observable 函数用类来包裹(注意仅仅是把 Observable 函数打包进类里,并不是把 Observable 函数转化成类),操作符作为类的方法,这样便可以使用链式写法调用操作符了:
class Observable {
  constructor(observableFn) {
    this.subscribe = observableFn // 好记忆的标识 subscribe
  }
}

const myObservable = new Observable((observer) => {...})

const teardown = myObservable.subscribe({
  next(data) { console.log(data) },
  error(err) { consol.log(err) },
  complete() { console.log('done') }
})
  1. 添加 map 操作符到类中:
class Observable {
  constructor(observableFn) {
    this.subscribe = observableFn // 好记忆的标识 subscribe
  }

  map(mapFn) {
    return new Observable(observer => {
      return this.subscribe({
        next(data) {
          observer.next(mapFn(data))
        },
        error(err) {
          observer.error(err)
        },
        complete() {
          observer.complete()
        }
      })
    })
  }
}

现在使用链式写操作符试试:

myObservable
  .map(x => x + '!')
  .map(x => x + '?')
  .map(x => x + '.')
  .subscribe({
    next(data) {
      console.log(data)
    }
  })

小结

  • Observable 就是函数。
  • 因为函数仅在调用时执行,所以 Observable 是惰性的。
  • 操作符也是函数,输入是 Observable,输出也是 Observable 。
  • 链式操作就是连接每个操作的 observer 。

图示

正常推送数据

下面将 Observable 的运行过程可视化,先给出 Observable 的订阅实例:

Observable.interval(1000) // like setInterval
  .filter(x => x % 2 === 0)
  .map(x => x + x)
  .subscribe(next, error, complete)
  1. 开始是数据的产生者
producer
  1. 流程的最后是你的回调处理,也就是数据的消费者
consumer
  1. 最初推送的是 0 ,每一步的图示:
推送数值 0
filter
map
consumer
  1. 然后是推送 1 :
推送数值 1
filter

然后就没有然后了, 1 被 filter 过滤掉了。

重点:异常处理

异常处理过程可能会给我们带来一些疑惑,主要是因为以下几个事件影响:

  • error() 被调用
  • complete() 被调用
  • 取消订阅

这些事件发生后, Observable 将不再推送数据。继续给出实例:

Observable.interval(1000)
  .map(x => {
    if (x === 1) {
      throw new Error('haha')
    }
    return x
  })
  .subscribe(next, error, complete)
  1. 处理推送 0 的过程略过。

  2. 生产者推送 1 是,抛出了异常:

map计算
得到异常
取消订阅
处理异常

当抛出异常后,Observable 不会继续推送数据(取消订阅),而消费者将会使用 error() 处理异常。

这个还有个问题,Ben 在他的专栏里提到过,多播场景的错误捕获

  1. 当异常抛出后,Observable 就挂掉了,如果还想继续推送如何实现?答案是:创建 observer 分支。
Observable.interval(10000)
  .switchMap(() => this.http.get(url).catch(err => Observable.empty()))
  .subscribe(data => render(data))

先来学习操作符 catch ,它会捕获和处理 Observable 推送的异常,并返回一个新的 Observable 或者继续抛出异常。附上弹珠图:

catch

图示具体流程:

生产者
switchMap

网络不好,查询过程超时。

Ajax超时

取消 Ajax Observable 的订阅

catch

转化异常到新的 Observable

取消分支订阅

取消 Observable.empty 的订阅

小结:

  • 创建另外一个 observer 链
  • 使用 catch 增强这个链的鲁棒性
  • 守护了原始的 observer 链

响应式思维的适用场景

Ben 在研报的最后分析了响应式思维的使用场景,这里简单的将 PPT 页翻译,具体的实际应用还是需要在编程中发觉和选择的。

  • 将多个事件融合在一起
  • 添加延时
  • 客户端限制流量
  • 协调异步任务
  • 需要注销机制

总结

最后将该文的具体内容概述为以下 6 个方面。

  • 逆向思维
  • 任何的变量都可以被观察
  • Observable 是函数
  • Observer 链处理计算
  • 调用 error() 会终止 observer 链
  • 尽情使用 Rx 。

本文是在我学习 RxJS 过程中为了加强记忆和便于理解而记录的,里面添加了大量的个人学习倾向,局限于个人知识面有限,难免有不当和错误之处,欢迎大家批评指导。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容