步骤
先说总体步骤:
- 下载源码,并编译到本地
maven
仓库[上传私服(可选)]; -
pom
文件依赖datax-core
和需要的reader
和writer
- 环境变量设置
datax.home
(或者利用System#setProperty(String)
)和一些需要替换脚本中的变量:脚本中${}
占位符的变量将被系统变量替换。 - 将datax.tar.gz中解压出来的
conf
、plugin
等文件放到datax.home目录中。 - 构造参数数组:
{"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}
- 调用
Engin#main(String[])
或者Engine#entry(String[])
引言
目前官方的使用指南里都是利用python来调用dataX执行任务。而且现有的博客基本上也是利用java来调用python命令Runtime.getRuntime().exec()
来执行。
个人感觉,dataX未提供java集成开发的方法,应该是定位生产系统,运维需要吧?!
我们的业务场景:执行完dataX的job之后,还有一定的业务逻辑,所以希望在java应用里调用dataX执行完job之后,再执行后续逻辑。
DataX分析
笔者简单的看了一下午的DataX的逻辑,完全以使用者的视角分析DataX,必然不能完全了解DataX的整个执行过程。
本文仅分析如果能够在java代码里集成DataX进行开发。
集成准备
DataX没有将代码上传到maven服务器上,所以需要自己先pull代码到本地,编译,才能在集成开发的使用通过pom引用。有条件的可以上传到自己的私服上。
代码地址
代码依赖
通过pom文件加入datax-core
:
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
如果需要对应的reader
和writer
的话,加入到pom文件中,比如需要streamreader和streamwriter:
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>streamwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
要依赖datax一定要保证有对应的源码或者编译到本机的maven repository或者在对应的私服上有上传相应的编译版本,不然pom文件是找不到依赖的。
为了集成开发,可能需要一口气引用所有的reader和writer,目前所知,就得一个一个写,如果大家有好办法,麻烦告知!
准备相应的文件
从com.alibaba.datax.core.util.container.CoreConstant
中可以看到,datax.home
很重要,很多文件的读取都是在datax.home
里面获取的。就如我们在安装版的datax中可以看到里面一些目录一样
$ ll
total 4
drwxr-xr-x 2 mcbadm mcb 56 Sep 20 18:28 bin
drwxr-xr-x 2 mcbadm mcb 65 Sep 20 18:28 conf
drwxr-xr-x 2 mcbadm mcb 21 Sep 20 18:28 job
drwxr-xr-x 2 mcbadm mcb 4096 Sep 20 18:28 lib
drwxr-xr-x 4 mcbadm mcb 32 Sep 20 18:28 plugin
drwxr-xr-x 2 mcbadm mcb 22 Sep 20 18:28 script
drwxr-xr-x 2 mcbadm mcb 23 Sep 20 18:28 tmp
目前所知的,Engine#entry
在解析配置的时候会读取conf目录下的文件,还有对应plugin/reader/xxxreader、plugin/writer/xxxwriter的plugin.json文件:
{
"name": "streamreader",
"class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport data from stream.",
"warn": "Never use it in your real job."
},
"developer": "alibaba"
}
编写代码
编写job代码:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 1,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX,现在是${now}"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
写个测试类吧:
import java.time.LocalTime;
import com.alibaba.datax.core.Engine;
public class EngineTest {
public static void main(String[] args) {
System.setProperty("datax.home", getCurrentClasspath());
System.setProperty("now", LocalTime.now().toString());// 替换job中的占位符
String[] datxArgs = {"-job", getCurrentClasspath() + "/job/stream2stream.json", "-mode", "standalone", "-jobid", "-1"};
try {
Engine.entry(datxArgs);
} catch (Throwable e) {
e.printStackTrace();
}
}
public static String getCurrentClasspath() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
String currentClasspath = classLoader.getResource("").getPath();
// 当前操作系统
String osName = System.getProperty("os.name");
if (osName.startsWith("Windows")) {
// 删除path中最前面的/
currentClasspath = currentClasspath.substring(1);
}
return currentClasspath;
}
}
datax在解析完配置后,会将core.json,job.json,plugin.json合并在一起:
{
"common": {
"column": {
"dateFormat": "yyyy-MM-dd",
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"encoding": "utf-8",
"extraFormats": [
"yyyyMMdd"
],
"timeFormat": "HH:mm:ss",
"timeZone": "GMT+8"
}
},
"core": {
"container": {
"job": {
"id": -1,
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"dataXServer": {
"address": "http://localhost:7001/api",
"reportDataxLog": false,
"reportPerfLog": false,
"timeout": 10000
},
"statistics": {
"collector": {
"plugin": {
"maxDirtyNumber": 10,
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
}
}
},
"transport": {
"channel": {
"byteCapacity": 67108864,
"capacity": 512,
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"flowControlInterval": 20,
"speed": {
"byte": -1,
"record": -1
}
},
"exchanger": {
"bufferSize": 32,
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
}
}
},
"entry": {
"jvm": "-Xms1G -Xmx1G"
},
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
],
"sliceRecordCount": 1
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
},
"plugin": {
"reader": {
"streamreader": {
"class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
"description": {
"mechanism": "use datax framework to transport data from stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamreader",
"path": "D:/workspace/datax-test/target/test-classes/\\plugin\\reader\\streamreader"
}
},
"writer": {
"streamwriter": {
"class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
"description": {
"mechanism": "use datax framework to transport data to stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamwriter",
"path": "D:/workspace/datax-test/target/test-classes/\\plugin\\writer\\streamwriter"
}
}
}
}
说说插件原理
每个reader和writer都有自己的plugin.json文件,里面最重要的就是class配置了,这个类的全路径配置用于classloader将其加载进来并通过反射将其实例化。加载代码可看com.alibaba.datax.core.util.container.LoadUtil
所以我们在集成的时候,plugin目录下面不需要有jar包了,只需要放json文件就行,因为我们通过pom文件依赖了对应的reader和writer,说白了,就是classpath下面有对应的reader和writer即可。
结束语
文章有点长,记录了一个下午的研究结果,应该有很多不完善的地方,希望可以和大家多交流。如果觉得有帮助,可以点个赞。