1.APP市场推广统计
对用户渠道来源进行统计
/**
 * A single user action recorded for the app-marketing statistics: the user, the action,
 * the promotion channel and a timestamp.
 *
 * <p>Mutable bean with a public no-argument constructor and one getter/setter pair per
 * field. {@code AppMarketingByChannel} groups these records by the property names
 * {@code "behavior"} and {@code "channel"}, so those names must stay as they are.
 */
public class MarketingUserBehavior {
    private Long userId;
    private String behavior;
    /** Promotion channel. */
    private String channel;
    private Long timestamp;

    /** Creates an instance with every field unset ({@code null}). */
    public MarketingUserBehavior() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param userId    identifier of the user
     * @param behavior  name of the action
     * @param channel   promotion channel
     * @param timestamp time of the action
     */
    public MarketingUserBehavior(Long userId, String behavior, String channel, Long timestamp) {
        this.userId = userId;
        this.behavior = behavior;
        this.channel = channel;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /** Renders all four fields; the two string fields are wrapped in single quotes. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MarketingUserBehavior{");
        text.append("userId=").append(userId);
        text.append(", behavior='").append(behavior).append('\'');
        text.append(", channel='").append(channel).append('\'');
        text.append(", timestamp=").append(timestamp);
        text.append('}');
        return text.toString();
    }
}
/**
 * Result record of the per-channel promotion statistics: how many events of one behavior
 * were counted for one channel in the window that ends at {@code windowEnd}.
 *
 * <p>Mutable bean with a public no-argument constructor and one getter/setter pair per field.
 */
public class ChannelPromotionCount {
    private String channel;
    private String behavior;
    /** End of the window, already formatted as text. */
    private String windowEnd;
    private Long count;

    /** Creates an instance with every field unset ({@code null}). */
    public ChannelPromotionCount() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param channel   promotion channel
     * @param behavior  name of the counted action
     * @param windowEnd textual end time of the window
     * @param count     number of events counted
     */
    public ChannelPromotionCount(String channel, String behavior, String windowEnd, Long count) {
        this.channel = channel;
        this.behavior = behavior;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public String getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(String windowEnd) {
        this.windowEnd = windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    /** Renders all four fields; the three string fields are wrapped in single quotes. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ChannelPromotionCount{");
        text.append("channel='").append(channel).append('\'');
        text.append(", behavior='").append(behavior).append('\'');
        text.append(", windowEnd='").append(windowEnd).append('\'');
        text.append(", count=").append(count);
        text.append('}');
        return text.toString();
    }
}
public class AppMarketingByChannel {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//从自定义数据源中读取数据
SingleOutputStreamOperator<MarketingUserBehavior> operator = env.addSource(new SourceFunction<MarketingUserBehavior>() {
Boolean flag = true;
Random random = new Random();
// 定义用户行为和渠道的范围
List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
List<String> channelList = Arrays.asList("app store", "wechat", "weibo");
@Override
public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
while (flag) {
long userid = random.nextLong();
String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
String channel = channelList.get(random.nextInt(channelList.size()));
long timeMillis = System.currentTimeMillis();
ctx.collect(new MarketingUserBehavior(userid, behavior, channel, timeMillis));
Thread.sleep(101L);
}
}
@Override
public void cancel() {
flag = false;
}
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MarketingUserBehavior>() {
@Override
public long extractAscendingTimestamp(MarketingUserBehavior element) {
return element.getTimestamp();
}
});
operator.filter(data -> !"UNINSTALL".equals(data.getBehavior()))
.keyBy("behavior", "channel")
.timeWindow(Time.minutes(1L), Time.seconds(5L))
.aggregate(new AggregateFunction<MarketingUserBehavior, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(MarketingUserBehavior marketingUserBehavior, Long aLong) {
return aLong + 1;
}
@Override
public Long getResult(Long aLong) {
return aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return null;
}
}, new ProcessWindowFunction<Long, ChannelPromotionCount, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Long> elements, Collector<ChannelPromotionCount> out) throws Exception {
String channel = tuple.getField(1);
String behavior = tuple.getField(0);
String windowEnd = new Timestamp(context.window().getEnd()).toString();
Long count = elements.iterator().next();
out.collect(new ChannelPromotionCount(channel, behavior, windowEnd, count));
}
}).print();
env.execute("job");
}
}
2.广告点击量统计(黑名单过滤)
思路:
1.对用户的点击事件分析
2.如果某个用户点击次数超过指定次数,那么这个用户就是黑名单用户,使用侧输出流进行报警
3.使用两个状态来实现:一个存储 count 值,一个标记该用户是否已被加入黑名单。如果不清空状态,count 会一直累加,因此使用定时器定期清空状态(处理某个 key 的第一条数据时,注册一个第二天 0 点触发的定时器)。
pojo
AdClickEvent
private Long userId;
private Long adId;
private String province;
private String city;
private Long timestamp;
BlackListUserWarning
private Long userId;
private Long adId;
private String warningMsg;
public class AdStatisticsByProvince {
/**
 * Builds and runs the ad-click statistics job.
 *
 * <p>Pipeline: read {@code /AdClickLog.csv} from the classpath, parse each line into an
 * {@code AdClickEvent}, filter out clicks from (user, ad) pairs that exceeded the click limit
 * (reporting them on the {@code "blacklist"} side output), then count the remaining clicks per
 * province in a sliding window and print both streams.
 *
 * @param args ignored
 * @throws IllegalStateException if {@code /AdClickLog.csv} is not on the classpath
 * @throws Exception             if the job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 1. Read the click log from a file.
    URL resource = AdStatisticsByProvince.class.getResource("/AdClickLog.csv");
    if (resource == null) {
        // Fail with a clear message instead of a bare NullPointerException on getPath().
        throw new IllegalStateException("Resource /AdClickLog.csv not found on the classpath");
    }
    DataStream<AdClickEvent> adClickEventDataStream = env.readTextFile(resource.getPath())
            .map(line -> {
                // Expected columns: userId, adId, province, city, timestamp.
                String[] fields = line.split(",");
                // Long.valueOf replaces the deprecated Long(String) constructor.
                return new AdClickEvent(Long.valueOf(fields[0]), Long.valueOf(fields[1]), fields[2], fields[3], Long.valueOf(fields[4]));
            })
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksAdapter.Strategy<>(
                    // Tolerate events arriving up to 200 ms out of order.
                    new BoundedOutOfOrdernessTimestampExtractor<AdClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
                        @Override
                        public long extractTimestamp(AdClickEvent element) {
                            // presumably the log stores seconds and this converts to milliseconds — verify against the data
                            return element.getTimestamp() * 1000L;
                        }
                    }
            ));
    // 2. Detect and report a user clicking the same ad too often; key is (userId, adId).
    SingleOutputStreamOperator<AdClickEvent> filterAdClickStream = adClickEventDataStream
            .keyBy(new KeySelector<AdClickEvent, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> getKey(AdClickEvent value) throws Exception {
                    return new Tuple2<>(value.getUserId(), value.getAdId());
                }
            })
            .process(new FilterBlackListUser(100));
    // 3. Group by province and aggregate per window.
    DataStream<AdCountViewByProvince> adCountResultStream = filterAdClickStream
            .keyBy(AdClickEvent::getProvince)
            // 1-hour sliding window that emits a result every 5 minutes.
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new AdCountAgg(), new AdCountResult());
    adCountResultStream.print();
    // The tag id must match the id FilterBlackListUser writes to.
    filterAdClickStream
            .getSideOutput(new OutputTag<BlackListUserWarning>("blacklist"){})
            .print("blacklist-user");
    env.execute("ad count by province job");
}
/**
 * Incremental click counter: the accumulator is the number of events added so far, and the
 * result is that same number.
 */
public static class AdCountAgg implements AggregateFunction<AdClickEvent, Long, Long> {
    /** Every accumulator starts at zero. */
    @Override
    public Long createAccumulator() {
        return Long.valueOf(0L);
    }

    /** Counts one more event; the event's content is not inspected. */
    @Override
    public Long add(AdClickEvent value, Long accumulator) {
        long incremented = accumulator + 1L;
        return incremented;
    }

    /** The accumulator already is the final count. */
    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    /** Two partial counts combine by addition. */
    @Override
    public Long merge(Long a, Long b) {
        return Long.sum(a, b);
    }
}
/**
 * Window function that turns the pre-aggregated count of one province and window into an
 * {@code AdCountViewByProvince}, labelled with the window's end time.
 */
public static class AdCountResult implements WindowFunction<Long, AdCountViewByProvince, String, TimeWindow> {
    /**
     * Emits exactly one record for the window.
     *
     * @param province the key of the window
     * @param window   the window being evaluated; its end time becomes the label
     * @param input    the aggregated values; only the first element is read
     * @param out      receives the resulting record
     */
    @Override
    public void apply(String province, TimeWindow window, Iterable<Long> input, Collector<AdCountViewByProvince> out) throws Exception {
        final Long total = input.iterator().next();
        final Timestamp endOfWindow = new Timestamp(window.getEnd());
        out.collect(new AdCountViewByProvince(province, endOfWindow.toString(), total));
    }
}
/**
 * Custom keyed process function that filters out excessive clicks.
 *
 * <p>The stream is keyed by (userId, adId). Clicks are forwarded to the main output and counted
 * until the count reaches {@code countUpperBound}; from then on clicks for that key are dropped,
 * and a single {@code BlackListUserWarning} is written to the {@code "blacklist"} side output.
 * A processing-time timer clears the per-key state at the next local midnight, so the limit
 * applies per day.
 */
public static class FilterBlackListUser extends KeyedProcessFunction<Tuple2<Long, Long>, AdClickEvent, AdClickEvent> {
    // Side-output tag, created once instead of once per over-limit element. Tags are matched
    // by id, so consumers that build their own "blacklist" tag still receive these records.
    private static final OutputTag<BlackListUserWarning> BLACKLIST_TAG =
            new OutputTag<BlackListUserWarning>("blacklist"){};

    // Upper limit of clicks allowed per key before it is blacklisted.
    private final Integer countUpperBound;

    /**
     * @param countUpperBound number of clicks per (user, ad) that are let through before
     *                        further clicks are dropped and a warning is emitted
     */
    public FilterBlackListUser(Integer countUpperBound) {
        this.countUpperBound = countUpperBound;
    }

    // State: number of clicks of the current user on the current ad.
    ValueState<Long> countState;
    // State: whether a blacklist warning has already been emitted for the current key.
    ValueState<Boolean> isSentState;

    /** Obtains the two keyed state handles. */
    @Override
    public void open(Configuration parameters) throws Exception {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ad-count", Long.class));
        isSentState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-sent", Boolean.class));
    }

    /** Daily reset: clears all state of the key the timer belongs to. */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdClickEvent> out) throws Exception {
        countState.clear();
        isSentState.clear();
    }

    /**
     * Forwards the click and increments the count while the key is below the limit; otherwise
     * drops the click and, the first time only, emits a blacklist warning to the side output.
     */
    @Override
    public void processElement(AdClickEvent value, Context ctx, Collector<AdClickEvent> out) throws Exception {
        // Unset state reads as null; treat it as "no clicks yet" / "not reported yet".
        Long curCount = countState.value();
        if (curCount == null) {
            curCount = 0L;
        }
        Boolean isSent = isSentState.value();
        if (isSent == null) {
            isSent = false;
        }
        // 1. First click for this key (since the last reset): register the reset timer for
        //    0:00 of the next day. Previously this registered "now + 24h", which is not
        //    midnight and contradicted the documented intent.
        if (curCount == 0) {
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(nextMidnightMillis(now));
        }
        // 2. Limit reached: drop the click and warn once.
        if (curCount >= countUpperBound) {
            if (!isSent) {
                isSentState.update(true);
                ctx.output(BLACKLIST_TAG,
                        new BlackListUserWarning(value.getUserId(), value.getAdId(), "click over " + countUpperBound + "times."));
            }
            // Nothing else to do for a blacklisted key.
            return;
        }
        // Below the limit: count the click and pass it on to the main output.
        countState.update(curCount + 1);
        out.collect(value);
    }

    /**
     * Returns the epoch-millisecond instant of the first midnight strictly after
     * {@code nowMillis}, evaluated in the JVM's default time zone.
     */
    private static long nextMidnightMillis(long nowMillis) {
        // Fully qualified names keep this block independent of the file's import list.
        java.time.ZoneId zone = java.time.ZoneId.systemDefault();
        return java.time.Instant.ofEpochMilli(nowMillis)
                .atZone(zone)
                .toLocalDate()
                .plusDays(1)
                .atStartOfDay(zone)
                .toInstant()
                .toEpochMilli();
    }
}
}