Operators

原文链接

操作符将一个或多个DataStream转换成为新的DataStream。程序可以将多个转换组合成复杂的数据流拓步。

本节给出了基本转换的描述,以及应用这些转换后的有效物理分区和Flink的操作符链接的见解。

DataStream转换操作

转换操作 描述
Map
DataStream → DataStream
输入一个元素并返回一个元素。一个将输入流的value乘以2的map函数:
<pre> DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
}); </pre>
FlatMap
DataStream → DataStream
输入一个元素,返回0,1或多个元素。一个将句子拆分成单词的flatmap函数:
<pre> <code> dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});</code> </pre>
Filter
DataStream → DataStream
计算每个元素的布尔函数,并保留函数返回true的值,一个过滤掉0值的filter:
<pre> dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
}); <code></code> </pre>
KeyBy
DataStream → KeyedStream
逻辑上将一条流划分为不同的区,每个区包含相同键的元素。在内部,这是通过hash分区实现的。如何指定键请看keys。这个转换操作返回一个KeyedStream。
<pre> <code>dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple</code></pre>
注意:下述类型不能成为key:
1. 它是一个POJO类,但是没有重写hashCode()方法而是依赖于Object.hashCode()的实现。
2. 它是任意类型的数组。
Reduce
KeyedStream → DataStream
keyed数据流上的“滚动”reduce。将当前元素与最后一个reduce值组合并发出新的值。
一个创建流的部分和的reduce函数:
<pre> <code> keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
}); </code></pre>
Fold
KeyedStream → DataStream
具有初始值的keyed数据流上的“滚动”fold。将当前元素与最后一个fold值组合并发出新的值。
当应用于序列(1,2,3,4,5)上时,fold函数会发出"start-1", "start-1-2", "start-1-2-3", ...
<pre> <code> DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
}); </code></pre>
Aggregations
KeyedStream → DataStream
keyed数据流上的滚动聚合。min和minBy的区别在于min返回最小值,minBy返回这个属性上具有最小值的元素(max和maxBy相同).
<pre> <code> keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key"); </code></pre>
Window
KeyedStream → WindowedStream
可以在已经分区的KeyedStream上定义窗口。窗口根据键的某些特性(例如,在最后5秒到达的数据)分组数据。关于窗口的完整描述请见Windows
<pre> <code> dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data </code></pre>
WindowAll
DataStream → AllWindowedStream
可以在常规的DataStream上定义窗口。窗口根据一些特性(例如,在最后5秒到达的数据)分组所有流的事件。关于窗口的完整描述请见Windows
警告: 这在很多情况下不是一个并行的转换。windowAll操作符会把所有的记录收集到一个任务中。
<pre> <code> dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data</code></pre>
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将一个通用函数应用于窗口。下面是一个手动计算窗口元素的函数。
注意: 如果使用windowAll转换,需要使用AllWindowFunction代替。
<pre> <code> windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
}); </code></pre>
Window Reduce
WindowedStream → DataStream
将reduce函数应用到窗口上,并返回reduce的值。
<pre> <code> windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
}); </code></pre>
Window Fold
WindowedStream → DataStream
将fold函数应用到窗口上,并返回fold值。当应用到列表(1,2,3,4,5)上时,示例函数fold列表成字符串"start-1-2-3-4-5":
<pre> <code> windowedStream.fold("start", new FoldFunction<Integer, String>() {
public String fold(String current, Integer value) {
return current + "-" + value;
}
}); </code></pre>
Aggregations on windows
WindowedStream → DataStream
聚合窗口的内容。min和minBy的区别在于min返回最小值,minBy返回这个属性上具有最小值的元素(max和maxBy相同).
<pre> <code> windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key"); </code></pre>
Union
DataStream* → DataStream
合并两个或多个数据流,然后生成一个新的包含所有流的所有元素的流。注意:如果将数据流与自己合并,在新的流中会得到每个元素两次。
dataStream.union(otherStream1, otherStream2, ...);
Window Join
DataStream,DataStream → DataStream
在公共窗口和给定键上连接两个数据流。
<pre> <code> dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...}); </code></pre>
Window CoGroup
DataStream,DataStream → DataStream
在公共窗口和给定键上对两个数据流进行分组。
<pre> <code> dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...}); </code></pre>
Connect
DataStream,DataStream → ConnectedStreams
“连接”两个数据流保留它们的类型。连接允许两个流之间的共享状态。
<pre> <code> DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);</code></pre>
CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的map和flatMap。
<pre> <code> connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});</code></pre>
Split
DataStream → SplitStream
根据某些标准将流分成两个或多个流。
<pre> <code> SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
}); </code></pre>
Select
SplitStream → DataStream
从拆分流中选择一个或多个流。
<pre> <code> SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd"); </code></pre>
Iterate
DataStream → IterativeStream → DataStream
通过重定向操作符的输出到前一个操作符,创建一个流上的“反馈”循环。这对于连续更新模型的算法特别有用。下面的代码以一个数据流开始并持续的应用迭代体。大于0的元素被发送回反馈通道,其余元素被发送到下游。完整的描述请参见iterations
<pre> <code> IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/do something/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value <= 0;
}
}); </code></pre>
Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便于使用事件时间语义的窗口工作。见Event Time
stream.assignTimestamps (new TimeStampExtractor() {...});

可以对Tuple数据流有以下转换:

转换操作 描述
Project
DataStream → DataStream
选取元组属性的子集。
<pre> <code> DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0); </code></pre>

物理分区

Flink通过下述函数,在转换后的流上提供低层次的精确流分区控制(如果需要的话)。

转换操作 描述
Custom partitioning
DataStream → DataStream
使用用户定义的分区器来为每个元素选择目标任务。
<pre> <code> dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0); </code></pre>
Random partitioning
DataStream → DataStream
根据均匀分布随机划分元素。
<pre> <code> dataStream.shuffle(); </code></pre>
Rebalancing (Round-robin partitioning)
DataStream → DataStream
分区元素循环,为每个分区创建相同的负载。在数据倾斜的情况下对性能优化有用。
<pre> <code> dataStream.rebalance(); </code></pre>
Rescaling
DataStream → DataStream
Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

Please see this figure for a visualization of the connection pattern in the above example:
Apache_Flink_1_4_Documentation__Operators.png

<pre> <code> dataStream.rescale(); </code></pre>
Broadcasting
DataStream → DataStream
向每个分区广播元素。 <pre> <code> dataStream.broadcast(); </code></pre>

任务链接和资源组

链接两个子转换意味着它们在同一个线程中以获得更好的性能。Flink默认如果可能会链接操作符(例如,两个map转换)。如果需要,API对链接提供细粒度的控制。
如果想要在整个作业中禁止链接,使用StreamExecutionEnvironment.disableOperatorChaining()方法。对于更细粒度的控制,可以使用下述函数。注意这些函数只能在DataStream转换之后使用,因为它们引用了前面的转换。例如,你可以使用someStream.map(…). startnewchain(),但你不能使用someStream.startNewChain()。
在Flink中,资源组是一个槽,见slots。如果需要,您可以在单独的槽中手动隔离操作符。

转换操作 描述
Start new chain 从这个操作符开始一个新的链。两个map将被链接,并且filter不会被链接到第一个map。 <pre> <code> someStream.filter(...).map(...).startNewChain().map(...); </code></pre>
Disable chaining 不要链接map操作符. <pre> <code>someStream.map(...).disableChaining();
</code></pre>
Set slot sharing group Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default"). <pre> <code> someStream.filter(...).slotSharingGroup("name");</code></pre>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容