Elixir GenStage 简介

1. 简介

在2016年7月14号,Elixir发布了GenStage。官方对GenStage的描述是:

GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes.

GenStage是生产者和消费者之间交互事件(event)的规范。简言之,Elixir希望用GenStage代替GenEvent并提供可组合的抽象层,来从第三方系统获取和处理数据。

查看官方文档,我们可以对Stage有个初步的理解:

Stages are computation steps that send and/or receive data from other stages.

Stage就是运算步骤,每个Stage能发送数据或从其它Stage获取数据。

本文主要基于Announcing GenStage和Elixir Conf 2016的Keynote内容对于GenStage进行阐述。

2. 背景

当初José Valim创建Elixir的一大初衷是引入更好的抽象来处理集合。所以Elixir才有List,Enum,Stream,Pipe |>这么多好东西。当然,不仅这样,Elixir也提供给开发者一条处理集合的路径,从激进到懒惰,再到并发,再到分布式(from eager to lazy, to concurrent and then distributed)。

Elixir Collection

下面我们从一个简单的单词计数程序开始,探讨集合处理过程的演变。

Word Counting

Eager / Enum

File.read!("path/to/some/file")
|> String.split("\n")
|> Enum.flat_map(fn line ->
    String.split(line, " ")
   end)
|> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, & &1 + 1)
   end)
|> Enum.to_list()

这个方案对于小文件很适用,但是对于大文件,它需要把文本全部读入内存,并且Enum.flat_map/2会创建一个巨大的List,这个List包含了文件中的所有单词,然后才能计数。这样我们浪费了大量内存,并且浪费了很多构建List的时间,不用想,这段程序的效率也很低。

Lazy / Stream

幸运的是,Elixir打一开始就提供了解决这个问题的方案,也就是大家耳熟能详的streams。相比于Enum的eager,Stream则是lazy。Stream会遍历List的每个元素,在这个例子中,就是每一行,而不是之前那样把整个文件存到内存中。

不大清楚eager和lazy的同学可以回想一下布尔表达式,exp1 && exp2,如果exp1求值是false,整个表达式肯定是false,我们不对exp2进行计算,偷一下懒,这就是lazy。假如我们还对exp2进行求值,就是想知道它的值,这就是eager了。

我们来看一下采用Stream替代Enum后的版本:

File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
    String.split(line, " ")
   end)
|> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, & &1 + 1)
   end)
|> Enum.to_list()

通过使用File.stream!Stream.flat_map

Stream折叠了(folds)了计算过程,元素一个一个进入流,而不是加载一个巨大的文件,通过Stream可以处理大型文件,或者是「无限的」数据流,就比如twitter上每天产生的新信息。

TODO

Concurrent / Flow

当然,这个版本还是有些小缺陷,它还是没有用到并发。现代计算机一般都有多个核心,能否合理利用多核是我们高效完成任务的关键。

在ElixirConf 2015 keynote中,José Valim给出了一个最直接的多核解决方案。这个方案将你pipeline的一部分给到了另外的processes。

File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
    String.split(line, " ")
   end)
|> Stream.async()  # NEW!
|> Enum.reduce(%{}, fn word, acc ->
    Map.update(acc, word, 1, & &1 + 1)
   end)
|> Enum.to_list()

Stream.async将在另外的process中运行之前的计算然后流式的把结果给到Enum.reduce这个process。不幸的是,这个方法仍不完善。

首先,不同process之间我们想尽量避免数据迁移。相反,我们想开启多个processes来并发地做同一种计算任务。其次,如果我们需要开发者手动放置Stream.async的话,会出现很多低效或错误用法。

尽管这个解决方案存在缺陷,但是它帮助我们提出了正确的问题:

  1. 如果Stream.async建立新的process,如何保证这些process被监控?
  2. 由于我们在进程间交换数据,如何防止一个进程获取太多数据?我们需要一个back-pressure机制来让接收进程规定来自发送进程的信息承载量。

在2016的keynote中给出了Flow的解决方案。

Flow

在一个标准双核电脑上,对于2GB的文本文件进行字数统计,Enum的方式花时远超10分钟,等不下去了,Stream的方式花时60秒,Flow的方式花时36秒。

Flow

  1. We give up ordering and process locality for concurrency
  2. Tools for working with bounded and unbounded data
  3. It is not magic! There is an overhead when data flows through processes
  4. Requires volume and/or cpu/io bound work to see benefits

第三条是说,假设我们要对一个文件的所有数字求和,我们使用Flow并不会比Stream更快,因为我们要在不同process之前传输大量的数字。???

Flow总共有1200行代码,1300行的文档。

3. GenStage

Paste_Image.png
Paste_Image.png
Paste_Image.png
Paste_Image.png

我们来写一个简单的pipeline,它将产生events,增加数字,将数字乘二,打印到终端。

三个stages,分别是:producer, :producer_consumer:consumer。把它们简称为A, B, C

我们首先从producer A开始。A作为producer,它的主要职责是接收需求,consumer需要处理的事件的数量,并且产生事件。这些事件存在于内存中或者来自外部数据源。现在实现一个简单的计数器,通过init/1给计数器一个初始值。

注意:所有GenStage项目都有Experimental命名空间作为前缀。所以下方的代码中你都将看到Experimental.GenStage

alias Experimental.GenStage

defmodule A do
  use GenStage

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..counter+demand-1)

    # The events to emit is the second element of the tuple,
    # the third being the state.
    {:noreply, events, counter + demand}
  end
end

B是producer-consumer。这意味着它并不显式地处理需求,因为需求总是被转发到它的producers。一旦A接收了B的需求,它会给B发送事件,B会转换这些事件然后发送给C。在本例中,B会接收事件,并且把它们乘以初始存储在state中的数字。

alias Experimental.GenStage

defmodule B do
  use GenStage

  def init(number) do
    {:producer_consumer, number}
  end

  def handle_events(events, _from, number) do
    events = Enum.map(events, & &1 * number)
    {:noreply, events, number}
  end
end

C is the consumer which will finally receive those events and print them every second to the terminal:
C是一个consumer,最终接收这些事件并且每时每刻输出到终端。

alias Experimental.GenStage

defmodule C do
  use GenStage

  def init(sleeping_time) do
    {:consumer, sleeping_time}
  end

  def handle_events(events, _from, sleeping_time) do
    # Print events to terminal.
    IO.inspect(events)

    # Sleep the configured time.
    Process.sleep(sleeping_time)

    # We are a consumer, so we never emit events.
    {:noreply, [], sleeping_time}
  end
end
{:ok, a} = GenStage.start_link(A, 0)    # starting from zero
{:ok, b} = GenStage.start_link(B, 2)    # multiply by 2
{:ok, c} = GenStage.start_link(C, 1000) # sleep for a second

GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)

# Sleep so we see events printed.
Process.sleep(:infinity)

Refs

  1. http://elixir-lang.org/blog/2016/07/14/announcing-genstage/
  2. Elixir Conf 2016 GenStage
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,817评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,329评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,354评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,498评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,600评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,829评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,979评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,722评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,189评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,519评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,654评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,940评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,762评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,993评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,382评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,543评论 2 349

推荐阅读更多精彩内容