1.数据采集流程
2. 环境准备
2.1 MySQL主从数据库环境准备
组件 | hdp01(主机) | hdp02(主机) |
---|---|---|
MySQL数据库(主) | √ | |
MySQL数据库(从) | √ | |
Canal Server | √ |
说明:
- DBus-0.6.1使用Canal-v1.1.4,支持MySQL5.6和5.7
- 被同步的MySQL bin-log需要是row模式
- 考虑到Kafka的message大小不宜太大,目前设置的是最大10MB,因此不支持同步MySQL MEDIUUMTEXT/MediumBlob和LongTEXT/LongBlob类型的数据,如果表中有这样类型的数据会直接被替换为空
MySQL主从配置:这里不再说明MySQL如何安装,只说明主从如何配置
主库配置:
# 主库配置
[client]
default-character-set=utf8mb4
[mysql]
socket=/var/lib/mysql/mysql.sock
default-character-set=utf8mb4
[mysqld]
socket=/var/lib/mysql/mysql.sock
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
datadir=/data/mysql
character_set_server=utf8mb4
character-set-client-handshake=FALSE
collation-server=utf8mb4_unicode_ci
max_connections=800
max_connect_errors=1000
############################### 以下为主从配置以及binlog配置,新增这些配置 ###############################
# 用于标识唯一的数据库,不能和别的服务器重复,建议使用ip的最后一段,默认值0代表不允许任何从库同步数据,不可以使用
server-id=105
# 用于指定binlog日志文件名前缀
log-bin=mysql-bin
binlog-format=Row
# 这些是表示同步的时候忽略的数据库
binlog-ignore-db=information_schema
binlog-ignore-db=ambari
binlog-ignore-db=dbusmgr
binlog-ignore-db=hive
binlog-ignore-db=mysql
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
重启主库服务:
[admin@hdp01 ~]$ sudo systemctl restart mysqld
# 登录MySQL,创建一个用户,从库使用此用户连接主库进行bin-log同步
mysql> set global validate_password_policy=0;
mysql> set global validate_password_mixed_case_count=0;
mysql> set global validate_password_number_count=3;
mysql> set global validate_password_special_char_count=0;
mysql> set global validate_password_length=3;
mysql> CREATE USER 'repl'@'%' IDENTIFIED BY '123456';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%' identified by '123456';
mysql> flush privileges;
# 查看主库状态
mysql> show master status;
+------------------+----------+--------------+---------------------------------------------------------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+---------------------------------------------------------------------+-------------------+
| mysql-bin.000001 | 834 | | information_schema,ambari,dbusmgr,hive,mysql,performance_schema,sys | |
+------------------+----------+--------------+---------------------------------------------------------------------+-------------------+
1 row in set (0.00 sec)
从库配置:
# 从库配置
[client]
default-character-set=utf8mb4
[mysql]
socket=/var/lib/mysql/mysql.sock
default-character-set=utf8mb4
[mysqld]
socket=/var/lib/mysql/mysql.sock
symbolic-links=0
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
datadir=/data/mysql
character_set_server=utf8mb4
character-set-client-handshake=FALSE
collation-server=utf8mb4_unicode_ci
max_connections=800
max_connect_errors=1000
############################### 以下为主从配置以及binlog配置,新增这些配置 ###############################
# 用于标识唯一的数据库,一定不能和主库的server-id一样
server-id=106
# 用于指定binlog日志文件名前缀
log-bin=mysql-bin
# 这个必须加上,因为从库上的MySQL可以是slave也可以是master,加上该选项才会生成级联binlog,Canal才可以从从库采集数据
log_slave_updates
binlog-format=Row
# 这些是表示同步的时候忽略的数据库
binlog-ignore-db=information_schema
binlog-ignore-db=ambari
binlog-ignore-db=dbusmgr
binlog-ignore-db=hive
binlog-ignore-db=mysql
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
重启从库服务:
[admin@hdp02 ~]$ sudo systemctl restart mysqld
# 登录MySQL进行如下操作
# 注意这里的MASTER_LOG_FILE='mysql-bin.000001',MASTER_LOG_POS=834是在主库通过show master status查询到的信息
mysql> CHANGE MASTER TO MASTER_HOST='hdp01', MASTER_USER='repl', MASTER_PASSWORD='123456', MASTER_LOG_FILE='mysql-bin.000001', MASTER_LOG_POS=834;
mysql> START SLAVE;
# 查看从库状态
mysql> show slave status\G
*************************** 1. row ***************************
Slave_IO_State: Waiting for master to send event
Master_Host: hdp01
Master_User: repl
Master_Port: 3306
Connect_Retry: 60
Master_Log_File: mysql-bin.000001
Read_Master_Log_Pos: 834
Relay_Log_File: hdp02-relay-bin.000002
Relay_Log_Pos: 320
Relay_Master_Log_File: mysql-bin.000001
Slave_IO_Running: Yes
Slave_SQL_Running: Yes
# Slave_IO_Running和Slave_SQL_Running应该都为Yes
2.2 数据库源端配置
在业务库主库中执行以下操作:
# 1.创建dbus库和dbus用户及相应权限
mysql> set global validate_password_policy=0;
mysql> set global validate_password_mixed_case_count=0;
mysql> set global validate_password_number_count=3;
mysql> set global validate_password_special_char_count=0;
mysql> set global validate_password_length=3;
mysql> create database dbus;
mysql> CREATE USER 'dbus'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON dbus.* TO dbus@'%' IDENTIFIED BY '123456';
mysql> flush privileges;
# 2.创建dbus库中需要包含的1张表,创建细节如下
mysql> use dbus;
mysql> DROP TABLE IF EXISTS `db_heartbeat_monitor`;
mysql> CREATE TABLE `db_heartbeat_monitor` (
-> `ID` bigint(19) NOT NULL AUTO_INCREMENT COMMENT '',
-> `DS_NAME` varchar(64) NOT NULL COMMENT '',
-> `SCHEMA_NAME` varchar(64) NOT NULL COMMENT '',
-> `TABLE_NAME` varchar(64) NOT NULL COMMENT '',
-> `PACKET` varchar(256) NOT NULL COMMENT '',
-> `CREATE_TIME` datetime NOT NULL COMMENT '',
-> `UPDATE_TIME` datetime NOT NULL COMMENT '',
-> PRIMARY KEY (`ID`)
-> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mysql> SET FOREIGN_KEY_CHECKS=0;
2.3 准备测试用业务数据库
在主库执行以下操作:
# 创建测试库
mysql> set global validate_password_policy=0;
mysql> set global validate_password_mixed_case_count=0;
mysql> set global validate_password_number_count=3;
mysql> set global validate_password_special_char_count=0;
mysql> set global validate_password_length=3;
mysql> create database test;
# 创建测试用户
mysql> CREATE USER 'test'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON test.* TO test@'%' IDENTIFIED BY '123456';
mysql> flush privileges;
# 创建测试表
mysql> use test;
mysql> create table t1(a int, b varchar(50));
Query OK, 0 rows affected (0.01 sec)
2.4 开启dbus用户拉备库权限
在主库执行以下操作:
mysql> GRANT select on test.t1 TO dbus;
mysql> flush privileges;
2.5 手动Canal部署
创建Canal专用用户,在从库执行以下操作,因为canal用户要去从库拉取数据:
mysql> set global validate_password_policy=0;
mysql> set global validate_password_mixed_case_count=0;
mysql> set global validate_password_number_count=3;
mysql> set global validate_password_special_char_count=0;
mysql> set global validate_password_length=3;
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY '123456';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
下载canal安装包:https://github.com/BriData/DBus/releases,官网给了百度网盘下载链接,文件为v0.6.1/deployer-0.6.1/zip/dbus-canal-auto-0.6.1.zip和v0.6.1/deployer-0.6.1/zip/canal.zip,注意不要使用canal官方的安装包
# 将dbus-canal-auto-0.6.1.zip上传到dbus安装目录下,就是/opt/apps/dbus目录下,然后解压
[admin@hdp02 dbus]$ pwd
/opt/apps/dbus
[admin@hdp02 dbus]$ ll
total 12
drwxr-xr-x 2 admin admin 155 2020-11-05 22:54 bin
drwxr-xr-x 9 admin admin 195 2020-11-10 23:23 conf
drwxr-xr-x 4 admin admin 82 2020-11-03 21:15 dbus-canal-auto-0.6.1
drwxr-xr-x 2 admin admin 310 2020-11-05 22:04 extlib
drwxr-xr-x 2 admin admin 213 2020-11-05 22:54 lib
drwxr-xr-x 2 admin admin 97 2020-11-05 22:54 logs
-rw-r--r-- 1 admin admin 8194 2020-11-05 22:04 README.md
drwxr-xr-x 2 admin admin 134 2020-11-12 20:44 zip
# 将canal.zip上传到上一步解压的dbus-canal-auto-0.6.1目录下并解压
[admin@hdp02 dbus-canal-auto-0.6.1]$ ll
total 16
-rwxr-xr-x 1 admin admin 651 2019-12-18 21:09 addLine.sh
drwxrwxr-x 6 admin admin 52 2020-02-24 14:50 canal
drwxr-xr-x 2 admin admin 35 2020-11-12 21:06 conf
-rwxr-xr-x 1 admin admin 654 2019-12-18 21:09 delLine.sh
-rwxr-xr-x 1 admin admin 1103 2019-12-18 21:09 deploy.sh
drwxr-xr-x 2 admin admin 4096 2020-08-21 17:55 lib
安装canal:
# 修改dbus-canal-auto-0.6.1/conf/canal-auto.properties文件
[admin@hdp02 dbus-canal-auto-0.6.1]$ vim conf/canal-auto.properties
# 数据源的名称,后续在页面填写的时候,需要填这个名称
dsname=dbus_mysql_test
# zk address
zk.path=hdp02:2181,hdp03:2181,hdp04:2181
# mysql address:从库的地址
canal.address=hdp02:3306
# mysql canal user
canal.user=canal
# mysql canal password
canal.pwd=123456
# means mysql server-id:需要与mysql的主从库的server-id不同
canal.slaveId=1050
# kafka address:kafka集群地址
bootstrap.servers=hdp02:9092,hdp03:9092,hdp04:9092
# 执行deploy.sh
[admin@hdp02 dbus-canal-auto-0.6.1]$ sh deploy.sh
......
********************************* CANAL DEPLOY SCCESS! **************************************
report文件: canal_deploy_dbus_mysql_test_20201112211012.txt
# 执行完成后,生成了两个日志
lrwxrwxrwx 1 admin admin 79 2020-11-12 21:10 dbus_mysql_test_canal.log -> /opt/apps/dbus/dbus-canal-auto-0.6.1/canal-dbus_mysql_test/logs/canal/canal.log
lrwxrwxrwx 1 admin admin 99 2020-11-12 21:10 dbus_mysql_test.log -> /opt/apps/dbus/dbus-canal-auto-0.6.1/canal-dbus_mysql_test/logs/dbus_mysql_test/dbus_mysql_test.log
# 检查一下这两个日志,如果其中没有报错,就是部署成功了!
# 同时生成了一个目录canal-dbus_mysql_test,后续canal的启停脚本在此目录的bin下
在DBus页面删除自动部署canal的配置信息:
3. 在DBus平台中采集MySQL数据
使用admin用户登录DBus平台:
MySQL URL如下:
jdbc:mysql://hdp01:3306/dbus?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&noAccessToProcedureBodies=true&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false
在Storm UI中确认一下两个任务是够提交成功:
验证:
MySQL中插入数据:
mysql> use test;
mysql> INSERT INTO t1(a, b) VALUES (101, "Tom");
mysql> INSERT INTO t1(a, b) VALUES (102, "Kate");
mysql> INSERT INTO t1(a, b) VALUES (103, "Jerry");
Kafka的dbus_mysql_test
、dbus_mysql_test.dbus
和dbus_mysql_test.dbus.result
这3个Topic中应该有数据进入。
至此,使用DBus平台收集MySQL bin-log日志就成功了!