MySQL数据实时增量同步到Redis

一、go-mysql-transfer

go-mysql-transfer是使用Go语言实现的MySQL数据库实时增量同步工具。能够实时监听MySQL二进制日志(binlog)的变动,将变更内容形成指定格式的消息,发送到接收端。在数据库和接收端之间形成一个高性能、低延迟的增量数据(Binlog)同步管道, 具有如下特点:

1、不依赖其它组件,一键部署

2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再编写客户端,开箱即用

3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑

4、支持监控告警,集成Prometheus客户端

5、高可用集群部署

6、数据同步失败重试

7、全量数据初始化

详情及安装说明 请参见: MySQL Binlog 增量同步工具go-mysql-transfer实现详解

项目开源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer

如果此工具对你有帮助,请Star支持下

二、配置

Redis部署模式可以为单机、主从(哨兵)、集群(cluster)。相关配置如下:

# app.yml
redis_addrs: 127.0.0.1:6379 #地址,多个用逗号分隔
#redis_group_type: cluster   # 集群类型 sentinel或者cluster
#redis_master_name: mymaster # Master节点名称,如果group_type为sentinel则此项不能为空,为cluster此项无效
#redis_pass: 123456 #redis密码
#redis_database: 0  #redis数据库 0-16,默认0。如果group_type为cluster此项无效

三、数据转换规则

相关配置如下:

rule:
  -
    schema: eseap #数据库名称
    table: t_user #表名称
    #order_by_column: id #排序字段,存量数据同步时不能为空
    #column_lower_case:false #列名称转为小写,默认为false
    #column_upper_case:false#列名称转为大写,默认为false
    column_underscore_to_camel: true #列名称下划线转驼峰,默认为false
    # 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  默认为空
    #column_mappings: CARD_NO=sfz #列名称映射,多个映射关系用逗号分隔,如:USER_NAME=account 表示将字段名USER_NAME映射为account
    #default_column_values: source=binlog,area_name=合肥  #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss
    #lua_file_path: lua/t_user.lua   #lua脚本文件,详见使用手册,当此值不为空时后面的配置除redis_structure其余均无效
    #lua_script:   #lua 脚本,详见使用手册,当此值不为空时后面的配置均无效
    value_encoder: json  #值编码
    #value_formatter: ${ID}|${USER_NAME} #值格式化表达式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
    #redis相关
    redis_structure: string # 数据类型。 支持string、hash、list、set、sortedset类型(与redis的数据类型一致)
    redis_key_prefix: USER_ #key的前缀
    redis_key_column: USER_NAME #使用哪个列的值作为key,不填写默认使用主键
    #redis_key_formatter: ${id}-${name} # KEY格式化表达式,如:${ID}-${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
    #redis_key_value: user #KEY的值(固定值);当redis_structure为hash、list、set、sortedset此值不能为空
    #redis_hash_field_prefix: _CARD_ #hash的field前缀,仅redis_structure为hash时起作用
    #redis_hash_field_column: Cert_No #使用哪个列的值作为hash的field,仅redis_structure为hash时起作用,不填写默认使用主键

value_encoder表示值编码方式,不填写默认为json,支持如下编码方式:

格式 说明 举例
json json {"id": "1001","userName": "admin","password": "123456",
"createTime": "2020-07-20 14:29:19"}
kv-commas key-value逗号分隔 id=1001,userName=admin,password=123456,createTime=2020-07-20 14:29:19
v-commas value逗号分隔 1001,admin,123456,2020-07-20 14:29:19

示例

t_user表,数据如下:

同步为string类型

配置如下:

    schema: eseap #数据库名称
    table: t_user #表名称
    column_underscore_to_camel: true #列名称下划线转驼峰,默认为false
    value_encoder: json  #值编码
    redis_structure: string # 数据类型。 支持string、hash、list、set、sortedset类型(与redis的数据类型一致)
    redis_key_prefix: USER_ #key的前缀
    redis_key_column: USER_NAME #使用哪个列的值作为key,不填写默认使用主键
   

同步到Redis的数据如下:

同步为hash类型

配置如下:

    column_underscore_to_camel: true #列名称下划线转驼峰,默认为false
    value_encoder: json  #值编码,支持json、kv-commas、v-commas
    redis_structure: hash 
    redis_key_value: user_cache #key的值(固定值);当redis_structure为hash、list、set、sortedset此值不能为空
    redis_hash_field_prefix: user_name_ #hash的field前缀,仅redis_structure为hash时起作用
    redis_hash_field_column: user_name #使用哪个列的值作为hash的field,仅redis_structure为hash时起作用,不填写默认使用主键

同步到Redis的数据如下:

使用规则能将一个table映射成为一个HASH,如果需要将talbe中的每一行映射成一个HASH,可以使用Lua脚本实现,详请参见下面示例。

同步为list类型

配置如下:

    value_formatter: ${ID}|${USER_NAME} # 值格式化表达式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
    redis_structure: list 
    redis_key_value: user_list #key的值(固定值);当redis_structure为hash、list、set、sortedset此值不能为空

value_formatter为值格式化表达式,value_formatter不为空时value_encoder无效。

同步到Redis的数据如下:

同步为set类型

配置如下:

    value_formatter: ${ID}|${USER_NAME} #值格式化表达式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
    redis_structure: set
    redis_key_value: user_set #key的值(固定值);当redis_structure为hash、list、set、sortedset此值不能为空

同步到Redis的数据如下:

同步为Sorted Set类型

t_user表,数据如下:

配置如下:

    value_formatter: ${ID}|${USER_NAME} #值格式化表达式,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
    redis_structure: sortedset
    redis_key_value: users #key的值(固定值);当redis_structure为hash、list、set、sortedset此值不能为空
    redis_sorted_set_score_column: CREATE_TIME  #sortedset的score,当数据类型为sortedset时,此项不能为空,此项的值应为数字类型

同步到Redis的数据如下:

四、Lua脚本

使用Lua脚本可以实现更复杂的数据处理逻辑,go-mysql-transfer支持Lua5.1语法。

示例

t_user表,数据如下:

引入Lua脚本:

rule:
  -
    schema: eseap
    table: t_user
    lua_file_path: lua/t_user_redis.lua   #lua脚本文件

Lua脚本:

local json = require("json")   -- 加载json模块
local ops = require("redisOps") --加载redis操作模块

local row = ops.rawRow()  --数据库当前变更的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、delete

local id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local key = "user_"..id -- 定义key

if action == "delete" -- 删除事件
then
    ops.DEL(key) 
    ops.SREM("user_set",userName) 
else 
    local password = row["PASSWORD"] --获取USER_NAME列的值
    local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
    local result = {}  -- 定义一个table
    result["id"] = id
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 数据来源
    local val = json.encode(result) -- 将newTable转为json
    ops.SET(key,val) -- 对应Redis的SET命令,第一个参数为key(支持string类型),第二个参数为value
    
    if action == "update" -- 修改事件
    then
        local oldRow = ops.rawOldRow()  --数据库变更之前的数据(修改之前的数据)
        local oldUserName = oldRow["USER_NAME"] --获取USER_NAME列的值
        ops.SREM("user_set",oldUserName) -- 删除旧值
    end
    
    ops.SADD("user_set",userName) -- 对应Redis的SADD命令,第一个参数为key(支持string类型),第二个参数为value
end 

同步到Redis的数据如下:

string类型
set类型

将talbe中的一行映射成一个HASH,脚本如下:

local ops = require("redisOps") --加载redis操作模块

local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、delete

if action == "insert" -- 只监听insert事件
then
    local key = row["USER_NAME"] --获取USER_NAME列的值
   
    local id = row["ID"] --获取ID列的值
    local userName = row["USER_NAME"] --获取USER_NAME列的值
    local password = row["PASSWORD"] --获取PASSWORD列的值
    local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
    
    ops.HSET(key,"id",id) -- 对应Redis的HSET命令
    ops.HSET(key,"userName",userName) -- 对应Redis的HSET命令
    ops.HSET(key,"password",password) -- 对应Redis的HSET命令
    ops.HSET(key,"createTime",createTime) -- 对应Redis的HSET命令
end

同步到Redis的数据如下:

redisOps模块提供的方法如下:

  1. SET: Redis字符串命令,设置指定key的值。如:ops.SET(key,val)
  2. DEL: Redis字符串命令,删除指定key的值。如:ops.DEL(key)
  3. HSET: Redishash命令,设置哈希表key中的字段field的值。如:ops.HSET(key,field,val)
  4. HDEL: Redishash命令,设置哈希表key中的字段。如:ops.HDEL(key,field)
  5. RPUSH: Redis列表命令,将值插入到列表key的头部。如:ops.RPUSH(key,val)
  6. LREM: Redis列表命令,移除列表key的值。如:ops.LREM(key,val)
  7. SADD: Redis集合命令,向集合key添加值。如:ops.SADD(key,val)
  8. SREM: Redis集合命令,移除集合key的值。如:ops.SREM(key,val)
  9. ZADD: Redis有序集合命令,向有序集合key添加值。如:ops.ZADD(key,score,val)
  10. ZREM: Redis有序集合命令,移除有序集合key的值。如:ops.ZREM(key,val)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容