作为一名初学者来说,Flink 的各种API着实使人头晕乱象,建以这种情况,今天总结下:热门商品的统计。接下来我们先看下数据源的格式(这里为了方便我们直接网上下载公开数据即可-> wget https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv):
用户ID,商品ID,商品类目ID,行为类型,时间戳
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
先明确下具体的需求:按一小时的窗口大小,每5分钟统计一次《做滑动窗口聚合 - Sliding Window》,按每个窗口聚合,输出每个窗口中点击量前N名的商品。
废话不多说,既然需求和数据源已确定,那么接下来直接上代码实践:FlinkSQL 和 Stateful Stream Processing
一、FlinkSQL 的实践
1.创建实现类 - AnalysisHotItemSQL.java,整体代码如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\Desktop\\my-gitlib\\shishi-daping\\dip\\shishi-daping\\NFDWSYYBigScreen\\TestJsonDmon\\src\\main\\resources\\UserBehavior.csv");
DataStream<UserBehavior> dataStream = inputStream.map(new MapFunction<String, UserBehavior>() {
@Override
public UserBehavior map(String s) throws Exception {
String[] dataArray = s.split(",");
return new UserBehavior(Long.parseLong(dataArray[0]),Long.parseLong(dataArray[1]),Integer.parseInt(dataArray[2]),dataArray[3],Long.parseLong(dataArray[4]));
}
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.seconds(1)) {
@Override
public long extractTimestamp(UserBehavior element) {
return element.getTimestamp()*1000L;
}
});
// --------------- 开始区 ---------------------------------------------
// 具体代码实现
// --------------- 结束区 ------------------------------------------------
env.execute("Top PV");
}
2.具体逻辑代码实现如下:
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// 创建临时表
tableEnv.createTemporaryView("UserBehavior", dataStream,"itemId,behavior,timestamp.rowtime as ts");
String sql = ("select * from (select *,row_number() over(partition by windowEnd order by cnt desc) as row_num from(select itemId, count(itemId) as cnt, hop_end(ts, interval '5' minute, interval '1' hour) as windowEnd from " +
"UserBehavior where behavior = 'pv' group by itemId, hop(ts, interval '5' minute, interval '1' hour))) where row_num <= 5").trim();
Table topNResultTable = tableEnv.sqlQuery(sql);
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(topNResultTable, Row.class);
tuple2DataStream.print();
3.输出结果如下:从整体代码结构来看确实是很精简,只要创建临时表我们就可以用 SQL 来实现了,但是由于 SQL 默认是用一个状态进程存储的,因此所耗的资源自然会大一些,还有就是 SQL 的实现只能输出满足条件的数据,不能输出多余的数据,这自然会不满足某些苛刻的需求;那么如何避免这种情况呢?当然可以实现,那就是用底层的API进行开发。
二、Stateful Stream Processing 的实践
1.首先我们先写下逻辑的实现代码:
SingleOutputStreamOperator<String> singleOutputStream = dataStream.filter(ub -> ub.getBehavior().equals("pv"))
// 1. 先按窗口进行统计
.keyBy(ub -> ub.getItemId())
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResult())
// 2. 根据时间窗口的结束时间分组并排序
.keyBy(ub -> ub.getWindowEnd())
.process(new TopN(3));
singleOutputStream.print();
2.从逻辑代码可以看出第一步先做分组然后开窗,接下来使用 aggregate(AggregateFunction af, WindowFunction wf)做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力、第二部在根据窗口的结束日期进行分组然后再排序即可得到人们商品;为了方便操作聚合函数,我们先创建相应的实体类:
/** 源
* @author feiniu
* @create 2020-09-01 9:50
*/
public class UserBehavior {
private long userId; // 用户ID
private long itemId; // 商品ID
private int categoryId; // 商品类目ID
private String behavior; // 用户行为, 包括("pv", "buy", "cart", "fav")
private long timestamp; // 行为发生的时间戳,单位秒
......
}
/** 结果
* @author feiniu
* @create 2020-09-01 10:23
*/
public class ItemViewCount {
private Long itemId;
private Long windowEnd;
private Long count;
......
}
3.CountAgg 实现了 AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一:
/** COUNT 统计的聚合函数实现,每出现一条记录加一
* in, acc, out
* */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior userBehavior, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return aLong + acc1;
}
}
- aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数WindowFunction 将每个 key 每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的 WindowResult 将主键商品ID,窗口,点击量封装成了ItemViewCount进行输出:
/** 用于输出窗口的结果
* in, out, key, window
* */
public static class WindowResult implements WindowFunction<Long, ItemViewCount, Long, TimeWindow> {
/**
* 窗口的主键,即 itemId
* 窗口
* 聚合函数的结果,即 count 值
* 输出类型为 ItemViewCount
*/
@Override
public void apply(Long aLong, TimeWindow timeWindow, Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
collector.collect(new ItemViewCount(aLong, timeWindow.getEnd(), iterable.iterator().next()));
}
}
目前为止我们得到了每个商品在每个窗口的点击量的数据流。
- TopN 计算最热门商品 - 我们实现了逻辑代码中的第二部分求出前3的热门商品,我们使用 ProcessFunction进行操作,它是 Flink 提供的一个 low-level API,用于实现更高级的功能:
/** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
* K , I , O
* */
public static class TopN extends KeyedProcessFunction<Long, ItemViewCount, String>{
private final int n;
public TopN(int n) {
this.n = n;
}
// 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
private transient ListState<ItemViewCount> itemState = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 状态的注册
ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
"itemState-state",
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}
@Override
public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
// 每条数据都保存到状态中
this.itemState.add(itemViewCount);
// 注册 windowEnd + 1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
context.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1);
}
// -------------------------------------------
// 定时器的代码实现
// -------------------------------------------
}
- 定时器的代码实现如下:
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
// 获取收到的所有商品点击量
List<ItemViewCount> allItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
allItems.add(item);
}
// 提前清除状态中的数据,释放空间
itemState.clear();
// 按照点击量从大到小排序
allItems.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) (o2.getCount() - o1.getCount());
}
});
// 将排名信息格式化成 String, 便于打印
StringBuilder result = new StringBuilder();
result.append("====================================\n");
result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n");
for (int i = 0; i < n; i++) {
ItemViewCount currentItem = allItems.get(i);
result.append("No").append(i).append(":")
.append(" 商品ID=").append(currentItem.getItemId())
.append(" 浏览量=").append(currentItem.getCount())
.append("\n");
}
result.append("====================================\n\n");
out.collect(result.toString());
}
- 运行结果如下:
到此,我们用两种方式实现了热门商品的操作,不过使用哪种方式,适合自己才是最好的;这篇文章的写作之前我在网上查了好多资料,万变不离其中,代码基本上都是相近的,在此特别感谢 云邪 大佬的资料参考:《https://mp.weixin.qq.com/s?__biz=MzUxNjkzMzc0MA==&mid=2247483698&idx=1&sn=f1e6e7c44274af25da70a239eb47a48d&scene=19#wechat_redirect》
欢迎各位同学留言讨论,共同进步。