Streaming -- Operators -- Overview

操作符将一个或多个数据令转换为一个新的数据令。程序可以将多种转换组合成复杂的数据流拓扑。

本节将介绍基本的转换、应用这些转换后的有效物理分区以及对Flink s操作符链接的深入了解。

DataStream Transformations

Transformation Description
Map
DataStream → DataStream
获取一个元素并生成一个元素。将输入流的值加倍的映射函数:
dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream
获取一个元素并生成零个、一个或多个元素。一个将句子分割成单词的平面图函数:
dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream
为每个元素计算布尔函数,并保留函数返回true的元素。过滤掉零值的过滤器:
dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream
将流逻辑地划分为不相交的分区,每个分区包含具有相同键的元素。在内部,这是通过散列分区实现的。有关如何指定键,请参阅。这个转换返回一个KeyedStream。
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream
键控数据流上的“滚动”减少。将当前元素与最后减少的值组合并发出新值。
一种生成部分和流的reduce函数:
keyedStream.reduce { _ + _ }
Fold
KeyedStream → DataStream
对具有初始值的键控数据流进行“滚动”折叠。将当前元素与最后折叠的值组合并发出新值。
一个折叠函数,当应用于序列(1,2,3,4,5)时,会发出序列“start-1”、“start-1-2”、“start-1-2”,…
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations
KeyedStream → DataStream
键控数据流上的滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream
Windows可以在已经分区的KeyedStreams上定义。Windows根据某些特征(例如,最近5秒内到达的数据)对每个键中的数据进行分组。有关windows的描述,请参阅windows
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
WindowAll
DataStream → AllWindowedStream
可以在常规的数据表中定义Windows。Windows根据某些特征(例如,最后5秒内到达的数据)对所有流事件进行分组。有关windows的完整描述,请参阅windows
警告:在许多情况下,这是一个非并行转换。windowAll操作符的所有记录将被收集到一个任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply
WindowedStream → DataStreamAllWindowedStream → DataStream
对整个窗口应用一个通用函数。下面是一个手动对窗口元素进行求和的函数。
注意:如果你正在使用windowAll转换,你需要使用一个AllWindowFunction代替。
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream → DataStream
对窗口应用函数reduce函数并返回减少的值。
windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream
对窗口应用函数折叠函数并返回折叠后的值。当应用于序列(1,2,3,4,5)时,示例函数将序列折叠成字符串“start-1-2-3-4-5”:
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows
WindowedStream → DataStream
聚合窗口的内容。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* → DataStream
合并两个或多个数据流,创建包含所有流中的所有元素的新流。注意:如果你将一个数据流和它本身联合起来,你将在结果流中得到每个元素两次。
dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream → DataStream
在一个给定的键和一个公共窗口上连接两个数据流。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup
DataStream,DataStream → DataStream
在一个给定的密钥和一个公共窗口上协同组两个数据流。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
Connect
DataStream,DataStream → ConnectedStreams
“连接”两个数据流,保持它们的类型,允许在两个流之间共享状态。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的map和flatMap
connectedStreams.map( (_ : Int) => true, (_ : String) => false)
connectedStreams.flatMap((_ : Int) => true,(_ : String) => false)
Split
DataStream → SplitStream
根据某些标准将流分割为两个或更多的流。
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select
SplitStream → DataStream
从分割流中选择一个或多个流。
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate
DataStream → IterativeStream → DataStream
通过将一个操作符的输出重定向到前一个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法特别有用。下面的代码从一个流开始,并持续地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被转发到下游。有关完整的描述,请参阅迭代。
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便与使用事件时间语义的窗口一起工作。看到事件时间
stream.assignTimestamps { timestampExtractor }

通过匿名模式匹配从元组、case类和集合中提取,如下所示:

val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

不支持开箱即用的API。要使用这个特性,您应该使用一个Scala API扩展

以下转换可用于元组的数据流

Transformation Description
Project
DataStream → DataStream
从元组中选择字段的子集
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Physical partitioning

Flink还通过以下函数对转换后的流分区进行低级控制(如果需要的话)。

Transformation Description
Custom partitioning
DataStream → DataStream
使用用户定义的分拆程序为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random partitioning
DataStream → DataStream
按均匀分布随机划分元素。
dataStream.shuffle()
Rebalancing (Round-robin partitioning)
DataStream → DataStream
分区元素循环,创建每个分区的相同负载。对于存在数据倾斜时的性能优化非常有用
dataStream.rebalance()
Rescaling
DataStream → DataStream
循环地将元素划分为下游操作的子集。例如,如果您希望拥有从源的每个并行实例扇出到多个映射器子集的管道,以分发负载,但又不希望进行再平衡()导致的完全再平衡,那么这是非常有用的。这将只需要本地数据传输,而不需要通过网络传输数据,这取决于其他配置值,比如taskmanager的槽数。

上游操作向其发送元素的下游操作的子集取决于上游操作和下游操作两者的并行度。 例如,如果上游操作具有并行度2,而下游操作具有并行度4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。 另一方面,如果下游操作具有并行性2而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。

如果不同的并行性不是彼此的倍数,一个或几个下游操作将拥有与上游操作不同数量的输入。

请参见上图中连接模式的可视化。
image.png

dataStream.rescale()
Broadcasting
DataStream → DataStream
将元素广播到每个分区。
dataStream.broadcast()

Task chaining and resource groups

链接两个后续转换意味着将它们放在同一个线程中以获得更好的性能。如果可能的话,默认情况下使用Flink链操作符(例如,两个后续映射转换)。如果需要,该API可以对链接进行细粒度控制

如果希望在整个作业中禁用链接,请使用StreamExecutionEnvironment.disableOperatorChaining()。对于更细粒度的控制,可以使用以下功能。注意,这些函数只能在DataStream转换之后使用,因为它们引用前一个转换。例如,可以使用someStream.map(…). startnewchain(),但是不能使用someStream.startNewChain()

资源组是Flink中的一个槽,请参阅。如果需要,可以手动将运算符隔离在不同的槽中。

Transformation Description
Start new chain 开始一个新的链,从这个操作符开始。这两个映射器将被链接,而filter将不会链接到第一个映射器。
someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining 不链接地图操作符
someStream.map(...).disableChaining()
设置插槽共享组 设置操作的插槽共享组。Flink将把具有相同插槽共享组的操作放入相同的插槽中,而将没有该插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一个槽共享组中,则从输入操作继承槽共享组。默认槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)显式地将操作放入到这个组中。
someStream.filter(...).slotSharingGroup("name")
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容