1. 简介
在2016年7月14号,Elixir发布了GenStage。官方对GenStage的描述是:
GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes.
GenStage是生产者和消费者之间交互事件(event)的规范。简言之,Elixir希望用GenStage代替GenEvent并提供可组合的抽象层,来从第三方系统获取和处理数据。
查看官方文档,我们可以对Stage有个初步的理解:
Stages are computation steps that send and/or receive data from other stages.
Stage就是运算步骤,每个Stage能发送数据或从其它Stage获取数据。
本文主要基于Announcing GenStage和Elixir Conf 2016的Keynote内容对于GenStage进行阐述。
2. 背景
当初José Valim创建Elixir的一大初衷是引入更好的抽象来处理集合。所以Elixir才有List,Enum,Stream,Pipe |>
这么多好东西。当然,不仅这样,Elixir也提供给开发者一条处理集合的路径,从激进到懒惰,再到并发,再到分布式(from eager to lazy, to concurrent and then distributed)。
下面我们从一个简单的单词计数程序开始,探讨集合处理过程的演变。
Eager / Enum
File.read!("path/to/some/file")
|> String.split("\n")
|> Enum.flat_map(fn line ->
String.split(line, " ")
end)
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
这个方案对于小文件很适用,但是对于大文件,它需要把文本全部读入内存,并且Enum.flat_map/2
会创建一个巨大的List,这个List包含了文件中的所有单词,然后才能计数。这样我们浪费了大量内存,并且浪费了很多构建List的时间,不用想,这段程序的效率也很低。
Lazy / Stream
幸运的是,Elixir打一开始就提供了解决这个问题的方案,也就是大家耳熟能详的streams
。相比于Enum的eager,Stream则是lazy。Stream会遍历List的每个元素,在这个例子中,就是每一行,而不是之前那样把整个文件存到内存中。
不大清楚eager和lazy的同学可以回想一下布尔表达式,exp1 && exp2,如果exp1求值是false,整个表达式肯定是false,我们不对exp2进行计算,偷一下懒,这就是lazy。假如我们还对exp2进行求值,就是想知道它的值,这就是eager了。
我们来看一下采用Stream替代Enum后的版本:
File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
String.split(line, " ")
end)
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
通过使用File.stream!
和 Stream.flat_map
Stream折叠了(folds)了计算过程,元素一个一个进入流,而不是加载一个巨大的文件,通过Stream可以处理大型文件,或者是「无限的」数据流,就比如twitter上每天产生的新信息。
TODO
Concurrent / Flow
当然,这个版本还是有些小缺陷,它还是没有用到并发。现代计算机一般都有多个核心,能否合理利用多核是我们高效完成任务的关键。
在ElixirConf 2015 keynote中,José Valim给出了一个最直接的多核解决方案。这个方案将你pipeline的一部分给到了另外的processes。
File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->
String.split(line, " ")
end)
|> Stream.async() # NEW!
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
Stream.async
将在另外的process中运行之前的计算然后流式的把结果给到Enum.reduce
这个process。不幸的是,这个方法仍不完善。
首先,不同process之间我们想尽量避免数据迁移。相反,我们想开启多个processes来并发地做同一种计算任务。其次,如果我们需要开发者手动放置Stream.async
的话,会出现很多低效或错误用法。
尽管这个解决方案存在缺陷,但是它帮助我们提出了正确的问题:
- 如果
Stream.async
建立新的process,如何保证这些process被监控? - 由于我们在进程间交换数据,如何防止一个进程获取太多数据?我们需要一个back-pressure机制来让接收进程规定来自发送进程的信息承载量。
在2016的keynote中给出了Flow的解决方案。
在一个标准双核电脑上,对于2GB的文本文件进行字数统计,Enum的方式花时远超10分钟,等不下去了,Stream的方式花时60秒,Flow的方式花时36秒。
Flow
- We give up ordering and process locality for concurrency
- Tools for working with bounded and unbounded data
- It is not magic! There is an overhead when data flows through processes
- Requires volume and/or cpu/io bound work to see benefits
第三条是说,假设我们要对一个文件的所有数字求和,我们使用Flow并不会比Stream更快,因为我们要在不同process之前传输大量的数字。???
Flow总共有1200行代码,1300行的文档。
3. GenStage
我们来写一个简单的pipeline,它将产生events,增加数字,将数字乘二,打印到终端。
三个stages,分别是:producer
, :producer_consumer
和 :consumer
。把它们简称为A, B, C
。
我们首先从producer A开始。A作为producer,它的主要职责是接收需求,consumer需要处理的事件的数量,并且产生事件。这些事件存在于内存中或者来自外部数据源。现在实现一个简单的计数器,通过init/1
给计数器一个初始值。
注意:所有GenStage项目都有Experimental命名空间作为前缀。所以下方的代码中你都将看到
Experimental.GenStage
。
alias Experimental.GenStage
defmodule A do
use GenStage
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
# The events to emit is the second element of the tuple,
# the third being the state.
{:noreply, events, counter + demand}
end
end
B是producer-consumer。这意味着它并不显式地处理需求,因为需求总是被转发到它的producers。一旦A接收了B的需求,它会给B发送事件,B会转换这些事件然后发送给C。在本例中,B会接收事件,并且把它们乘以初始存储在state中的数字。
alias Experimental.GenStage
defmodule B do
use GenStage
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
events = Enum.map(events, & &1 * number)
{:noreply, events, number}
end
end
C is the consumer which will finally receive those events and print them every second to the terminal:
C是一个consumer,最终接收这些事件并且每时每刻输出到终端。
alias Experimental.GenStage
defmodule C do
use GenStage
def init(sleeping_time) do
{:consumer, sleeping_time}
end
def handle_events(events, _from, sleeping_time) do
# Print events to terminal.
IO.inspect(events)
# Sleep the configured time.
Process.sleep(sleeping_time)
# We are a consumer, so we never emit events.
{:noreply, [], sleeping_time}
end
end
{:ok, a} = GenStage.start_link(A, 0) # starting from zero
{:ok, b} = GenStage.start_link(B, 2) # multiply by 2
{:ok, c} = GenStage.start_link(C, 1000) # sleep for a second
GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)
# Sleep so we see events printed.
Process.sleep(:infinity)