思路:读入数据时,用 flatMap 算子过滤出行为类型为 pv(Page View,页面浏览)的记录;在 process 算子中,用 Set 对用户 id 去重,Set 的 size 即为 UV(Unique Visitor,独立访客)数。
原始数据:
图片.png
实体类:
图片.png
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashSet;
/**
 * Flink streaming job that prints a running UV (unique visitor) count.
 *
 * <p>The job reads {@code input/UserBehavior.csv} line by line, keeps only the rows whose
 * behavior column is {@code "pv"}, and for every such row emits the number of distinct
 * user ids seen so far.
 */
public class UV {

    /** Minimum number of comma-separated columns a row must have to be parsed. */
    private static final int FIELD_COUNT = 5;

    /** Zero-based index of the behavior column in a CSV row. */
    private static final int BEHAVIOR_INDEX = 3;

    /** Behavior value that marks a page-view row. */
    private static final String PV = "pv";

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails, including a {@link NumberFormatException} raised
     *                   while parsing a numeric column of a page-view row
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(3);

        DataStreamSource<String> source = env.readTextFile("input/UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> stream = source.flatMap(new FlatMapFunction<String, UserBehavior>() {
            @Override
            public void flatMap(String line, Collector<UserBehavior> out) throws Exception {
                String[] arr = line.split(",");
                // Decide on the raw behavior column before parsing any number, so that
                // blank lines, short rows, a header row and non-pv rows are dropped
                // without ever reaching Long.valueOf / Integer.valueOf.
                // NOTE(review): assumes UserBehavior.getBehavior() returns the fourth
                // constructor argument unchanged - confirm against the entity class.
                if (arr.length < FIELD_COUNT || !PV.equals(arr[BEHAVIOR_INDEX])) {
                    return;
                }
                // A page-view row with a non-numeric id or timestamp still fails fast here.
                out.collect(new UserBehavior(
                        Long.valueOf(arr[0]),
                        Long.valueOf(arr[1]),
                        Integer.valueOf(arr[2]),
                        arr[BEHAVIOR_INDEX],
                        Long.valueOf(arr[4])));
            }
        });

        stream.keyBy(UserBehavior::getBehavior)
                .process(new KeyedProcessFunction<String, UserBehavior, Long>() {
                    // Ordinary field of the function object, not Flink keyed state, so it
                    // is not scoped per key. The count is still meaningful here because
                    // the upstream flatMap forwards only "pv" rows, i.e. a single key.
                    // NOTE(review): a plain field is presumably not part of checkpoints -
                    // confirm before relying on this job for failure recovery.
                    private final HashSet<Long> userIds = new HashSet<>();

                    @Override
                    public void processElement(UserBehavior value, Context ctx, Collector<Long> out) throws Exception {
                        userIds.add(value.getUserId());
                        // Emitted for every input row, whether or not the user id was new.
                        out.collect((long) userIds.size());
                    }
                }).print();

        // Runs while the pipeline is being assembled, before env.execute() starts the job.
        System.out.println("~~~~");
        env.execute();
    }
}
图片.png
增加去重逻辑:
图片.png