keyBy
意思:分组之意。
DataStream -> KeyedStream : 逻辑的将一个流拆分成不相交的“分区”,每个分区包含相同的 key元素,在内部以 hash 的形式实现。
滚动聚合算子(Rolling Aggregation)
- sum()
- min()
- max()
- minBy()
- maxBy()
- reduce()
//转换成 SensorReading 类型
DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String value) throws Exception {
String[] fields = value.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
}
});
// DataStream<SensorReading> dataStream = inputStream.map(line -> {
// String[] fields = line.split(",");
// return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
// });
//分组
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//max 0r maxBy
// SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.maxBy("temperature");
//reduce 聚合
SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
@Override
public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
}
});
//lambda
// keyedStream.reduce((curState, newData) -> {
// return new SensorReading(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
// });
resultStream.print();
多流转换算子
- Split 和 Select
Split : DataStream -> SplitStream : 根据某些特征把一个DataStream 拆分2个或者多个DataStream . - Connect 和CoMap
DataStream ,DataStream -> ConnectedStream : 链接两个保持他们类型的数据流,两个数据流Connect 之后,只是被放在了同一个流中,内部依然保持各自的数据和形式不发生任何的变化,两个流相互独立
之后要做转化用 CoMap ,CoFlatMap ,真正的转换成一条流。
缺点:不能链接多条流。只能是两条流。 - Union
DataStream -> DataStream : 对两个或者两个以上的DataStream 进行 union 操作,产生一个包含所有DataStream 元素的新DataStream.
要求:当前合并的多条流,必须是同样的数据类型。