Flink CDC 1.2 Stream Operations

1. Scenario

When a table in the monitored database changes, capture the change, turn it into a SQL statement, print it, and make it available for further processing.

2. Prerequisite: Configure MySQL

Enable the row-format binlog for the source database in /etc/my.cnf:

[root@basenode ~]# vi /etc/my.cnf

# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
max_allowed_packet=1024M
log-bin=mysql-bin
server-id=180
binlog-format=row
binlog-do-db=test
expire_logs_days=30
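
After saving my.cnf, restart the MySQL service so the binlog settings take effect. As a quick sanity check, the active settings can be queried over JDBC; the small program below is a sketch of my own (it is not part of the original post) and reuses the connection details from section 3:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical sanity check (not from the original post): confirm that the binlog
// settings configured in my.cnf are active after the MySQL restart.
public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.180:3306/test?useSSL=false", "root", "123456");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
            while (rs.next()) {
                // expected output: binlog_format = ROW
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}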

3. Creating the MySQL CDC Source

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

Properties proper = new Properties();
// "initial": take a full snapshot of the table first, then switch to reading the binlog
proper.setProperty("scan.startup.mode", "initial");

DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
        .hostname("192.168.1.180")
        .username("root")
        .password("123456")
        .port(3306)
        .databaseList("test")
//      .tableList("test.test02")        // optionally restrict capture to specific tables
        .deserializer(new MySchema())    // custom deserializer, see section 4
        .debeziumProperties(proper)
        .build();
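
To wire this source into a running job, add it to the environment and execute; the complete program in section 5.2 does exactly this:

DataStreamSource<String> dataStreamSource = env.addSource(mysqlSource);
dataStreamSource.print();   // each change event arrives as the JSON string produced by MySchema (section 4)
env.execute();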

4. Formatting the Binlog Data Parsed by Debezium

private static class MySchema implements DebeziumDeserializationSchema<String> {

    /*
     * Deserialization method: turns each Debezium SourceRecord into a JSON string.
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        // The record topic has the form "serverName.databaseName.tableName"
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        String dbName = split[1];
        String table = split[2];

        // Extract the row data: "after" holds the new row state, "before" the old one
        Struct value = (Struct) sourceRecord.value();
        Struct after = value.getStruct("after");
        Struct before = value.getStruct("before");
        JSONObject data = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            for (Field field : schema.fields()) {
                if (field.name().contains("time")) {
                    // reformat time-typed columns before emitting them
                    String formatDate = DateUtils.getFormatDate(after.get(field.name()).toString());
                    System.out.println("---field.name()" + field.name());
                    data.put(field.name(), formatDate);
                } else {
                    data.put(field.name(), after.get(field.name()));
                }
            }
        }
        if (before != null) {
            // note: for update events this loop overwrites the "after" values collected above
            Schema schema = before.schema();
            for (Field field : schema.fields()) {
                System.out.println("------------" + field.name() + "==========" + before.get(field.name()));
                if (field.name().contains("time")) {
                    String formatDate = DateUtils.getFormatDate(before.get(field.name()).toString());
                    System.out.println("---field.name()" + field.name());
                    data.put(field.name(), formatDate);
                } else {
                    data.put(field.name(), before.get(field.name()));
                }
            }
        }

        // Extract the operation type (read/create/update/delete)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

        // Assemble the final JSON result
        JSONObject result = new JSONObject();
        result.put("database", dbName);
        result.put("table", table);
        result.put("type", operation.toString().toLowerCase());
        result.put("date", data);

        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
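
For an insert on the captured table, the deserializer emits a JSON string shaped roughly as follows (values taken from the sample rows in section 5.3; note that Debezium reports live inserts as "create" and snapshot rows as "read", and that the row payload is stored under the key "date", as in the code above):

{"database":"test","table":"Flink_iceberg-cdc","type":"create","date":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19}}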

5. Complete Code

5.1 Required pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wudl.flink2</groupId>
    <artifactId>Flink-cdc-instance</artifactId>
    <version>1.0.0</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>

            </plugin>

        </plugins>
    </build>
</project>

5.2 Java Code

package com.wudl.flink;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.types.Row;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Properties;

/**
 * @ClassName : FlinkCdcWithCustomer
 * @Description :
 * @Author :wudl
 * @Date: 2021-03-13 23:37
 */

public class FlinkCdcWithCustomer {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties proper = new Properties();
        proper.setProperty("scan.startup.mode", "initial");
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("192.168.1.180")
                .username("root")
                .password("123456")
                .port(3306)
                .databaseList("test")
//                .tableList("test.test02")
                .deserializer(new MySchema())
                .debeziumProperties(proper)
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(mysqlSource);
//        dataStreamSource.print();
        SingleOutputStreamOperator<String> streamOperator = dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(s);
                String typeStr = (String) jsonObject.get("type");

                // skip records whose operation type is "select"
                if (typeStr.equals("select")) {
                    return null;
                }
                // convert the change-event JSON into a SQL statement
                String sql = Utils.opreator(s);
//                String sql = "DELETE FROM activity WHERE id = 1";
                System.out.println("sql--" + sql);
                return sql;
            }
        });
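
//        A safer alternative (a sketch, not in the original code): drop unwanted records with
//        flatMap instead of returning null, because Flink's String serializer cannot handle
//        null records once they cross a task boundary. With parallelism 1 and operator
//        chaining, the nulls above happen to reach the sink directly, where they are skipped.
//        SingleOutputStreamOperator<String> streamOperator = dataStreamSource
//                .flatMap((String s, Collector<String> out) -> {
//                    JSONObject json = JSONObject.parseObject(s);
//                    if (!"select".equals(json.getString("type"))) {
//                        out.collect(Utils.opreator(s));
//                    }
//                }).returns(TypeInformation.of(String.class));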

//        streamOperator.addSink(new SinkToMySQL());

//        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//        tableEnv.executeSql(" INSERT  INTO `activity`(`id`,`activity_name`,`activity_type`,`activity_desc`,`start_time`,`end_time`,`create_time`) VALUES (1,'联想品牌优惠活动','2201','满减活动 ','2020-02-27 17:11:09','2020-02-29 17:11:12','2020-02-27 17:11:16') "  +
//                "WITH ( " +
//                " 'connector' = 'mysql-cdc', " +
//                " 'hostname' = '192.168.1.180', " +
//                " 'port' = '3306', " +
//                " 'username' = 'root', " +
//                " 'password' = '123456', " +
//                " 'database-name' = 'test', " +
//                " 'table-name' = 'activity' " +
//                ")");
//
//        //3.打印
//        Table table = tableEnv.sqlQuery("select * from test.activity");
//        tableEnv.toRetractStream(table, Row.class).print();


        env.execute();


    }


    private static class MySchema implements DebeziumDeserializationSchema<String> {

        /*
         * Deserialization method
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

            String topic = sourceRecord.topic();
            String[] split = topic.split("\\.");
            String dbName = split[1];
            String table = split[2];

            // Extract the row data: "after" holds the new row state, "before" the old one
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct before = value.getStruct("before");
            JSONObject data = new JSONObject();
            if (after != null) {
                Schema schema = after.schema();
                for (Field field : schema.fields()) {
                    if (field.name().contains("time"))
                    {
                        String formatDate = DateUtils.getFormatDate(after.get(field.name()).toString());
                        System.out.println("---field.name()"+field.name());
                        data.put(field.name(), formatDate);
                    }else {
                        data.put(field.name(), after.get(field.name()));
                    }
                }
            }
            if (before != null) {
                Schema schema = before.schema();
                for (Field field : schema.fields()) {
                    System.out.println("------------"+field.name()+"==========" +before.get(field.name()));
                    if (field.name().contains("time"))
                    {
                        String formatDate = DateUtils.getFormatDate(before.get(field.name()).toString());
                        System.out.println("---field.name()"+field.name());
                        data.put(field.name(), formatDate);
                    }else
                    {
                        data.put(field.name(), before.get(field.name()));
                    }

                }
            }

            // Extract the operation type (read/create/update/delete)
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);

            // Assemble the final JSON result
            JSONObject result = new JSONObject();
            result.put("database", dbName);
            result.put("table", table);
            result.put("type", operation.toString().toLowerCase());
            result.put("date", data);

            collector.collect(result.toJSONString());
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
}
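
The listing references two helper classes that the post does not include: DateUtils.getFormatDate(...), which reformats time-typed column values, and Utils.opreator(...), which turns the change-event JSON into a SQL statement. The original implementations are not shown; the following is a minimal sketch of what Utils.opreator might look like, inferred from the SQL printed in section 5.3, and should not be taken as the author's code:

package com.wudl.flink;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of Utils -- the original post omits this class.
public class Utils {
    public static String opreator(String s) {
        JSONObject json = JSONObject.parseObject(s);
        String table = json.getString("table");
        String type = json.getString("type");
        JSONObject data = json.getJSONObject("date"); // MySchema stores the row under "date"

        List<String> cols = new ArrayList<>();
        List<String> vals = new ArrayList<>();
        for (String key : data.keySet()) {
            cols.add(key);
            Object v = data.get(key);
            // quote everything that is not a number
            vals.add(v instanceof Number ? v.toString() : "'" + v + "'");
        }

        // Snapshot rows arrive as "read", live inserts as "create"; both become INSERTs here.
        // Handling update/delete would additionally require a key column and is omitted.
        if ("read".equals(type) || "create".equals(type)) {
            return "insert into " + table + "  (" + String.join(",", cols)
                    + ") values( " + String.join(",", vals) + ")";
        }
        return null;
    }
}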

5.3 Execution Output

log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.


---{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19}
sql***insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
sql--insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
---{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19}
sql***insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
sql--insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
---{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19}
sql***insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
sql--insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
---{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19}
sql***insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
sql--insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
---{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19}
sql***insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
sql--insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
---{"dt":"2021-09-28","name":"flink-mysqA4","id":10014,"age":19}
sql***insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-28','flink-mysqA4',10014,19)
sql--insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-28','flink-mysqA4',10014,19)
---{"dt":"2021-09-24","name":"flink-mysqA3","id":10013,"age":19}
sql***insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA3',10013,19)
sql--insert into Flink_iceberg-cdc  (dt,name,id,age) values( '2021-09-24','flink-mysqA3',10013,19)
Sep 28, 2021 11:56:53 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to 192.168.1.180:3306 at mysql-bin.000034/14533 (sid:5696, cid:42)

6. Writing the Results to Another Database, or Other Operations

Note that in this example the sink writes back into the same MySQL database that is being captured; with a different target database, the exact same approach applies.

Add a sink:

streamOperator.addSink(new SinkToMySQL());

6.1 Example: Syncing the Data into MySQL

package com.wudl.flink;

import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @ClassName : SinkToMySQL
 * @Description :
 * @Author :wudl
 * @Date: 2021-03-14 15:08
 */

public class SinkToMySQL extends RichSinkFunction<String> {

    PreparedStatement ps;
    private Connection connection;
    Statement statement;

    /**
     * Establish the connection in open(), so that a connection does not have to be
     * created and released on every invoke() call.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        connection = getConnection();
        statement = connection.createStatement();
//        ps = (PreparedStatement) this.connection.createStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the connection and release resources
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
        if (statement !=null)
        {
            statement.close();
        }
    }

    /**
     * invoke() is called once for every record.
     *
     * @param sql
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(String sql, Context context) {
        try {
            if (sql !=null) {
                statement.executeUpdate(sql);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("***************************");
            System.out.println(ex.getMessage());
        }
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://192.168.1.180:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
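
Since invoke() executes raw SQL strings with a plain Statement, this sink is suitable for demos only: there is no batching, no retry on failure, and the generated SQL is not parameterized. For production use, Flink's JDBC connector (flink-connector-jdbc) with a PreparedStatement-based sink would be the safer choice.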
