前言:
最近需要调研Kafka Connect
可连接哪些数据源,特此学习官网并翻译了下文档。
Confluent JDBC Connector官网地址(官网布局可能有些凌乱)
本文为纯理论学习,实践请看下文 Kafka Connect 实现MySQL增量同步
希望看过的小伙伴点个❤,谢谢。
JDBC Connector
JDBC connector
允许您通过JDBC驱动程序将任何关系型数据库中的数据导入到Kafka的主题Topic
中。通过使用JDBC,这个连接器可以支持各种数据库,不需要为每个数据库定制代码。
通过定期地执行SQL查询语句并为结果集中的每一行创建输出记录来加载数据。在默认情况下,在一个数据库中的所有表都会被复制,每个表都复制到自己的输出主题。数据库那些新的或删除的表被监视并自动适应调整。当从表中复制数据时,连接器可以仅仅加载新增或修改的行通过指定哪些列应当被用来发现新增或修改的数据。
Quickstart
为了了解连接器的基本功能,我们将从本地SQLite数据库中复制单个表。在这个简单的示例中,我们假设表中的每个条目都被分配了一个唯一的ID,并且在创建之后不会对其进行修改。Confluent Platform内置包含了用于SQLite
和PostgreSQL
的JDBC驱动程序,但是如果使用的是不同的数据库(如MySQL),则还需要确保JDBC驱动程序在Kafka Connect进程的CLASSPATH上可用。
快速开始
创建表中测试数据
sqlite> CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
sqlite> INSERT INTO accounts(name) VALUES('alice');
sqlite> INSERT INTO accounts(name) VALUES('bob');
创建一个配置文件,用于从该数据库中加载数据。此文件包含在etc/kafka-connect-jdbc/quickstart-sqlite.properties中的连接器中,并包含以下设置:
(学习了解配置结构即可)
name=test-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-
前几个设置是您将为所有连接器指定的常见设置。connection.url
指定要连接的数据库,在本例中是本地SQLite数据库文件。mode
指示我们想要如何查询数据。在本例中,我们有一个自增的唯一ID,因此我们选择incrementing
递增模式并设置incrementing.column.name
递增列的列名为id。在这种mode
模式下,每次查询新的数据将只返回那些ID大于之前查询中最大ID的行。最后,我们可以通过设置topic.prefix
,来控制每个表的输出发送到的Topic
主题的名称。因为现在我们只有一个表,所以本例中唯一的输出主题将会是test-sqlite-jdbc-accounts。
在另一个终端的独立的Kafka Connect
中运行(运行刚刚的配置文件)
$ ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/quickstart-sqlite.properties
可以看到进程启动并记录一些消息,然后它将开始执行查询并将结果发送到Topic
$ ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}
默认的轮询间隔是5秒,因此可能需要几秒钟才能显示。根据您预期的速率更新或期望的延迟,可以使用较短的轮询间隔更快地显示。
Features 特性
JDBC connector
支持使用各种JDBC数据类型来复制表,动态地从数据库中添加和删除表、whitelists
白名单和blacklists
黑名单、更改轮询间隔以及其他设置。然而,对于大多数用户来说,最重要的特性是控制如何从数据库增量地复制数据的设置。
Kafka Connect
跟踪监测从每个表中检索到的最新记录,因此在下一次迭代循环中(或者在崩溃的情况下),它可以在正确的位置开始。JDBC connector
使用此功能,仅在每次迭代中从表(或从自定义查询的输出)获取更新过updated
的行。支持多种模式,每一种模式在修改行上的不同都能被发现查明。
Incremental Query Modes 增量查询模式
每一种增量查询模式跟踪每行的一组列,用途来跟踪哪些行已被处理以及哪些行是新增的或者已经被更新的。mode
模式的设置控制它的行为,并支持以下选项:
-
Incrementing Column
自增列
每行包含唯一ID的列,它的较新的行保证具有较大的ID,即AUTOINCREMENT
自增列。注意,此模式只能检测发现新增的行。对于存在的记录行的更新是无法检测到的,因此此模式应当仅用于那些不变的数据行。举个例子,是在数据仓库中流式传输事实表时,因为事实表通常是只插入的。 -
Timestamp Column
时间戳列
在这种模式下,一个列包含修改时间戳用来跟踪最近一次latest time
处理的数据,并且仅查询从那时起已经修改的行。注意,由于时间戳不一定是唯一的,因此此模式不能保证所有更新的数据都将被传递:如果2行共有相同的时间戳,并且通过增量查询返回,在崩溃之前只处理了一行,那么当系统恢复时,将错过第二个修改更新。 -
Timestamp and Incrementing Columns
时间戳和自增列
这是将一个递增列和一个时间戳列组合在一起的最稳健和精确的模式。通过组合这两者,只要时间戳足够细,每个(id,timestamp)元组将唯一地标识对行的更新。即使部分完成后更新失败,当系统恢复时,仍然可以正确地检测和传递未处理的更新。 -
Custom Query
定制化查询
JDBC connector
支持使用定制查询而不是复制整个表。通过自定义查询,可以使用其他自动更新模式之一,只要必要的WHERE
子句可以正确地附加到查询中。或者,指定的查询可以处理对新的修改更新本身的过滤;然而,请注意,将不执行offset
偏移量跟踪(不同于自动模式,其中为每个记录记录递增和/或时间戳列值),因此查询必须自己跟踪offset
偏移量。 -
Bulk
大块查询
此模式未经过滤,因此根本不递增。它将在每次迭代中从表中加载所有行。如果希望周期性地消费转储整个表,其中条目最终会被删除,并且下游系统可以安全地处理副本,那么这非常有用。
Configuration 配置
JDBC Drivers
JDBC connector
在可以从数据库导入数据以及如何导入数据的数据库中提供了相当大的灵活性。本节首先描述如何访问那些其驱动程序不包含在Confluent Platform中的数据库,然后给出几个覆盖常见场景的示例配置文件,然后提供可用配置选项的详尽描述。
JDBC connector
在通用JDBC API上实现数据复制的功能,但是依赖于JDBC驱动来处理这些API的数据库特定实现。Confluent Platform附带了一些JDBC驱动程序,但是如果没有包括你用的数据库的驱动程序,则需要通过CLASSPATH
使其可用。
一种选择是在连接器安装JDBC驱动程序jar
。相对于安装目录,打包的连接器安装在share/java/kafka-connect-jdbc
目录中。如果您已经从Debian
或RPM
包安装,连接器将安装在/usr/share/java/kafka-connect-jdbc
中。如果你是从zip或tar文件安装,连接器将安装在上面给出的路径中,该路径位于您解压缩Confluent Platform归档的目录下。
另一种选择,你也可以设置CLASSPATH
变量在运行copycat-standalone
或copycat-distributed
$ CLASSPATH=/usr/local/firebird/* ./bin/copycat-distributed ./config/copycat-distributed.properties
jdbc:firebirdsql:localhost/3050:/var/lib/firebird/example.db
Examples*
使用whitelist
白名单来限制MySQL数据库表中子集的更改,对于所有白名单表上符合标准的,使用id
和modified
两列列来检测已修改的行。这种模式是最稳健的,因为它可以将唯一、不可变的行id
与修改时间戳相结合,以确保即使过程在增量更新查询中死亡,也不会错过这些更新。
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=alice&password=secret
table.whitelist=users,products,transactions
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
topic.prefix=mysql-
下面这个例子使用了自定义查询query
代替加载整个表,允许您从多个表中`join``连接数据。只要查询不包含其自身的过滤,您仍然可以使用内置模式进行增量查询(在本例中,使用时间戳列)。注意,这限制了您对每个连接器的单个输出,并且因为没有表名,所以本例中主题“prefix”实际上是完整的主题名。
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:postgresql://postgres.example.com/test_db?user=bob&password=secret&ssl=true
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=mysql-joined-data
Configuration Options 配置项
connection.url
要加载数据库的JDBC连接URL
-类型: String
-默认: ""
-重要度: 高
topic.prefix
众多表名之前的前缀,向其发布数据的KafkaTopic
主题的名称,或者在自定义查询的情况下,生成要向其发布的Topic
主题的全名。
-类型: String
-默认: ""
-重要度: 高
mode
incrementing、timestamp、timestamp+incrementing、bulk
-类型: String
-默认: ""
-重要度: 高
poll.interval.ms
以ms为单位轮询每个表中的新数据
-类型: int
-默认: "5000"
-重要度: 高
incrementing.column.name
用于检测新增行的递增列的名称。此列可能不为空。
-类型: String
-默认: ""
-重要度: 中
query
使用自定义查询代替加载表
-类型: String
-默认: ""
-重要度: 中
table.blacklist
不允许复制的表清单。如果指定table.blacklist
,则不能设置table.whitelist
。
-类型: List
-默认: ""
-重要度: 中
table.whitelist
允许复制的表清单。如果指定table.whitelist
,则不能设置table.blacklist
-类型: List
-默认: ""
-重要度: 中
timestamp.column.name
时间戳列的名字,用于检测新增或修改的行记录。此列可能不为空。
-类型: List
-默认: ""
-重要度: 中
batch.max.rows
在轮询新数据时,单个批处理中包括的最大行数。此设置可用于限制连接器内部缓冲的数据量。
-类型: int
-默认: 100
-重要度: 低
table.poll.interval.ms
以ms为单位轮询新表或已删除表的频率,这可能导致更新任务配置,以开始轮询已添加表中的数据或停止轮询已删除表中的数据。
-类型: long
-默认: 60000
-重要度: 低