1. map
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        // Map each input string to its length
        return value.length();
    }
});
2. flatMap
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Split each line on commas and emit every field as its own element
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});
3. filter
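DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        // Keep only elements equal to "1"; a String cannot be compared to an int with ==
        return "1".equals(value);
    }
});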
4. keyBy
DataStream -> KeyedStream. Logically partitions the stream so that all elements with the same key end up in the same partition.
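The idea of sending equal keys to the same partition can be sketched in plain Java, without any Flink dependency. This is only an illustration: the class `KeyBySketch` and the helper `partitionFor` are hypothetical names, and the simple hash-modulo rule is an assumption standing in for Flink's own, more involved key-to-partition assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: simulates key-based partitioning, not Flink's implementation.
class KeyBySketch {

    // Hypothetical helper: derive a partition index from the key's hash code.
    // Equal keys have equal hash codes, so they always map to the same partition.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Distribute "key,value" records over partitions according to their key.
    static Map<Integer, List<String>> partition(List<String> records, int parallelism) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String record : records) {
            String key = record.split(",")[0];
            partitions
                .computeIfAbsent(partitionFor(key, parallelism), p -> new ArrayList<>())
                .add(record);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("sensor_1,35.8", "sensor_2,15.4", "sensor_1,36.3");
        // Both sensor_1 records appear under the same partition index.
        System.out.println(partition(records, 4));
    }
}
```

Different keys may still share a partition; the guarantee only runs one way, from equal keys to an equal partition.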
5. Rolling Aggregation
Performs a rolling aggregation on each keyed substream of a KeyedStream: sum(), min(), max(), minBy(), maxBy()
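The difference between min() and minBy() is easy to miss, so here is a minimal plain-Java sketch of the two behaviours for a single key. It is not Flink code: `RollingAggregationSketch`, `Reading`, `rollingMin` and `rollingMinBy` are hypothetical names, and the sketch assumes that min() only updates the aggregated field while keeping the other fields of the first element, whereas minBy() returns the whole element holding the minimum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: simulates rolling aggregation over one keyed substream.
class RollingAggregationSketch {

    // Hypothetical record type standing in for a sensor reading.
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "(" + id + ", " + timestamp + ", " + temperature + ")";
        }
    }

    // min-style: only the temperature field is aggregated;
    // id and timestamp stay those of the first element (assumption, see lead-in).
    static List<Reading> rollingMin(List<Reading> input) {
        List<Reading> out = new ArrayList<>();
        Reading acc = null;
        for (Reading r : input) {
            acc = (acc == null)
                ? r
                : new Reading(acc.id, acc.timestamp, Math.min(acc.temperature, r.temperature));
            out.add(acc); // one result is emitted per incoming element
        }
        return out;
    }

    // minBy-style: the whole element with the smallest temperature is kept.
    static List<Reading> rollingMinBy(List<Reading> input) {
        List<Reading> out = new ArrayList<>();
        Reading acc = null;
        for (Reading r : input) {
            if (acc == null || r.temperature < acc.temperature) {
                acc = r;
            }
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> input = Arrays.asList(
            new Reading("sensor_1", 1L, 35.8),
            new Reading("sensor_1", 2L, 32.0),
            new Reading("sensor_1", 3L, 36.3));
        System.out.println("min:   " + rollingMin(input));
        System.out.println("minBy: " + rollingMinBy(input));
    }
}
```

Both variants emit one result per input element, which is what "rolling" means here; they differ only in which non-aggregated fields accompany the minimum.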
6. reduce
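KeyedStream -> DataStream. Combines the current element with the result of the previous aggregation to produce a new value; the returned stream contains the result of every aggregation step.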
// Group by key
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// reduce aggregation: keep the minimum temperature and output the current timestamp
DataStream<SensorReading> reduceStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        // value1 is the previous aggregate, value2 is the newly arrived element
        return new SensorReading(value1.getId(), value2.getTimestamp(), Math.min(value1.getTemperature(), value2.getTemperature()));
    }
});
7. Split and Select
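DataStream -> split -> SplitStream -> select -> DataStream. split tags each element with one or more names; select returns the elements that carry the given names. Note that split/select was deprecated and later removed from Flink; newer versions use side outputs for this purpose.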
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        // Tag readings above 30 degrees as "high", all others as "low"
        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
    }
});
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
8. Connect
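DataStream, DataStream -> ConnectedStreams. Connects two data streams, which may have different types. After connect, the two streams are merely placed inside one stream; each still keeps its own data type.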
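9. CoMap and CoFlatMap
ConnectedStreams -> DataStream. Applies a separate map or flatMap function to each of the two streams in a ConnectedStreams and produces a single output stream.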
// Merge streams with connect: first convert the high-temperature stream to (id, temperature) tuples
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    // map1 handles elements of the first (warning) stream
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        return new Tuple3<>(value.f0, value.f1, "warning");
    }
    // map2 handles elements of the second (low-temperature) stream
    @Override
    public Object map2(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), "healthy");
    }
});
10. Union
DataStream, DataStream, ... -> DataStream. Merges multiple streams that all have the same data type.
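The typing difference between Union and Connect/CoMap can be sketched in plain Java. This is an analogy over in-memory lists rather than Flink code: `UnionVsConnectSketch`, `union` and `coMap` are hypothetical names, and the sketch simply concatenates its inputs, whereas real streams interleave elements as they arrive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative only: models the type constraints, not Flink's runtime behaviour.
class UnionVsConnectSketch {

    // Union-style: any number of inputs, all of the same element type T.
    @SafeVarargs
    static <T> List<T> union(List<T>... streams) {
        return Stream.of(streams)
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }

    // Connect + CoMap style: exactly two inputs whose types may differ;
    // each input gets its own mapping function, and both map to a common type R.
    static <A, B, R> List<R> coMap(List<A> first, List<B> second,
                                   Function<A, R> map1, Function<B, R> map2) {
        return Stream.concat(first.stream().map(map1), second.stream().map(map2))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same type everywhere: union is enough.
        System.out.println(union(Arrays.asList(35.8, 36.3), Arrays.asList(15.4)));
        // Different types: each side needs its own function.
        System.out.println(coMap(Arrays.asList(35.8), Arrays.asList("sensor_2"),
            t -> "warning:" + t, id -> "healthy:" + id));
    }
}
```

In short, Union accepts many streams but requires one shared type, while Connect accepts only two streams but lets their types differ.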