一文搞懂 FlinkSQL 的 KafkaSource

背景

前面我们了解了 写给大忙人看的Flink 消费 Kafka,今天我们一起来看一下 FlinkSQL Kafka 是如何与 Flink Streaming Kafka 结合起来的

正文

创建 kafka source

CREATE TABLE orders
(
    status      int,
    courier_id  bigint,
    id          bigint,
    finish_time BIGINT,
    place_time  BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
)
    WITH (
        'connector' = 'kafka','topic' = 'test',
        'properties.bootstrap.servers' = 'xxx','properties.group.id' = 'testGroup',
        'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'latest-offset');

经过 Apache Calcite 的一系列转化( 具体转化的过程后续会写 ),最终达到 CatalogSourceTable 类,此类继承自 FlinkPreparingTableBase,负责将 Calcite 的 RelOptTable 转化为 Flink 的 TableSourceTable

@Override
    //入口方法  SqlToRelConverter toRel 方法
    public RelNode toRel(ToRelContext toRelContext) {
        final RelOptCluster cluster = toRelContext.getCluster();
        final List<RelHint> hints = toRelContext.getTableHints();// sql Hint
        final FlinkContext context = ShortcutUtils.unwrapContext(cluster);
        final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
        final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, relOptSchema);

        // 0. finalize catalog table
        final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
        final CatalogTable catalogTable = createFinalCatalogTable(context, hintedOptions);

        // 1. create and prepare table source
        final DynamicTableSource tableSource = createDynamicTableSource(context, catalogTable);
        prepareDynamicSource(
                schemaTable.getTableIdentifier(),
                catalogTable,
                tableSource,
                schemaTable.isStreamingMode(),
                context.getTableConfig());

        // 2. push table scan
        pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints);

        // 3. push project for non-physical columns
        final TableSchema schema = catalogTable.getSchema();
        if (!TableSchemaUtils.containsPhysicalColumnsOnly(schema)) {
            pushMetadataProjection(relBuilder, typeFactory, schema);
            pushGeneratedProjection(context, relBuilder, schema);
        }

        // 4. push watermark assigner
        if (schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) {
            pushWatermarkAssigner(context, relBuilder, schema);
        }

        return relBuilder.build();
    }

0-4 转化完成。这篇 blog 主要关心部分是 1 ,我们继续追踪到 FactoryUtil.createTableSource 方法

public static DynamicTableSource createTableSource(
            @Nullable Catalog catalog, //GenericlnMemoryCatalog
            ObjectIdentifier objectIdentifier,//`default_catalog`.`default_database`.`orders`
            CatalogTable catalogTable,//CatalogTableImpl
            ReadableConfig configuration,
            ClassLoader classLoader,
            boolean isTemporary) {
        final DefaultDynamicTableContext context =
                new DefaultDynamicTableContext(
                        objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
        try {
            final DynamicTableSourceFactory factory = // 找到 KafkaDynamicTableFactory
                    getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
            return factory.createDynamicTableSource(context);
        } catch (Throwable t) {
            throw new ValidationException(
                    String.format(
                            "Unable to create a source for reading table '%s'.\n\n"
                                    + "Table options are:\n\n"
                                    + "%s",
                            objectIdentifier.asSummaryString(),
                            catalogTable.getOptions().entrySet().stream()
                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                    .sorted()
                                    .collect(Collectors.joining("\n"))),
                    t);
        }
    }

我们到 KafkaDynamicTableFactory 的 createDynamicTableSource 方法

@Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig tableOptions = helper.getOptions();//with 里的配置信息

        // 通过 format (SPI)
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);

        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =//SSCanalJsonFormatFactory
                getValueDecodingFormat(helper);

        // 一些类的校验 validate
        helper.validateExcept(PROPERTIES_PREFIX);

        validateTableSourceOptions(tableOptions);

        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

        final StartupOptions startupOptions = getStartupOptions(tableOptions);

        //获取 kafka 本身的一些配置 servers、group.id 等
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());

        // add topic-partition discovery
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

        final DataType physicalDataType =//ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
                context.getCatalogTable().getSchema().toPhysicalRowDataType();

        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);

        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);

        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }

首先做了一些校验,然后传入一些配置来创建 tableSource ,如下

protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType,//要查询的字段 ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,//SSCanalJsonFormatFactory
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,// topics
            @Nullable Pattern topicPattern,//topicPattern
            Properties properties,// kafka 的一些配置信息,servers、group.id 等
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false);
    }

继续执行

 prepareDynamicSource(
                schemaTable.getTableIdentifier(),
                catalogTable,
                tableSource,
                schemaTable.isStreamingMode(),
                context.getTableConfig());

会调用 KafkaDynamicSource.getScanRuntimeProvider 方法,创建 FlinkKafkaConsumer 成功

其他

关于 'format' = 'ss-canal-json' 的一些事情可以参考 FlinkSQL 平台

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容