DataX3.0安装

Datax3.0使用说明

原文链接:https://github.com/alibaba/DataX/blob/master/introduction.md

一、datax3.0介绍

1、DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

image

2、DataX3.0框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  1. Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  2. Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  3. Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
image

3、DataX3.0核心架构

1. 核心模块介绍:

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

2. DataX调度流程:

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  • DataXJob根据分库分表切分成了100个Task。
  • 根据20个并发,DataX计算共需要分配4个TaskGroup。
  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
image

二、Datax3.0安装部署

1、环境准备

Linux
jdk 1.8
python 2.7.5(datax是由python2开发的)

2.下载安装

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf  datax.tar.gz -C    /XXXXXXXXXXX
查看安装成功:在bin目录下执行 python datax.py ../job/job.json
python ./bin/datax.py ./job/job.json 
  • 报下面这个错
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 716177408, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 716177408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/module/datax/hs_err_pid7446.log
  • 报错是因为java内存不够了

vi  conf/core.json
# 下面这个内存默认是2G,调小点,重启或者杀死所有java进程,在运行datax.py
    "entry": {
        "jvm": "-Xms256M -Xmx256M",
        "environment": {}
    }
  • 继续测试成功
2020-10-17 14:38:30.743 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-10-17 14:38:30.743 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.041s |  All Task WaitReaderTime 0.094s | Percentage 100.00%
2020-10-17 14:38:30.744 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2020-10-17 14:38:20
任务结束时刻                    : 2020-10-17 14:38:30
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

3、查看配置文件

在bin目录下已经给出了样例配置,但不同的数据源配置文件不一样。通过命令查看配置模板

python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

示例:[xxx@xxxbin]$ python datax.py -r mysqlreader -w hdfswriter

4、Reader插件和Writer插件

DataX3.0版本提供的Reader插件和Writer插件,每种读插件都有一种和多种切分策略

"reader": {
    "name": "mysqlreader", #从mysql数据库获取数据(也支持sqlserverreader,oraclereader)
    "name": "txtfilereader", #从本地获取数据
    "name": "hdfsreader", #从hdfs文件、hive表获取数据
    "name": "streamreader", #从stream流获取数据(常用于测试)
    "name": "httpreader", #从http URL获取数据
    }
"writer": {
    "name":"hdfswriter", #向hdfs,hive表写入数据
    "name":"mysqlwriter ", #向mysql写入数据(也支持sqlserverwriter,oraclewriter)
    "name":"streamwriter ", #向stream流写入数据。(常用于测试)   
    }

5、json配置文件模板

  1. 整个配置文件是一个job的描述;
  2. job下面有两个配置项,content和setting,其中content用来描述该任务的源和目的端的信息,setting用来描述任务本身的信息;
  3. content又分为两部分,reader和writer,分别用来描述源端和目的端的信息;
  4. setting中的speed项表示同时起几个并发去跑该任务。
1. mysql_to_hive示例
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "querySql": "", #自定义sql,支持多表关联,当用户配置querySql时,直接忽略table、column、where条件的配置。
                        "fetchSize": "", #默认1024,该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了DataX和服务器端的网络交互次数,能够较大的提升数据抽取性能,注意,该值过大(>2048)可能造成DataX进程OOM
                        "splitPk": "db_id", #仅支持整形型数据切分;如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,如果该值为空,代表不切分,使用单通道进行抽取
                        "column": [], #"*"默认所有列,支持列裁剪,列换序
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://IP:3306/database?useUnicode=true&characterEncoding=utf8"], 
                                "table": [] #支持多张表同时抽取
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": "" #指定的column、table、where条件拼接SQL,可以指定limit 10,也可以增量数据同步,如果该值为空,代表同步全表所有的信息
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [], #必须指定字段名,字段类型,{"name":"","tpye":""}
                        "compress": "", #hdfs文件压缩类型,默认不填写意味着没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)。 
                        "defaultFS": "", #Hadoop hdfs文件系统namenode节点地址。
                        "fieldDelimiter": "", #需要用户保证与创建的Hive表的字段分隔符一致
                        "fileName": "", #HdfsWriter写入时的文件名,需要指定表中所有字段名和字段类型,其中:name指定字段名,type指定字段类型。 
                        "fileType": "", #目前只支持用户配置为”text”或”orc”
                        "path": "", #存储到Hadoop hdfs文件系统的路径信息,hive表在hdfs上的存储路径
                        "hadoopConfig": {} #hadoopConfig里可以配置与Hadoop相关的一些高级参数,比如HA的配置。 
                        "writeMode": "" #append,写入前不做任何处理,文件名不冲突;nonConflict,如果目录下有fileName前缀的文件,直接报错。 
                    }
                }
            }
        ],
        "setting": {
            "speed": { #流量控制
                "byte": 1048576, #控制传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它
                "channel": ""  #控制同步时的并发数
                    }
            "errorLimit": { #脏数据控制
                "record": 0 #对脏数据最大记录数阈值(record值)或者脏数据占比阈值(percentage值,当数量或百分比,DataX Job报错退出
            }
        }
    }
}

2. hive_to_mysql示例
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [], #"*"默认所有列,指定Column信息时,type必须填写,index/value必须选择其一。 
                        "defaultFS": "", #hdfs文件系统namenode节点地址
                        "encoding": "UTF-8", #默认UTF-8
                        "nullFormat": "", #文本文件中无法使用标准字符串定义null(空指针),例如:nullFormat:”\N”,那么如果源头数据是”\N”
                        "compress": "", #orc文件类型下无需填写
                        "hadoopConfig": {}, #hadoopConfig里可以配置与Hadoop相关的一些高级参数,比如HA的配置。 
                        "fieldDelimiter": ",", #默认",";读取textfile数据时,需要指定字段分割符,HdfsReader在读取orcfile时,用户无需指定字段分割符
                        "fileType": "orc", #文件的类型,目前只支持用户配置为”text”、”orc”、”rc”、”seq”、”csv”。 
                        "path": "" #文件路径,支持多文件读取,可以使用"*",也可以指定通配符遍历多文件,单文件只能单线程,多文件可以多线程,线程并发数通过通道数指定
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [], #必须指定,不能留空;如果要依次写入全部列,使用表示, 例如: "column": [""],强烈不建议
                        "batchSize": "", #默认值1024 一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://IP:3306/database?useUnicode=true&characterEncoding=utf8",
                                "table": [] #支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。
                            }
                        ],
                        "password": "",
                        "preSql": [], #写入数据到目的表前,会先执行这里的标准语句。例在导入表前先进行删除操作:["delete from 表名"]
                        "postSql":[], #写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )
                        "session": [], #DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
                        "username": "",
                        "writeMode": "" #默认insert ,可选insert/replace/update 
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
            "errorLimit": { #脏数据控制
                "record": 0 #对脏数据最大记录数阈值(record值)或者脏数据占比阈值(percentage值,当数量或百分比,DataX Job报错退出
            }
        }
    }
}

三、Datax3.0使用

# trail_pigeon导入hive

#hive里面建表

CREATE TABLE ods_db_bidata.trail_pigeon (
    order_id int ,
    order_apply_time string
    )
stored as orc tblproperties ("orc.compress"="ZLIB");



#建shell脚本,执行python脚本前先清空目标表

#!/bin/bash

hive_db=ods_db_bidata
hive_table=trail_pigeon

hive -e "truncate table ${hive_db}.${hive_table}"

python /opt/app/datax/bin/datax.py /opt/app/datax/job/mysql2hive/trail_pigeon.json
#写json配置文件
{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": ["*"],
                    "splitPk": "order_id",
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://ip:3306/bidata?useUnicode=true&characterEncoding=utf8"],
                        "table": ["trail_pigeon"]
                        }],
                        "password": "password",
                        "username": "username",
                        "where": ""
                    }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "column": [{"name": "order_id","type": "int"},
                            {"name": "order_apply_time","type": "string"}
                            ],
                            "compress": "SNAPPY",
                            "defaultFS": "hdfs://192.168.0.127:8020",
                            "fieldDelimiter": "\u0001",
                            "fileName": "trail_pigeon",
                            "fileType": "orc",
                            "path": "/hive/warehouse/ods_db_bidata.db/trail_pigeon",
                            "writeMode": "nonConflict"
                        }
                    }
                    }],
                    "setting": {
                        "speed": {
                            "channel": "5"
                        }
                    }
                }
            }

四、Datax3.0支持的reader和writer


image.png

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355