Flink学习笔记(3):Sink to JDBC

1. 前言

1.1 说明

本文通过一个Demo程序,演示Flink从Kafka中读取数据,并将数据以JDBC的方式持久化到关系型数据库中。通过本文,可以学习如何自定义Flink Sink和Flink Steaming编程的步骤。

1.2 软件版本

  • Centos 7.1
  • JDK 1.8
  • Flink 1.1.2
  • Kafka 0.10.0.1

1.3 依赖jar包

请将以下依赖放在pom.xml中。这里使用的关系型数据是PostgreSQL,也可以换成其它关系型数据库的驱动程序。

 <properties>
    <flink.version>1.1.2</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>9.1-901-1.jdbc4</version>
    </dependency>
</dependencies>

2. 自定义Sink

2.1 内置的Streaming Connector

Flink 内置了一些Streaming Connector,用于和第三方的系统交互。截至到当前为止,Flink支持以下Connector。括号中的source代表数据从这些第三方系统中流入Flink中,sink代表数据从Flink流到这些第三方系统中。

  • Apache Kafka (sink/source)
  • Elasticsearch (sink)
  • Elasticsearch 2x (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (sink/source)
  • Amazon Kinesis Streams (sink/source)
  • Twitter Streaming API (source)
  • Apache NiFi (sink/source)
  • Apache Cassandra (sink)
  • Redis (sink)

除此之外,Flink还允许我们自定义source和sink。本文所述例子是从Kafka中读取数据,并把数据写入数据库中;由于Flink已经内置了Kafka source,因此还需要自定义JDBC sink。

2.2 自定义JDBC sink

下面的代码就是一个JDBC sink的实现,其效果就是向PostgreSQL数据库中插入数据,具体请看代码中的注释说明。

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;


public class PostgreSQLSink extends RichSinkFunction<Tuple3<String,String,String>> {

    private static final long serialVersionUID = 1L;

    private Connection connection;
    private PreparedStatement preparedStatement;
    /**
     * open方法是初始化方法,会在invoke方法之前执行,执行一次。
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // JDBC连接信息
        String USERNAME = "postgres" ;
        String PASSWORD = "********";
        String DRIVERNAME = "org.postgresql.Driver";
        String DBURL = "jdbc:postgresql://192.168.1.213/flink";
        // 加载JDBC驱动
        Class.forName(DRIVERNAME);
        // 获取数据库连接
        connection = DriverManager.getConnection(DBURL,USERNAME,PASSWORD);
        String sql = "insert into kafka_message(
                        timeseq, thread, message) values (?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
        super.open(parameters);
    }

    /**
     * invoke()方法解析一个元组数据,并插入到数据库中。
     * @param data 输入的数据
     * @throws Exception
     */
    @Override
    public  void invoke(Tuple3<String,String,String> data) throws Exception{
        try {
            String timeseq = data.getField(0);
            String thread = data.getField(1);
            String message = data.getField(2);
            preparedStatement.setString(1,timeseq);
            preparedStatement.setString(2,thread);
            preparedStatement.setString(3,message);
            preparedStatement.executeUpdate();
        }catch (Exception e){
            e.printStackTrace();
        }

    };

    /**
     * close()是tear down的方法,在销毁时执行,关闭连接。
     */
    @Override
    public void close() throws Exception {
        if(preparedStatement != null){
            preparedStatement.close();
        }
        if(connection != null){
            connection.close();
        }
        super.close();
    }
}

3. Flink Streaming Job 编程

3.1 Flink Stream编程的步骤

Flink job 编程基本上都是由一些基本部分组成:

  1. 获得一个 execution environment
  2. 加载/创建初始数据(Source)
  3. 指定在该数据上进行的转换(Transformations)
  4. 指定计算结果的存储地方(Sink)
  5. 启动程序执行。

3.2 Kafka-Flink-DB

下面的代码,是一个Flink Job,从Kafka中读取消息,并把消息写到关系型数据库中。

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaToDB {

    public static void main(String[] args) throws Exception {
        // 解析参数
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!");
            System.out.println("\nUsage: Kafka --topic <topic> " +
                    "--bootstrap.servers <kafka brokers> "+
                    "--zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        // 获取StreamExecutionEnvironment。
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        // create a checkpoint every 5 secodns
        env.enableCheckpointing(5000); 
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(parameterTool); 

        // source
        DataStream<String> sourceStream = env.addSource(
                new FlinkKafkaConsumer08<String>(parameterTool.getRequired("topic"),
                        new SimpleStringSchema(), parameterTool.getProperties()));
        // Transformation,这里仅仅是过滤了null。
        DataStream<Tuple3<String, String, String>> messageStream = sourceStream
                .map(new InputMap())
                .filter(new NullFilter());
        //sink
        messageStream.addSink(new PostgreSQLSink());

        env.execute("Write into PostgreSQL");
    }
    
    // 过滤Null数据。
    public static class NullFilter implements FilterFunction<Tuple3<String, String, String>>{
        @Override
        public boolean filter(Tuple3<String, String, String> value) throws Exception {
            return value != null;
        }
    }
    
    // 对输入数据做map操作。
    public static class InputMap implements MapFunction<String, Tuple3<String, String, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple3<String, String, String> map(String line) throws Exception {
            // normalize and split the line
            String[] arr = line.toLowerCase().split(",");
            if (arr.length > 2) {
                return new Tuple3<>(arr[0], arr[1], arr[2]);
            }
            return null;
        }
    }

}

4. 把Job提交Flink集群

将上面的代码打包成jar后,通过下面的命令把job提交到Flink集群上。其中-c指定了flink-db.jar的Main class,其余的参数是本文job所用的kafka相关的参数。

bin/flink run -c com.bigknow.flink.KafkaToDB examples/flink-db.jar \
--topic my-topic \
 --bootstrap.servers 192.168.1.170:9092 \
--zookeeper.connect 192.168.1.170:2181 \
--group.id test01`

(完)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352

推荐阅读更多精彩内容