Flink-SQL如何连接外部资源

Flink-SQL如何连接外部资源

最近项目中需要把FlinkSQL对标SparkSQL做一套可视化页面,但网上针对Flink的相关博客很少,官网的例子给的也不太全。目前大多数人给的结论就是现在Flink Table/SQL的功能还不稳定,都在等待阿里的Blink和Flink合并后在使用。这篇我想用现在最新发行版1.7.2给大家点参考demo。

目前官网支持的Connectors

官网支持的连接器和对应依赖

**注意 :目前1.72版本的CSV只支持批处理不支持流处理,如果流处理的环境使用会报下面的错误。
Exception in thread "main"org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No context matches.
**

编程范式

tableEnvironment
  .connect(...)      //  需要传入继承ConnectorDescriptor的实现类 eg/ Kafka,FileSystem
  .withFormat(...)  // 需要传入继承FormatDescriptor的实现类 eg/ Json,Avor,Csv 
  .withSchema(...)  // 需要传入new Schema() 这里边的Schema是FIink注册表的字段和类型
  .inAppendMode()  
  /** 支持三种格式
    inAppendMode(只支持动态表的insert过来的数据)
    inRetractMode (支持动态刷新数据表 包括update 和 delete,但性能会受影响)
    inUpsertMode (因为操作的是单条数据,所以性能高于inRetractMode)
**/
  .registerTableSource("MyTable") // 注册的表名称

Example1 Kafka连接器 JSON -> CSV

package mqz.connector;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

/**
 * @author maqingze
 * @version v1.0
 * @date 2019/3/7 11:24
 */
public class KafkaConnectorFormatJSON2CSV {
    private final static String SOURCE_TOPIC = "source";
    private final static String SINK_TOPIC = "sink";
    private final static String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
    private final static String GROUP_ID = "group1";
    private final static String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        tEnv.connect(
            new Kafka()
                .version("0.10")
                .topic(SOURCE_TOPIC)
                .startFromEarliest()
                .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                .property("bootstrap.servers", METADATA_BROKER_LIST)
        )
            .withFormat(
            new Json()
                .schema(
                    org.apache.flink.table.api.Types.ROW(
                        new String[]{"id", "product", "amount"},
                        new TypeInformation[]{
                            org.apache.flink.table.api.Types.LONG()
                            , org.apache.flink.table.api.Types.STRING()
                            , org.apache.flink.table.api.Types.INT()
                        }))
                .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default
            )
            .withSchema(
                new Schema()
                    .field("id", Types.LONG)
                    .field("product", Types.STRING)
                    .field("amount", Types.INT)
            )
            .inAppendMode()
            .registerTableSource("sourceTable");

        Table result = tEnv.sqlQuery("select * from sourceTable ");

        DataStream<Row> rowDataStream = tEnv.toAppendStream(result, Row.class);

        rowDataStream.print();

        CsvTableSink sink = new CsvTableSink(
            "D:\\Aupload\\flink\\sink.csv",                  // 输出路径
            "|",                   // 字段分隔符
            1,                     // 写入的文件个数
            FileSystem.WriteMode.OVERWRITE);  // 是否覆盖原文件 还有NO_OVERWRITE模式

        tEnv.registerTableSink(
            "csvOutputTable",
            new String[]{"f0", "f1", "f2"},
            new TypeInformation[]{Types.LONG, Types.STRING, Types.INT},
            sink);

        result.insertInto("csvOutputTable");

        env.execute(" tesst kafka connector demo");

    }

}

Example2 Kafka连接器 JSON -> JSON

package mqz.connector;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

/**
 * @author maqingze
 * @version v1.0
 * @date 2019/3/7 11:24
 */
public class KafkaConnectorFormatJSON2JSON {
    private final static String SOURCE_TOPIC = "source";
    private final static String SINK_TOPIC = "sink";
    private final static String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
    private final static String GROUP_ID = "group1";
    private final static String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        tEnv.connect(
            new Kafka()
                .version("0.10")
                .topic(SOURCE_TOPIC)
                .startFromEarliest()
                .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                .property("bootstrap.servers", METADATA_BROKER_LIST)
        )
            .withFormat(
                new Json()
                    .schema(
                        org.apache.flink.table.api.Types.ROW(
                            new String[]{"id", "product", "amount"},
                            new TypeInformation[]{
                                org.apache.flink.table.api.Types.LONG()
                                , org.apache.flink.table.api.Types.STRING()
                                , org.apache.flink.table.api.Types.INT()
                            }))
                    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default
            )
            .withSchema(
                new Schema()
                    .field("id", Types.LONG)
                    .field("product", Types.STRING)
                    .field("amount", Types.INT)
            )
            .inAppendMode()
            .registerTableSource("sourceTable");

        tEnv.connect(
            new Kafka()
                .version("0.10")    // required: valid connector versions are
                //   "0.8", "0.9", "0.10", "0.11", and "universal"
                .topic(SINK_TOPIC)       // required: topic name from which the table is read
                // optional: connector specific properties
                .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                .property("bootstrap.servers", METADATA_BROKER_LIST)
                .property("group.id", GROUP_ID)
                // optional: select a startup mode for Kafka offsets
                .startFromEarliest()
                .sinkPartitionerFixed()         // each Flink partition ends up in at-most one Kafka partition (default)
        ).withFormat(
            new Json()
                .schema(
                    org.apache.flink.table.api.Types.ROW(
                        new String[]{"yid", "yproduct", "yamount"},
                        new TypeInformation[]{
                            org.apache.flink.table.api.Types.LONG()
                            , org.apache.flink.table.api.Types.STRING()
                            , org.apache.flink.table.api.Types.INT()
                        }))
                .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default
        )
            .withSchema(
                new Schema()
                    .field("id", Types.LONG)
                    .field("product", Types.STRING)
                    .field("amount", Types.INT)
            )
            .inAppendMode()
            .registerTableSink("sinkTable");


        tEnv.sqlUpdate("insert into sinkTable(id,product,amount) select id,product,33 from sourceTable  ");

        env.execute(" tesst kafka connector demo");

    }

}


项目全代码

GitHub

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容