Process Function(过程函数)
ProcessFunction
是一个低层次的流处理操作,允许返回所有(无环的)流程序的基础构建模块:
1、事件(event)(流元素)
2、状态(state)(容错性,一致性,仅在keyed stream中)
3、定时器(timers)(event time和processing time, 仅在keyed stream中)
ProcessFunction
可以认为是能够访问到keyed state和timers的FlatMapFunction
,输入流中接收到的每个事件都会调用它来处理。
对于容错性状态,ProcessFunction可以通过RuntimeContext来访问Flink的keyed state,方法与其他状态性函数访问keyed state一样。
定时器允许应用程序对processing time
和event time
的变化做出反应,每次对processElement(...)
的调用都会得到一个Context
对象,该对象允许访问元素事件时间的时间戳和TimeServer
。TimeServer
可以用来为尚未发生的event-time或者processing-time注册回调,当定时器的时间到达时,onTimer(...)方法会被调用。在这个调用期间,所有的状态都会限定到创建定时器的键,并允许定时器操纵键控状态(keyed states)。
注意:如果你想访问一个键控状态(keyed state)和定时器,你需要将ProcessFunction应用到一个键控流(keyed stream)中:
stream.keyBy(...).process(new MyProcessFunction())
低层次的Join(Low-Level Joins)
为了在两个输入流中实现低层次的操作,应用程序可以使用CoProcessFunction
,这个函数绑定了两个不同的输入流,并通过分别调用processElement1(...)
和processElement2(...)
来获取两个不同输入流中的记录。
实现一个低层次的join通常按下面的模式进行:
1、为一个输入源(或者两个都)创建一个状态(state)
2、在接收到输入源中的元素时更新状态(state)
3、在接收到另一个输入源的元素时,探查状态并产生连接结果
例如:你可能将客户数据连接到金融交易中,同时保存客户信息的状态,如果您关心在无序事件的情况下进行完整和确定性的连接,则当客户数据流的水印已经通过交易时,您可以使用计时器来评估和发布交易的连接。
例子
以下示例维护了每个key的计数,并且每一分钟发出一个没有更新key的key/count对。
1、count,key和last-modification-timestamp保存在ValueState中,该ValueState由key隐含地限定。
2、对于每条记录,ProcessFunction增加counter的计数并设置最后更新时间戳
3、该函数还会在未来一分钟内安排回调(基于event time)
4、在每次回调时,它会根据存储的count的最后更新时间来检查callback的事件时间,如果匹配的话,就会将key/count对发出来
注意:这个简单的例子可以通过session window来实现,我们这里使用ProcessFunction是为了说明它提供的基本模式。
Java 代码:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
// 定义源数据流
DataStream<Tuple2<String, String>> stream = ...;
// 将 process function 应用到一个键控流(keyed stream)中
DataStream<Tuple2<String, Long>> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
/**
* The data type stored in the state
* state中保存的数据类型
*/
public class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
* ProcessFunction的实现,用来维护计数和超时
*/
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
/** The state that is maintained by this process function */
/** process function维持的状态 */
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
// 获取当前的count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
// 更新 state 的 count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
// 将state的时间戳设置为记录的分配事件时间戳
current.lastModified = ctx.timestamp();
// write the state back
// 将状态写回
state.update(current);
// schedule the next timer 60 seconds from the current event time
// 从当前事件时间开始计划下一个60秒的定时器
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
//获取计划定时器的key的状态
CountWithTimestamp result = state.value();
// 检查是否是过时的定时器或最新的定时器
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
Scala 代码:
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.util.Collector
// 定义源数据流
val stream: DataStream[Tuple2[String, String]] = ...
// 将 process function 应用到一个键控流(keyed stream)中
val result: DataStream[Tuple2[String, Long]] = stream
.keyBy(0)
.process(new CountWithTimeoutFunction())
/**
* state中保存的数据类型
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* ProcessFunction的实现,用来维护计数和超时
*/
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
/** process function维持的状态 */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
// 初始化或者获取/更新状态
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp)
case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// 将状态写回
state.update(current)
// 从当前事件时间开始计划下一个60秒的定时器
ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}
}
}