最近公司有不少关于数据同步的业务需求,比如需要将mysql表同步到hive中或者同步为parquet等格式存放在HDFS中,这种类型的需求一般不是简单的同步,而是需要将mysql的数据进行处理,然后将结果表的数据放入指定的数据源中。得益于威廉大哥开发的Streamingpro这一利器,同步数据,数据处理,最后放入指定的数据源中,这一连串的工作变得十分简单,只需要一个json格式的配置文件即可轻松搞定,实在是大大的提高了工作效率。关于streamingPro的使用请参考威廉的简书相关的文章,下文主要介绍遇到的一些问题:
mysql timestamp类型转换错误
需求是要将mysql的表数据同步至hive中,利用streamingPro是很容易实现的,只需要配置一个输入源,一个处理SQL语句,配置文件大概如下:
{
"chinaDrug2hive": {
"desc": "",
"strategy": "spark",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "batch.sources",
"params": [
{
"url": "jdbc:mysql://localhost :3306/DB?user=username&password=password",
"dbtable": "mysqlTableName",
"driver": "com.mysql.jdbc.Driver",
"path": "-",
"format": "jdbc",
"outputTable": "tableName"
}
]
},
{
"name": "batch.sql",
"params": [
{
"sql": "drop table db.hivetableName",
"outputTableName": "-"
}
]
},
{
"name": "batch.sql",
"params": [
{
"sql": "create table if not exists db.hivetableName as select * from tableName",
"outputTableName": "-"
}
]
}
],
"configParams": {
}
}
}
关于mysql相关的配置参数的配置,其实就是spark访问mysql需要配置的几个参数。上面的示例中,考虑到与mysql表解耦,即当mysql表结构由于业务或其他原因发生变化,配置文件不需要发生任何变化,故而用到了create table as select 语句,而没有用insert into语句。本来跟容易搞定的事情,因为mysql表中有字段类型是datetime,且未设置为not null。在运行时,会出现:
Cause: java.sql.SQLException: Value '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp。
解决的办法是在URL中添加一个参数:zeroDateTimeBehavior=convertToNull
问题得到解决。
tinyint类型自动转换成boolean类型
datetime类型得到了解决,数据也顺利写到hive表中了,原以为大功告成了。使用hive表的数据进行测试时,同事反应,tinyint类型的被转换成了boolean型。导致原本写好的SQL脚本不能运行,tinyint中存储的也不只有0和1两个值,所以转换成boolean类型是会导致错误的。解决此问题的方法也是在URL添加一个参数:tinyInt1isBit=false,再次运行重新同步数据,问题得到解决。
分区表问题
这是在使用streamingPro将表数据存为parquet文件,但是结果表是按日期进行分区的分区表。这种情况可以分为两种情况来考虑:
如果分区列本身就是表中列,那么可以使用如下方法:
{
"name": "batch.outputs",
"params": [
{
"name": "outName",
"format": "parquet",
"inputTableName": "inputTableName",
"path": "/user/zhang/Data/inputTableName",
"partitionBy":"hp_stat_date",
"mode": "Overwrite"
}
]
}
如果分区列不是表的中列,那么只需要将路径通过参数动态传入:
YESTERDAY=$(date -d "@$i" "+%Y-%m-%d")
HiveOutputTable=/user/zhang/Data/tableName/hp_stat_date=$YESTERDAY
spark-submit \
--class streaming.core.StreamingApp \
--master yarn-cluster \
--num-executors 4 \
--executor-memory 12G \
--executor-cores 1 \
--driver-memory 10G \
--name result_table \
/home/zhangzl/streamingpro/streamingpro-spark-0.4.14-SNAPSHOT.jar \
-streaming.name result_table \
-streaming.platform spark \
-streaming.jobs XXX \
-streaming.enableHiveSupport true \
-streaming.sql.params.YESTERDAY $YESTERDAY \
-streaming.sql.out.outName.path $HiveOutputTable \
-streaming.job.file.path /user/zhang/test.json
在tableName的文件夹路径下,会生成
hp_stat_date=$YESTERDAY一系列的子目录。以上的提交命令中包含了如何向streamingPro中添加参数,-streaming.sql.params.YESTERDAY $YESTERDAY
代表在SQL语句中传入参数,-streaming.sql.out.outName.path $HiveOutputTable \表示的是在输出中添加参数。更多关于streamingPro的文章,请参看威廉的的简书,里面还有大量Spark,ES等相关的优质文章,满满的干货。