使用场景:
在处理数据的时候,有些配置是要实时动态改变的,比如说我要过滤一些关键字,这些关键字呢是在MYSQL里随时配置修改的,那我们在高吞吐计算的Function中动态查询配置文件有可能使整个计算阻塞,甚至任务停止。
广播流可以通过查询配置文件,广播到某个 operator 的所有并发实例中,然后与另一条流数据连接进行计算。
实现步骤:
1、定义一个MapStateDescriptor来描述我们要广播的数据的格式
final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR = new MapStateDescriptor<>(
"wordsConfig",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
2、需要一个Stream来广播下游的operator
我这里实现了一个只有1个并发度的数据源,定时查配置文件,发动到下游
public class MinuteBroadcastSource extends RichParallelSourceFunction<String> {
private volatile boolean isRun;
private volatile int lastUpdateMin = -1;
private R2mClusterClient redisDao;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
isRun = true;
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(isRun){
LocalDateTime date = LocalDateTime.now();
int min = date.getMinute();
if(min != lastUpdateMin){
lastUpdateMin = min;
Set<String> configs = readConfigs();
if(configs != null && configs.size() > 0){
for(String config : configs){
ctx.collect(config);
}
}
}
Thread.sleep(1000);
}
}
private Set<String> readConfigs(){
//这里读取配置信息
return null;
}
@Override
public void cancel() {
isRun = false;
}
}
3、添加数据源并把数据源注册成广播流
BroadcastStream<String> broadcastStream = env.addSource(new MinuteBroadcastSource()).setParallelism(1).broadcast(CONFIG_DESCRIPTOR);
4、连接广播流和处理数据的流
DataStream<SkuOrder> connectedStream = orderStream.connect(broadcastStream).process(new BroadcastProcessFunction<Order, String, Order>(){
@Override
public void processElement(Order order, ReadOnlyContext ctx, Collector<Order> collector) throws Exception {
HeapBroadcastState<String,String> config = (HeapBroadcastState)ctx.getBroadcastState(CONFIG_DESCRIPTOR);
Iterator<Map.Entry<String, String>> iterator = config.iterator();
while (iterator.hasNext()){
Map.Entry<String, String> entry =iterator.next();
logger.info("all config:" + entry.getKey() + " value:" + entry.getValue());
}
collector.collect(order);
}
@Override
public void processBroadcastElement(String s, Context ctx, Collector<SkuOrder> collector) throws Exception {
logger.info("收到广播:" + s);
BroadcastState<String,String> state = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(s,"1");
}
});
需要注意到的问题:
1、数据源发送数据时候如果数据是集合,必须使用线程安全的集合类
2、如果上面的MinuteBroadcastSource并行度大于1,那么每一个JOB都会发一条广播,这样的话如果每个JOB一分钟发一次,那么processBroadcastElement会收到 并行度数 * n条消息
3、获取到的BroadcastState是一个map,相同的KEY,put进去会覆盖掉