在做报表数据统计时,我们用的是mysql + kafka + Spark Streaming方案,
kafka监听mysql订单表中订单状态,然后发送到spark streaming中进行分析统计。
这里记录一下kafka监听mysql中数据变更方案
1、Kafka connect
-
1.简介
kafka connect 是一个可扩展的、可靠的在kafka和其他系统之间流传输的数据工具。简而言之就是他可以通过Connector(连接器)简单、快速的将大集合数据导入和导出kafka。可以接收整个数据库或收集来自所有的应用程序的消息到kafka的topic中Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform的核心功能.大家都知道现在数据的ETL过程经常会选择kafka作为消息中间件应用在离线和实时的使用场景中,而kafka的数据上游和下游一直没有一个无缝衔接的pipeline来实现统一,比如会选择flume或者logstash采集数据到kafka,然后kafka又通过其他方式pull或者push数据到目标存储.
而kafka connect旨在围绕kafka构建一个可伸缩的,可靠的数据流通道,通过kafka connect可以快速实现大量数据进出kafka从而和其他源数据源或者目标数据源进行交互构造一个低延迟的数据pipeline。
具体官网文档www.confluent.io/.
https://docs.confluent.io/2.0.0/connect/connect-jdbc/docs/index.html#examples
-
2.安装
- kafka安装
-
下载kafka-connect-jdbc
下载成功后,从confluentinc-kafka-connect-jdbc-4.1.2.zip libs中获取到kafka-connect-jdbc-4.1.2.jar,并把其放到kafka安装目录下libs文件夹中 - 下载mysql-connector-java-5.1.47.jar,并把其放到kafka安装目录下libs文件夹中
-
3.使用
- 1.启动kafka sh kafkaStart.sh
- 2.创建kafka topic
./bin/kafka-run-class.sh kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-comments
- 3.新建source/sink配置文件,并放置在kafka config目录下
vim quickstart-mysql.properties name=mysql-b-source-comments connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://127.0.0.1:3306/android_service?user=xxx&password=xxxx # timestamp+incrementing 时间戳自增混合模式 mode=timestamp+incrementing # 时间戳 commenttime timestamp.column.name=commenttime # 自增字段 id incrementing.column.name=id # 白名单表 comments table.whitelist=comments # topic前缀 mysql-kafka- topic.prefix=mysql-kafka- vim quickstart-mysql-sink.properties name=mysql-b-sink-comments connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 #kafka的topic名称 topics=mysql-kafka-comments # 配置JDBC链接 connection.url=jdbc:mysql://127.0.0.1:3306/android_service?user=xxx&password=xxxx # 不自动创建表,如果为true,会自动创建表,表名为topic名称 auto.create=false # upsert model更新和插入 insert.mode=upsert # 下面两个参数配置了以pid为主键更新 pk.mode = record_value pk.fields = id #表名为kafkatable table.name.format=kafkacomments
- 4.启动kafka connect
./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/quickstart-mysql.properties ./config/quickstart-mysql-sink.properties
启动过程中有报8083端口已经被占用,在config目录下,修改connect-standalone文件,在最后一样添加,用于修改监听REST API的默认端口
#用于监听REST API的端口 rest.port=8003
- 插入数据
在comments表中插入数据后,可以看到在kafkacomments表中也同步插入了数据
- 插入数据
在comments表中更新数据后,可以看到在kafkacomments表中也同步更新了数据
- 6.查看kafka topic中数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql-kafka-comments --from-beginning
7.自定义开发connect
https://github.com/confluentinc/kafka-connect-jdbc
如果有需求要自定义开发connect的话,可以直接在这个源码中开发,然后打成jar包。开发一个连接器只需要实现两个接口,Connector和Task8.参考文章
//www.greatytc.com/p/46b6fa53cae4
https://www.orchome.com/345
2、canal
canal是阿里开源的中间件,纯Java开发,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB),主要用于同步mysql数据库变更,是一个非常成熟的数据库同步方案。
https://github.com/alibaba/canal/
canal是通过模拟成为mysql 的slave的方式,监听mysql 的binlog日志来获取数据,binlog设置为row模式以后,不仅能获取到执行的每一个增删改的脚本,同时还能获取到修改前和修改后的数据,基于这个特性,canal就能高性能的获取到mysql数据数据的变更。
canal的部署主要分为server端和client端。
server端部署好以后,可以直接监听mysql binlog,因为server端是把自己模拟成了mysql slave,所以,只能接受数据,没有进行任何逻辑的处理,具体的逻辑处理,需要client端进行处理。
client端一般是需要大家进行简单的开发。https://github.com/alibaba/canal/wiki/ClientAPI 有一个简单的示例,很容易理解。
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ(kafka,RocketMQ)
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart