操作符将一个或多个DataStream转换成为新的DataStream。程序可以将多个转换组合成复杂的数据流拓步。
本节给出了基本转换的描述,以及应用这些转换后的有效物理分区和Flink的操作符链接的见解。
DataStream转换操作
转换操作 | 描述 |
---|---|
Map DataStream → DataStream |
输入一个元素并返回一个元素。一个将输入流的value乘以2的map函数: <pre> DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); </pre> |
FlatMap DataStream → DataStream |
输入一个元素,返回0,1或多个元素。一个将句子拆分成单词的flatmap函数: <pre> <code> dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });</code> </pre> |
Filter DataStream → DataStream |
计算每个元素的布尔函数,并保留函数返回true的值,一个过滤掉0值的filter: <pre> dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } }); <code></code> </pre> |
KeyBy DataStream → KeyedStream |
逻辑上将一条流划分为不同的区,每个区包含相同键的元素。在内部,这是通过hash分区实现的。如何指定键请看keys。这个转换操作返回一个KeyedStream。 <pre> <code>dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple</code></pre> 注意:下述类型不能成为key: 1. 它是一个POJO类,但是没有重写hashCode()方法而是依赖于Object.hashCode()的实现。 2. 它是任意类型的数组。 |
Reduce KeyedStream → DataStream |
keyed数据流上的“滚动”reduce。将当前元素与最后一个reduce值组合并发出新的值。 一个创建流的部分和的reduce函数: <pre> <code> keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } }); </code></pre> |
Fold KeyedStream → DataStream |
具有初始值的keyed数据流上的“滚动”fold。将当前元素与最后一个fold值组合并发出新的值。 当应用于序列(1,2,3,4,5)上时,fold函数会发出"start-1", "start-1-2", "start-1-2-3", ... <pre> <code> DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } }); </code></pre> |
Aggregations KeyedStream → DataStream |
keyed数据流上的滚动聚合。min和minBy的区别在于min返回最小值,minBy返回这个属性上具有最小值的元素(max和maxBy相同). <pre> <code> keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key"); </code></pre> |
Window KeyedStream → WindowedStream |
可以在已经分区的KeyedStream上定义窗口。窗口根据键的某些特性(例如,在最后5秒到达的数据)分组数据。关于窗口的完整描述请见Windows。 <pre> <code> dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data </code></pre> |
WindowAll DataStream → AllWindowedStream |
可以在常规的DataStream上定义窗口。窗口根据一些特性(例如,在最后5秒到达的数据)分组所有流的事件。关于窗口的完整描述请见Windows。 警告: 这在很多情况下不是一个并行的转换。windowAll操作符会把所有的记录收集到一个任务中。 <pre> <code> dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data</code></pre> |
Window Apply WindowedStream → DataStream AllWindowedStream → DataStream |
将一个通用函数应用于窗口。下面是一个手动计算窗口元素的函数。 注意: 如果使用windowAll转换,需要使用AllWindowFunction代替。 <pre> <code> windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); </code></pre> |
Window Reduce WindowedStream → DataStream |
将reduce函数应用到窗口上,并返回reduce的值。 <pre> <code> windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } }); </code></pre> |
Window Fold WindowedStream → DataStream |
将fold函数应用到窗口上,并返回fold值。当应用到列表(1,2,3,4,5)上时,示例函数fold列表成字符串"start-1-2-3-4-5": <pre> <code> windowedStream.fold("start", new FoldFunction<Integer, String>() { public String fold(String current, Integer value) { return current + "-" + value; } }); </code></pre> |
Aggregations on windows WindowedStream → DataStream |
聚合窗口的内容。min和minBy的区别在于min返回最小值,minBy返回这个属性上具有最小值的元素(max和maxBy相同). <pre> <code> windowedStream.sum(0); windowedStream.sum("key"); windowedStream.min(0); windowedStream.min("key"); windowedStream.max(0); windowedStream.max("key"); windowedStream.minBy(0); windowedStream.minBy("key"); windowedStream.maxBy(0); windowedStream.maxBy("key"); </code></pre> |
Union DataStream* → DataStream |
合并两个或多个数据流,然后生成一个新的包含所有流的所有元素的流。注意:如果将数据流与自己合并,在新的流中会得到每个元素两次。dataStream.union(otherStream1, otherStream2, ...);
|
Window Join DataStream,DataStream → DataStream |
在公共窗口和给定键上连接两个数据流。 <pre> <code> dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...}); </code></pre> |
Window CoGroup DataStream,DataStream → DataStream |
在公共窗口和给定键上对两个数据流进行分组。 <pre> <code> dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new CoGroupFunction () {...}); </code></pre> |
Connect DataStream,DataStream → ConnectedStreams |
“连接”两个数据流保留它们的类型。连接允许两个流之间的共享状态。 <pre> <code> DataStream<Integer> someStream = //... DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);</code></pre> |
CoMap, CoFlatMap ConnectedStreams → DataStream |
类似于连接数据流上的map和flatMap。 <pre> <code> connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word: value.split(" ")) { out.collect(word); } } });</code></pre> |
Split DataStream → SplitStream |
根据某些标准将流分成两个或多个流。 <pre> <code> SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } }); </code></pre> |
Select SplitStream → DataStream |
从拆分流中选择一个或多个流。 <pre> <code> SplitStream<Integer> split; DataStream<Integer> even = split.select("even"); DataStream<Integer> odd = split.select("odd"); DataStream<Integer> all = split.select("even","odd"); </code></pre> |
Iterate DataStream → IterativeStream → DataStream |
通过重定向操作符的输出到前一个操作符,创建一个流上的“反馈”循环。这对于连续更新模型的算法特别有用。下面的代码以一个数据流开始并持续的应用迭代体。大于0的元素被发送回反馈通道,其余元素被发送到下游。完整的描述请参见iterations。 <pre> <code> IterativeStream<Long> iteration = initialStream.iterate(); DataStream<Long> iterationBody = iteration.map (/do something/); DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Integer value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Integer value) throws Exception { return value <= 0; } }); </code></pre> |
Extract Timestamps DataStream → DataStream |
从记录中提取时间戳,以便于使用事件时间语义的窗口工作。见Event Time。stream.assignTimestamps (new TimeStampExtractor() {...});
|
可以对Tuple数据流有以下转换:
转换操作 | 描述 |
---|---|
Project DataStream → DataStream |
选取元组属性的子集。 <pre> <code> DataStream<Tuple3<Integer, Double, String>> in = // [...] DataStream<Tuple2<String, Integer>> out = in.project(2,0); </code></pre> |
物理分区
Flink通过下述函数,在转换后的流上提供低层次的精确流分区控制(如果需要的话)。
转换操作 | 描述 |
---|---|
Custom partitioning DataStream → DataStream |
使用用户定义的分区器来为每个元素选择目标任务。 <pre> <code> dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0); </code></pre> |
Random partitioning DataStream → DataStream |
根据均匀分布随机划分元素。 <pre> <code> dataStream.shuffle(); </code></pre> |
Rebalancing (Round-robin partitioning) DataStream → DataStream |
分区元素循环,为每个分区创建相同的负载。在数据倾斜的情况下对性能优化有用。 <pre> <code> dataStream.rebalance(); </code></pre> |
Rescaling DataStream → DataStream |
Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers. The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation. In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations. Please see this figure for a visualization of the connection pattern in the above example: <pre> <code> dataStream.rescale(); </code></pre> |
Broadcasting DataStream → DataStream |
向每个分区广播元素。 <pre> <code> dataStream.broadcast(); </code></pre> |
任务链接和资源组
链接两个子转换意味着它们在同一个线程中以获得更好的性能。Flink默认如果可能会链接操作符(例如,两个map转换)。如果需要,API对链接提供细粒度的控制。
如果想要在整个作业中禁止链接,使用StreamExecutionEnvironment.disableOperatorChaining()方法。对于更细粒度的控制,可以使用下述函数。注意这些函数只能在DataStream转换之后使用,因为它们引用了前面的转换。例如,你可以使用someStream.map(…). startnewchain(),但你不能使用someStream.startNewChain()。
在Flink中,资源组是一个槽,见slots。如果需要,您可以在单独的槽中手动隔离操作符。
转换操作 | 描述 |
---|---|
Start new chain | 从这个操作符开始一个新的链。两个map将被链接,并且filter不会被链接到第一个map。 <pre> <code> someStream.filter(...).map(...).startNewChain().map(...); </code></pre> |
Disable chaining | 不要链接map操作符. <pre> <code>someStream.map(...).disableChaining(); |
</code></pre> | |
Set slot sharing group | Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default"). <pre> <code> someStream.filter(...).slotSharingGroup("name");</code></pre> |