Kafka Connect用于在Kafka与其他系统间数据传输的工具。Kafka connect可以获取整个数据库或从所有应用程序服务器收集指标到Kafka;也可以从Kafka中topic数据传输至其他存储或者查询系统或者批处理系统进行离线分析。
Kafka Connect功能
Kafka能用框架,提供统一的集成API
支持分布式模式(distributed)及单机模式(standalone)
REST接口,用于查看和管理Kafka connectors
自动化offset管理,开发人员不必担心错误处理的影响
分布式,可扩展
流/批处理集成
Kafka connect两个核心组成 Source和Sink。
Source:负责导入数据到Kafka;
Sink :负责从Kafka导出数据;
(如上二者都称为 connector)
Kafka connect的几个重要的概念包括:connectors、tasks、workers和converters。
Connectors-通过管理任务来细条数据流的高级抽象;
Tasks- 数据写入kafka和数据从kafka读出的实现;
Workers-运行connectors和tasks的进程;
Converters- kafka connect和其他存储系统直接发送或者接受数据之间转换数据;
distribute模式启动:
需要先建三个broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic connect-configs
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 50 --topic connect-offsets
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic connect-status
启动
bin/connect-distributed.sh config/connect-distributed.properties
通过rest api管理connector
因为kafka connect的意图是以服务的方式去运行,所以它提供了REST API去管理connectors,默认的端口是8083,你也可以在启动kafka connect之前在配置文件中添加rest.port配置。
GET /connectors – 返回所有正在运行的connector名
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息
GET /connectors/{name}/config – 获取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
#standalone模式启动
bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties