Testing real-time synchronization from MySQL to Greenplum

I. Test server environment preparation

1. Cluster server IPs and hostnames

192.168.10.225 artemis.hadoop.com
192.168.10.226 uranus.hadoop.com
192.168.10.227 ares.hadoop.com
192.168.10.228 bird.hadoop.com
192.168.10.229 lover.hadoop.com

  • Edit command

    vim /etc/hosts

2. ZooKeeper environment

  • Node servers

    server.1=uranus.hadoop.com:2888:3888
    server.2=ares.hadoop.com:2888:3888
    server.3=bird.hadoop.com:2888:3888

    These map to hosts 192.168.10.226, 192.168.10.227 and 192.168.10.228 respectively.
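
Each server.N entry also has to be paired with a myid file in that node's ZooKeeper dataDir containing the same number N. The following Python sketch derives the id for each host from the three entries above. The helper names are made up for illustration, and the dataDir path is not given in this article, so it is left as a placeholder.

```python
import re

# The three server entries from zoo.cfg, as listed above.
ZOO_CFG_SERVERS = """\
server.1=uranus.hadoop.com:2888:3888
server.2=ares.hadoop.com:2888:3888
server.3=bird.hadoop.com:2888:3888
"""

SERVER_LINE = re.compile(r"\s*server\.(\d+)\s*=\s*([^:\s]+):(\d+):(\d+)")


def parse_servers(cfg_text):
    """Map hostname -> numeric id from server.N=host:port:port lines."""
    servers = {}
    for line in cfg_text.splitlines():
        match = SERVER_LINE.match(line)
        if match:
            servers[match.group(2)] = int(match.group(1))
    return servers


def myid_for_host(cfg_text, hostname):
    """Return the number that belongs in the myid file on `hostname`."""
    return parse_servers(cfg_text)[hostname]


if __name__ == "__main__":
    for host, server_id in parse_servers(ZOO_CFG_SERVERS).items():
        # <dataDir> is a placeholder for the dataDir value in zoo.cfg.
        print(f"{host}: echo {server_id} > <dataDir>/myid")
```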

  • Install location

    /usr/local/zookeeper-3.4.6/bin/

  • Configuration file location

    /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg

  • Installation archive location

    /app/zookeeper-3.4.5.tar.gz

  • Download URL: https://archive.apache.org/dist/zookeeper/

  • Environment configuration: vim /etc/profile

    #zookeeper
    ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.6/
    export PATH=$ZOOKEEPER_HOME/bin:$PATH
    
  • Start, stop and status commands

        zkServer.sh start
        ZooKeeper JMX enabled by default
        Using config: /ryyf/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg
        Starting zookeeper ... STARTED
    
        zkServer.sh stop
        ZooKeeper JMX enabled by default
        Using config: /ryyf/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg
        Stopping zookeeper ... STOPPED
    
        zkServer.sh status
        ZooKeeper JMX enabled by default
        Using config: /ryyf/apache-zookeeper-3.5.9-bin/bin/../conf/zoo.cfg
        Client port found: 2181. Client address: localhost. Client SSL: false.
        Mode: follower
    

3. Java environment

  • Install multiple JDK versions side by side, with JDK 11 set as a non-default version

  • Java paths

    /app/jdk1.8.0_144/bin/java

    /app/jdk1.8.0_201/bin/java

    /app/jdk-11.0.12/bin/java

alternatives --install /usr/bin/java java /app/jdk-11.0.12/bin/java 300

alternatives --install /usr/bin/java java /app/jdk1.8.0_201/bin/java 1

alternatives --install /usr/bin/java java /app/jdk1.8.0_144/bin/java 100

alternatives --config java

There are 3 programs which provide 'java'.

  Selection    Command
-----------------------------------------------
 + 1           /app/jdk1.8.0_201/bin/java
*  2           /app/jdk-11.0.12/bin/java
   3           /app/jdk1.8.0_144/bin/java
  • Java version

    java version "1.8.0_144"
    Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
    Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
    
  • Installation archive location

    /app/jdk-8u144-linux-x64.tar.gz

  • Profile location and changes

    vim /etc/profile
    JAVA_HOME=/app/jdk1.8.0_201
    JRE_HOME=$JAVA_HOME/jre
    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:
    export PATH JAVA_HOME JRE_HOME CLASSPATH
    
    

4. Kafka environment

5. Maxwell environment

6. Canal

 mkdir -p /app/canal
 tar zxvf canal.deployer-1.0.23.tar.gz  -C /app/canal

II. Configuration files and detailed notes

1. Installing JDK 11 alongside other JDK versions

2. ZooKeeper

3. Kafka: vim /usr/local/kafka/config/server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# host.name=uranus.hadoop.com
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://172.16.2.226:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# advertised.listeners=PLAINTEXT://172.16.2.226:9092
advertised.host.name=uranus.hadoop.com
advertised.port=9092

zookeeper.connect=uranus.hadoop.com:2181,ares.hadoop.com:2181,bird.hadoop.com:2181
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=40

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=24

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
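
Parameter | Value | Notes
broker.id | 1 | Must be different on each of the three nodes: 0, 1 and 2 respectively
advertised.host.name | uranus.hadoop.com | The kafka1 hostname configured in the hosts file; the other two nodes are kafka2.sd.cn and kafka3.sd.cn
advertised.port | 9092 | Default port, no change needed
log.dirs | /tmp/kafka-logs |
num.partitions | 40 | Number of partitions; adjust as needed
log.retention.hours | 24 | Log retention time
zookeeper.connect | uranus.hadoop.com:2181,ares.hadoop.com:2181,bird.hadoop.com:2181 | ZooKeeper connection string; separate multiple addresses with commas
listeners | PLAINTEXT://172.16.2.226:9092 | Defines the listeners of the Kafka broker. Not set in this setup
advertised.listeners | PLAINTEXT://172.16.2.226:9092 | Publishes the broker's listener information to ZooKeeper; falls back to listeners if unset. Not set in this setup
hostname | | Not set in this setup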

4. Canal configuration

  • Create the MySQL user

    CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
    GRANT ALL PRIVILEGES ON *.* TO 'canal'@'localhost' WITH GRANT OPTION;
    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION;
    flush privileges;
    
    
  • Edit the instance configuration file (not needed if MySQL runs on the local host and both the username and the password are canal)

    vi /app/canal/conf/example/instance.properties
    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    canal.instance.mysql.slaveId=225
    
    # enable gtid use true/false
    canal.instance.gtidon=true
    
    # position info
    canal.instance.master.address=192.168.10.216:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal
    
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=
    
    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
    
    # table regex
    canal.instance.filter.regex=.*\\..*
    # table black regex
    canal.instance.filter.black.regex=mysql\\.slave_.*
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
    
    # mq config
    canal.mq.topic=mysql_binlog
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\\..*
    #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
    #################################################
    
  • Edit the Canal server configuration file

vi /app/canal/conf/canal.properties 
#################################################
#########    common argument        #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########               destinations            #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########             MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = 192.168.10.225:9092,192.168.10.226:9092,192.168.10.228:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

III. Testing each component

1. ZooKeeper test

2. Kafka test

1) Create the topic test

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper uranus.hadoop.com:2181,ares.hadoop.com:2181,bird.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic test

2) List the existing topics

/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper uranus.hadoop.com:2181,ares.hadoop.com:2181,bird.hadoop.com:2181 


3) Simulate a client producing messages (run this on one of the nodes)

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list uranus.hadoop.com:9092,artemis.hadoop.com:9092,bird.hadoop.com:9092 --topic test

4) Simulate a client consuming messages. Note that newer Kafka versions use a different consumer option: --bootstrap-server, not --zookeeper uranus.hadoop.com:2181,ares.hadoop.com:2181,bird.hadoop.com:2181.

/usr/local/kafka/bin/kafka-console-consumer.sh  --bootstrap-server uranus.hadoop.com:9092,artemis.hadoop.com:9092,bird.hadoop.com:9092 --topic test --from-beginning
Delete a topic:
/usr/local/kafka/bin/kafka-topics.sh  --delete --zookeeper uranus.hadoop.com:2181,ares.hadoop.com:2181,bird.hadoop.com:2181   --topic mysql_binlog

     (1) Open the ZooKeeper client: /usr/local/zookeeper-3.4.6/bin/zkCli.sh

     (2) Find the topic's znode: ls /brokers/topics

     (3) Locate the topic to delete and run rmr /brokers/topics/<topic name>; the topic is then completely removed.

 Finally, delete the topic's directories under the Kafka data directory (log.dirs in server.properties, default "/tmp/kafka-logs").

3. Maxwell test (note: this Maxwell version requires JDK 11)

1) Create the topic mysql_binlog
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper uranus.hadoop.com:2181,ares.hadoop.com:2181,bird.hadoop.com:2181 --replication-factor 3 --partitions 5 --topic mysql_binlog

2) Start MySQL and Maxwell in the background
/app/maxwell-1.35.5/bin/maxwell --user='root' --password='1234@Abcd' --port=3306 --host='192.168.10.216' --producer=kafka \
--kafka.bootstrap.servers=uranus.hadoop.com:9092,ares.hadoop.com:9092,bird.hadoop.com:9092 --kafka_topic=mysql_binlog &

/app/maxwell-1.35.5/bin/maxwell --config config.properties

3) Watch the data arriving in Kafka
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server uranus.hadoop.com:9092,artemis.hadoop.com:9092,bird.hadoop.com:9092 --topic mysql_binlog --from-beginning
[2022-01-11 13:40:12,523] WARN [Consumer clientId=consumer-1, groupId=console-consumer-81729] Connection to node -2 (ares.hadoop.com/192.168.10.227:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
{"database":"test","table":"home","type":"insert","ts":1641870563,"xid":76127446,"commit":true,"data":{"id":31223,"tab":"123","values":"131231"}}
{"database":"test","table":"home","type":"insert","ts":1641871080,"xid":76127964,"commit":true,"data":{"id":666,"tab":"666","values":"66666"}}
{"database":"test","table":"home","type":"update","ts":1641871200,"xid":76128473,"commit":true,"data":{"id":66644,"tab":"666","values":"66666"},"old":{"id":666}}
{"database":"test","table":"home","type":"update","ts":1641871967,"xid":76129546,"commit":true,"data":{"id":4234234,"tab":"123","values":"123"},"old":{"id":88881}}
{"database":"test","table":"home","type":"update","ts":1641872049,"xid":76129898,"commit":true,"data":{"id":6666667,"tab":"5555","values":"555555"},"old":{"id":5555}}
{"database":"test","table":"home","type":"update","ts":1641872403,"xid":76130973,"commit":true,"data":{"id":243324,"tab":"123","values":"123"},"old":{"id":4234234}}
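
In the update events above, "data" holds the row after the change and "old" holds only the columns that changed, with their previous values. A minimal Python sketch of rebuilding the before and after images from one such message follows. The function name is made up for illustration, and the delete branch is an assumption, since no delete event appears in the captured output.

```python
import json

# One of the update events captured from the mysql_binlog topic above.
SAMPLE_UPDATE = (
    '{"database":"test","table":"home","type":"update","ts":1641871200,'
    '"xid":76128473,"commit":true,'
    '"data":{"id":66644,"tab":"666","values":"66666"},"old":{"id":666}}'
)


def before_and_after(raw_message):
    """Return the (before, after) row images for one Maxwell JSON event."""
    event = json.loads(raw_message)
    kind = event["type"]
    data = event["data"]
    if kind == "insert":
        return None, data
    if kind == "update":
        # Overlay the previous values of the changed columns on the new row.
        return {**data, **event.get("old", {})}, data
    if kind == "delete":
        # Assumption: a delete event carries the removed row in "data".
        return data, None
    raise ValueError(f"unhandled event type: {kind}")


if __name__ == "__main__":
    before, after = before_and_after(SAMPLE_UPDATE)
    print("before:", before)
    print("after: ", after)
```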


4. gpkafka configuration and test (Maxwell version)

DATABASE: test
USER: gpadmin
HOST: 192.168.10.227
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.10.225:9092,192.168.10.226:9092,192.168.10.228:9092
        TOPIC: mysql_binlog
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: json
      ERROR_LIMIT: 100
   OUTPUT:
      SCHEMA: test
      TABLE: home
      MAPPING:
        - NAME: id
          EXPRESSION: (c1->'data'->>'id')::decimal
        - NAME: tab
          EXPRESSION: (c1->'data'->>'tab')::text
        - NAME: values
          EXPRESSION: (c1->'data'->>'values')::text
   COMMIT:
      MINIMAL_INTERVAL: 2000
  • Pay attention to the JSON format!

JSON message format in Kafka:

Insert

{"database":"test","table":"home","type":"insert","ts":1641885332,"xid":76159057,"commit":true,"data":{"id":5543223,"tab":"23","values":"2234"}}

Update

{"database":"test","table":"home","type":"update","ts":1641871200,"xid":76128473,"commit":true,"data":{"id":66644,"tab":"666","values":"66666"},"old":{"id":666}}
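
To make the MAPPING expressions in the gpkafka configuration concrete, here is a rough Python equivalent of what (c1->'data'->>'id')::decimal and the other two expressions extract from one message. It only illustrates the JSON paths: map_maxwell_row is a made-up name, and gpkafka itself evaluates these expressions in SQL.

```python
import json
from decimal import Decimal

# The insert message shown above.
SAMPLE_INSERT = (
    '{"database":"test","table":"home","type":"insert","ts":1641885332,'
    '"xid":76159057,"commit":true,'
    '"data":{"id":5543223,"tab":"23","values":"2234"}}'
)


def map_maxwell_row(raw_message):
    """Extract the three target columns from the "data" object of one message."""
    c1 = json.loads(raw_message)
    data = c1["data"]
    return {
        # ->> yields text, which the mapping then casts to decimal or text.
        "id": Decimal(str(data["id"])),
        "tab": str(data["tab"]),
        "values": str(data["values"]),
    }


if __name__ == "__main__":
    print(map_maxwell_row(SAMPLE_INSERT))
```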

5. Canal test

  • Start Canal

    sh /app/canal/bin/startup.sh
    
    cd to /root/canal/bin for workaround relative path
    LOG CONFIGURATION : /root/canal/bin/../conf/logback.xml
    canal conf : /root/canal/bin/../conf/canal.properties
    CLASSPATH :/root/canal/bin/../conf:/root/canal/bin/../lib/zookeeper-3.4.5.jar:/root/canal/bin/../lib/zkclient-0.1.jar:/root/canal/bin/../lib/spring-2.5.6.jar:/root/canal/bin/../lib/slf4j-api-1.7.12.jar:/root/canal/bin/../lib/protobuf-java-2.6.1.jar:/root/canal/bin/../lib/oro-2.0.8.jar:/root/canal/bin/../lib/netty-3.2.5.Final.jar:/root/canal/bin/../lib/logback-core-1.1.3.jar:/root/canal/bin/../lib/logback-classic-1.1.3.jar:/root/canal/bin/../lib/log4j-1.2.14.jar:/root/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/root/canal/bin/../lib/guava-18.0.jar:/root/canal/bin/../lib/fastjson-1.1.35.jar:/root/canal/bin/../lib/commons-logging-1.1.1.jar:/root/canal/bin/../lib/commons-lang-2.6.jar:/root/canal/bin/../lib/commons-io-2.4.jar:/root/canal/bin/../lib/commons-beanutils-1.8.2.jar:/root/canal/bin/../lib/canal.store-1.0.23.jar:/root/canal/bin/../lib/canal.sink-1.0.23.jar:/root/canal/bin/../lib/canal.server-1.0.23.jar:/root/canal/bin/../lib/canal.protocol-1.0.23.jar:/root/canal/bin/../lib/canal.parse.driver-1.0.23.jar:/root/canal/bin/../lib/canal.parse.dbsync-1.0.23.jar:/root/canal/bin/../lib/canal.parse-1.0.23.jar:/root/canal/bin/../lib/canal.meta-1.0.23.jar:/root/canal/bin/../lib/canal.instance.spring-1.0.23.jar:/root/canal/bin/../lib/canal.instance.manager-1.0.23.jar:/root/canal/bin/../lib/canal.instance.core-1.0.23.jar:/root/canal/bin/../lib/canal.filter-1.0.23.jar:/root/canal/bin/../lib/canal.deployer-1.0.23.jar:/root/canal/bin/../lib/canal.common-1.0.23.jar:/root/canal/bin/../lib/aviator-2.2.1.jar:.:/usr/java/jdk1.8.0_121/lib
    cd to /root/canal for continue
    
    
    
  • Stop Canal

    sh /app/canal/bin/stop.sh
    master1: stopping canal 16062 ... 
    Oook! cost:1

  • Log locations

    cat /app/canal/logs/canal/canal.log
    cat /app/canal/logs/example/example.log

  • JSON format

{"data":[{"id":"13","tab":"123","values":"1231231"}],"database":"test","es":1642058435000,"id":10,"isDdl":false,"mysqlType":{"id":"int(14)","tab":"varchar(20)","values":"varchar(60)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"tab":12,"values":12},"table":"home","ts":1642058440150,"type":"DELETE"}
{"data":[{"id":"32","tab":"12","values":"213"}],"database":"test","es":1642058451000,"id":11,"isDdl":false,"mysqlType":{"id":"int(14)","tab":"varchar(20)","values":"varchar(60)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"tab":12,"values":12},"table":"home","ts":1642058455906,"type":"INSERT"}
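
Unlike Maxwell, a Canal flat message carries its rows as a list under "data", and the sample above shows column values serialized as strings. A small Python sketch of turning one message into per-row change records follows. The function name and the output shape are made up for illustration, and pairing "old" entries with "data" entries by position is an assumption, because the samples here only show "old": null.

```python
import json

# The INSERT message shown above.
SAMPLE_CANAL_INSERT = (
    '{"data":[{"id":"32","tab":"12","values":"213"}],"database":"test",'
    '"es":1642058451000,"id":11,"isDdl":false,'
    '"mysqlType":{"id":"int(14)","tab":"varchar(20)","values":"varchar(60)"},'
    '"old":null,"pkNames":["id"],"sql":"",'
    '"sqlType":{"id":4,"tab":12,"values":12},"table":"home",'
    '"ts":1642058455906,"type":"INSERT"}'
)


def rows_from_canal(raw_message):
    """Expand one Canal flat message into a list of per-row change records."""
    msg = json.loads(raw_message)
    if msg.get("isDdl"):
        return []  # DDL statements carry no row data
    rows = msg.get("data") or []
    # Assumption: when present, "old" is a list aligned with "data".
    olds = msg.get("old") or [None] * len(rows)
    return [
        {
            "table": f'{msg["database"]}.{msg["table"]}',
            "type": msg["type"],
            "es": msg["es"],
            "row": row,
            "old": old,
        }
        for row, old in zip(rows, olds)
    ]


if __name__ == "__main__":
    for change in rows_from_canal(SAMPLE_CANAL_INSERT):
        print(change)
```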

6. gpkafka configuration and test (Canal version)

  • For now, only inserts are handled.

  • Canal emits an insert as JSON in the following format:

    {"data":[{"id":"444","tab":"444","values":"4444","rrr":null}],"database":"test","es":1642062004000,"id":5,"isDdl":false,"mysqlType":{"id":"int(14)","tab":"varchar(21)","values":"varchar(60)","rrr":"varchar(50)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"tab":12,"values":12,"rrr":12},"table":"home","ts":1642062008970,"type":"INSERT"}
    
  • This is not a flat JSON record (the row sits inside the data array), so it needs extra handling in the mapping

DATABASE: test
USER: gpadmin
HOST: 192.168.10.227
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.10.225:9092,192.168.10.226:9092,192.168.10.228:9092
        TOPIC: test_home
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: json
      ERROR_LIMIT: 100
   OUTPUT:
      SCHEMA: test
      TABLE: home
      MAPPING:
        - NAME: id
          EXPRESSION: ((c1#>>'{data,0}')::json->>'id')::decimal
        - NAME: tab
          EXPRESSION: ((c1#>>'{data,0}')::json->>'tab')::text
        - NAME: values
          EXPRESSION: ((c1#>>'{data,0}')::json->>'values')::text
   COMMIT:
      MINIMAL_INTERVAL: 2000
gpkafka load test_home.yaml

Success!

The same job in MERGE mode, matching on id and carrying a del_mark flag:

DATABASE: test
USER: gpadmin
HOST: 192.168.10.227
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.10.225:9092,192.168.10.226:9092,192.168.10.228:9092
        TOPIC: test_home
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: json
      ERROR_LIMIT: 100
   OUTPUT:
      SCHEMA: test
      MODE: MERGE
      MATCH_COLUMNS:
        - id
      UPDATE_COLUMNS:
        - tab
        - values
      ORDER_COLUMNS:
        - ts
        - xid
        - del_mark
        - ddl_type
      TABLE: home
      MAPPING:
        - NAME: id
          EXPRESSION: ((c1#>>'{data,0}')::json->>'id')::decimal
        - NAME: tab
          EXPRESSION: ((c1#>>'{data,0}')::json->>'tab')::text
        - NAME: values
          EXPRESSION: ((c1#>>'{data,0}')::json->>'values')::text
        - NAME: ts
          EXPRESSION: (c1->>'es')::decimal
        - NAME: xid
          EXPRESSION: (c1->>'id')::decimal
        - NAME: ddl_type
          EXPRESSION: (c1->>'type')::text
        - NAME: del_mark
          EXPRESSION: CASE WHEN ((c1->>'type')::text= 'DELETE') then true else false end
   COMMIT:
      MINIMAL_INTERVAL: 2000

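Done, with the help of https://segmentfault.com/a/1190000022567264. The earlier problem was that the DELETE_CONDITION: (c1->>'type')::text = 'DELETE' condition had been written incorrectly.

This works perfectly: a delete can be recorded as a soft (logical) delete.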
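
The soft-delete idea can be modelled in a few lines of Python: map each Canal message the way the MAPPING block does, then let the latest event per id win. This is a simplified model under stated assumptions, not gpkafka's MERGE implementation. It orders only by (ts, xid), overwrites the whole row for the winning event, and uses made-up function names.

```python
import json
from decimal import Decimal


def map_canal_event(raw_message):
    """Mirror the MAPPING block: one target row from the first "data" element."""
    c1 = json.loads(raw_message)
    first = c1["data"][0]
    return {
        "id": Decimal(first["id"]),
        "tab": first["tab"],
        "values": first["values"],
        "ts": Decimal(c1["es"]),
        "xid": Decimal(c1["id"]),
        "ddl_type": c1["type"],
        # A DELETE is kept as a row flagged del_mark instead of being removed.
        "del_mark": c1["type"] == "DELETE",
    }


def merge_batch(table, raw_messages):
    """Upsert a batch into `table` (a dict keyed by id); the last event wins."""
    rows = sorted(
        (map_canal_event(message) for message in raw_messages),
        key=lambda row: (row["ts"], row["xid"]),
    )
    for row in rows:
        table[row["id"]] = row
    return table


if __name__ == "__main__":
    insert = json.dumps({"data": [{"id": "32", "tab": "12", "values": "213"}],
                         "es": 1642058451000, "id": 11, "type": "INSERT"})
    delete = json.dumps({"data": [{"id": "32", "tab": "12", "values": "213"}],
                         "es": 1642058460000, "id": 12, "type": "DELETE"})
    # Messages may arrive out of order within a batch; sorting restores it.
    print(merge_batch({}, [delete, insert]))
```

In this model a row with del_mark set to true stays in the table, so downstream queries have to filter on it.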
