基于flink 1.9.1 扩展
场景
公司基于flink sql 二次开发了etl 功能。
其中有一个场景是要根据输入,实时预览sql执行后的结果。
比如有这样一条sql
select cast(order_id as bigint),
str_to_timestamp('2017-11-01 23:59:55','yyyy-MM-dd HH:mm:ss'),
false,
is_exist('order_id'),
to_base64('hello'),
['3','4'],
__baas__all__
from order_source
说明:
- order_source是 kafka中流表
- ['3','4'] 是我扩展flink sql后定义数组的方式,对应原生Array['3','4']
- 输出的数据同样返回到kafka中
有以下几种方案可以实现
- 本地启动flink,实际执行
- 解析sql,自己实现
第一种方式 开销太大,即使是以local模式启动也不能接受
第二种方式 与flink sql内部实现不一致,到时候数据输出也会不一致
我当时想如果能够在一个方法中过一遍flink 的operator就好了,这样免除了启动flink 各种组件的开销,又能保持数据一致。说做就做
设计与实现如下
- flink 内部使用calcite 解析sql,生成ast,然后validate->optimatize。优化后的语法树,就用经典的火山模型老一套,生成Transformations,也就是DAG。
Planner planner = tableEnv.getPlanner();
StreamPlanner streamPlanner = (StreamPlanner) planner;
//反射获取private 字段
List<ModifyOperation> bufferedModifyOperations = BeanUtil.getField(TableEnvironmentImpl.class, tableEnv, "bufferedModifyOperations");
List<Transformation<?>> translate = streamPlanner.translate(bufferedModifyOperations);
flink内部做了一些转换,我们调用tableEnv.sqlQuery 的时候,生成Operation缓存起来,execute时候再生成Transformation。
通过上述代码我们可以获取Transformation。
- 从source到sink 排列transform,假设只有一个 sink,取数组第一个元素。
这种火山模型的转换,默认是从sink执行execute,不断获取input,我们用栈
Transformation next = translate.get(0);
LinkedList<Transformation> linkedList = new LinkedList<>();
while (next != null) {
linkedList.push(next);
if (next instanceof SinkTransformation) {
next = ((SinkTransformation<?>) next).getInput();
} else if (next instanceof OneInputTransformation) {
next = ((OneInputTransformation<?, ?>) next).getInput();
} else if (next instanceof SourceTransformation) {
next = null;
} else {
//不支持TwoInput,需要可以自己扩展
throw new UnsupportedOperationException();
}
}