1. Scenario
When a table in the monitored database changes, capture the change, turn it into a SQL statement, print it, and make it available for further processing.
2. Prerequisite: enable the binlog in MySQL
Edit /etc/my.cnf on the MySQL host (restart the MySQL service afterwards so the settings take effect):
[root@basenode ~]# vi /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
max_allowed_packet=1024M
log-bin=mysql-bin       # enable binary logging
server-id=180           # unique server id, required when the binlog is enabled
binlog-format=row       # row format is required by Debezium/CDC
binlog-do-db=test       # only log changes of the test database
expire_logs_days=30     # keep binlog files for 30 days
3. Connecting to MySQL: building the CDC source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

Properties proper = new Properties();
proper.setProperty("scan.startup.mode", "initial");

DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
        .hostname("192.168.1.180")
        .username("root")
        .password("123456")
        .port(3306)
        .databaseList("test")
        // .tableList("test.test02")
        .deserializer(new MySchema())
        .debeziumProperties(proper)
        .build();
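The builder only creates the source function; to actually read data it still has to be registered with the execution environment. The full job in section 5.2 does exactly that, and printing the raw JSON records first is an easy way to verify the connection:

DataStreamSource<String> dataStreamSource = env.addSource(mysqlSource);
dataStreamSource.print();
env.execute();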
4. Formatting the binlog data parsed by Debezium
private static class MySchema implements DebeziumDeserializationSchema<String> {

    /*
     * Deserialization method
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // the topic has the form <server-name>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        String dbName = split[1];
        String table = split[2];
        // extract the record payload
        Struct value = (Struct) sourceRecord.value();
        Struct after = value.getStruct("after");
        Struct before = value.getStruct("before");
        JSONObject data = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            for (Field field : schema.fields()) {
                if (field.name().contains("time")) {
                    String formatDate = DateUtils.getFormatDate(after.get(field.name()).toString());
                    System.out.println("---field.name()" + field.name());
                    data.put(field.name(), formatDate);
                } else {
                    data.put(field.name(), after.get(field.name()));
                }
            }
        }
        if (before != null) {
            Schema schema = before.schema();
            for (Field field : schema.fields()) {
                System.out.println("------------" + field.name() + "==========" + before.get(field.name()));
                if (field.name().contains("time")) {
                    String formatDate = DateUtils.getFormatDate(before.get(field.name()).toString());
                    System.out.println("---field.name()" + field.name());
                    data.put(field.name(), formatDate);
                } else {
                    data.put(field.name(), before.get(field.name()));
                }
            }
        }
        // determine the operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        // build the final result JSON
        JSONObject result = new JSONObject();
        result.put("database", dbName);
        result.put("table", table);
        result.put("type", operation.toString().toLowerCase());
        result.put("date", data);
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
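DateUtils.getFormatDate used above is a project helper that is not shown in the original listing. A minimal sketch, assuming the time columns arrive from Debezium as epoch-millisecond values and a yyyy-MM-dd HH:mm:ss string is wanted, could look like this (the real project class may differ):

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtils {
    /**
     * Hypothetical helper: converts an epoch-millisecond value (as a string)
     * into a human-readable timestamp.
     */
    public static String getFormatDate(String epochMillis) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(new Date(Long.parseLong(epochMillis)));
    }
}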
5. Complete code
5.1 Required pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wudl.flink2</groupId>
    <artifactId>Flink-cdc-instance</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
5.2 Java code
package com.wudl.flink;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Properties;

/**
 * @ClassName : FlinkCdcWithCustomer
 * @Description : Uses a custom Debezium deserialization schema to turn MySQL
 *                binlog changes into JSON and then into SQL statements.
 * @Author : wudl
 * @Date : 2021-03-13 23:37
 */
public class FlinkCdcWithCustomer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties proper = new Properties();
        proper.setProperty("scan.startup.mode", "initial");

        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("192.168.1.180")
                .username("root")
                .password("123456")
                .port(3306)
                .databaseList("test")
                // .tableList("test.test02")
                .deserializer(new MySchema())
                .debeziumProperties(proper)
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(mysqlSource);
        // dataStreamSource.print();

        SingleOutputStreamOperator<String> streamOperator = dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(s);
                String typeStr = (String) jsonObject.get("type");
                if (typeStr.equals("select")) {
                    return null;
                }
                String sql = Utils.opreator(s);
                // String sql = "DELETE FROM activity WHERE id = 1";
                System.out.println("sql--" + sql);
                return sql;
            }
        });

        // streamOperator.addSink(new SinkToMySQL());

        // StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // tableEnv.executeSql(" INSERT INTO `activity`(`id`,`activity_name`,`activity_type`,`activity_desc`,`start_time`,`end_time`,`create_time`) VALUES (1,'联想品牌优惠活动','2201','满减活动 ','2020-02-27 17:11:09','2020-02-29 17:11:12','2020-02-27 17:11:16') " +
        //         "WITH ( " +
        //         " 'connector' = 'mysql-cdc', " +
        //         " 'hostname' = '192.168.1.180', " +
        //         " 'port' = '3306', " +
        //         " 'username' = 'root', " +
        //         " 'password' = '123456', " +
        //         " 'database-name' = 'test', " +
        //         " 'table-name' = 'activity' " +
        //         ")");
        //
        // // 3. print
        // Table table = tableEnv.sqlQuery("select * from test.activity");
        // tableEnv.toRetractStream(table, Row.class).print();

        env.execute();
    }

    private static class MySchema implements DebeziumDeserializationSchema<String> {

        /*
         * Deserialization method
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            // the topic has the form <server-name>.<database>.<table>
            String topic = sourceRecord.topic();
            String[] split = topic.split("\\.");
            String dbName = split[1];
            String table = split[2];
            // extract the record payload
            Struct value = (Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            Struct before = value.getStruct("before");
            JSONObject data = new JSONObject();
            if (after != null) {
                Schema schema = after.schema();
                for (Field field : schema.fields()) {
                    if (field.name().contains("time")) {
                        String formatDate = DateUtils.getFormatDate(after.get(field.name()).toString());
                        System.out.println("---field.name()" + field.name());
                        data.put(field.name(), formatDate);
                    } else {
                        data.put(field.name(), after.get(field.name()));
                    }
                }
            }
            if (before != null) {
                Schema schema = before.schema();
                for (Field field : schema.fields()) {
                    System.out.println("------------" + field.name() + "==========" + before.get(field.name()));
                    if (field.name().contains("time")) {
                        String formatDate = DateUtils.getFormatDate(before.get(field.name()).toString());
                        System.out.println("---field.name()" + field.name());
                        data.put(field.name(), formatDate);
                    } else {
                        data.put(field.name(), before.get(field.name()));
                    }
                }
            }
            // determine the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            // build the final result JSON
            JSONObject result = new JSONObject();
            result.put("database", dbName);
            result.put("table", table);
            result.put("type", operation.toString().toLowerCase());
            result.put("date", data);
            collector.collect(result.toJSONString());
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
}
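Utils.opreator, which converts the JSON record into a SQL statement, is not part of the original listing either. Judging from the output in section 5.3 it builds an INSERT for the Flink_iceberg-cdc table from the `date` payload. The following is only a minimal, hypothetical sketch of that idea: it handles the insert case only, does no value escaping, and quotes the table name with backticks because of the dash in it.

import com.alibaba.fastjson.JSONObject;

import java.util.Map;

public class Utils {
    /**
     * Hypothetical reconstruction: builds an INSERT statement for a fixed
     * target table from the JSON produced by MySchema (the keys of the
     * "date" object become the column names).
     */
    public static String opreator(String json) {
        JSONObject record = JSONObject.parseObject(json);
        JSONObject data = record.getJSONObject("date");
        StringBuilder cols = new StringBuilder();
        StringBuilder vals = new StringBuilder();
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            if (cols.length() > 0) {
                cols.append(",");
                vals.append(",");
            }
            cols.append(entry.getKey());
            Object v = entry.getValue();
            // quote everything that is not a number
            vals.append(v instanceof Number ? v.toString() : "'" + v + "'");
        }
        String sql = "insert into `Flink_iceberg-cdc` (" + cols + ") values( " + vals + ")";
        System.out.println("sql***" + sql);
        return sql;
    }
}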
5.3 Execution output
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
---{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19}
sql***insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
sql--insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
---{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19}
sql***insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
sql--insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
---{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19}
sql***insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
sql--insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
---{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19}
sql***insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
sql--insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysql',10011,19)
---{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19}
sql***insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
sql--insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA',10012,19)
---{"dt":"2021-09-28","name":"flink-mysqA4","id":10014,"age":19}
sql***insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-28','flink-mysqA4',10014,19)
sql--insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-28','flink-mysqA4',10014,19)
---{"dt":"2021-09-24","name":"flink-mysqA3","id":10013,"age":19}
sql***insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA3',10013,19)
sql--insert into Flink_iceberg-cdc (dt,name,id,age) values( '2021-09-24','flink-mysqA3',10013,19)
九月 28, 2021 11:56:53 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.1.180:3306 at mysql-bin.000034/14533 (sid:5696, cid:42)
6. Writing the result to another database (or doing any other downstream processing)
In this example the generated SQL targets the same MySQL instance that is being captured; with a different target database the same approach works just as well. All that is needed is to add a sink:
streamOperator.addSink(new SinkToMySQL());
6.1 Example: writing the data into MySQL
package com.wudl.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

/**
 * @ClassName : SinkToMySQL
 * @Description : Replays the generated SQL statements against a MySQL database.
 * @Author : wudl
 * @Date : 2021-03-14 15:08
 */
public class SinkToMySQL extends RichSinkFunction<String> {

    PreparedStatement ps;
    private Connection connection;
    Statement statement;

    /**
     * The connection is created in open(), so it does not have to be
     * created and released on every invoke() call.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        statement = connection.createStatement();
        // ps = (PreparedStatement) this.connection.createStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statements and the connection, releasing all resources
        if (ps != null) {
            ps.close();
        }
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every record.
     *
     * @param sql the SQL statement to execute
     * @param context
     */
    @Override
    public void invoke(String sql, Context context) {
        try {
            if (sql != null) {
                statement.executeUpdate(sql);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("***************************");
            System.out.println(ex.getMessage());
        }
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection(
                    "jdbc:mysql://192.168.1.180:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC",
                    "root", "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}