1.环境搭建
使用的是官方elastic-job-example中的elastic-job-example-lite-java
这里粘上github的地址:https://github.com/dangdangdotcom/elastic-job
下载后解压导入elastic-job-example maven工程
没装maven插件的需要先装一个maven插件,新版的eclipse一般都自带
项目导进去了,好像扯得有点偏,我们回到环境的搭建上
需要配置:
(1)zookeeper,这个网上的教程很详细
(2)mysql,还需要建个库命名为:elastic_job_log,后面会用到
(3)elastic-job-lite-console-2.1.2.tar.gz,这个百度搜一下吧,应该能下到。
2.环境的启动
(1)zookeeper的配置这里就不说了,打开目录进入bin
运行zkServer.cmd;
(2)把下载的elastic-job-lite-console-2.1.2.tar.gz解压,进入bin目录,运行start.bat启动服务后,用浏览器访问http://localhost:8899 看是否启动成功,默认的用户名:root,密码:root;
进入注册中心添加zookeeper
进入事件追踪数据源添加数据库(数据库要先启动,名字、地址也要填对,不然连不上)
到这里我们的环境也就配好了,现在进入最后一步。
3.修改代码并运行
添加aliyun仓库
http://maven.aliyun.com/nexus/content/groups/public/
然后修改一下代码
修改com.dangdang.ddframe.job.example.JavaMain.java
package com.dangdang.ddframe.job.example;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.example.job.dataflow.JavaDataflowJob;
import com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.commons.dbcp.BasicDataSource;
import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
/**
 * Entry point of the Elastic-Job Lite Java example.
 *
 * <p>Connects to a ZooKeeper registry center, builds a MySQL-backed event-trace data source and
 * schedules the simple example job. Set-up methods for a dataflow job and a script job are also
 * provided but are not invoked by default.
 */
public final class JavaMain {

    // ZooKeeper registry center settings.
    private static final int EMBED_ZOOKEEPER_PORT = 2181;

    private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;

    // NOTE(review): ZooKeeper digest credentials are conventionally written "user:password";
    // confirm that "admin.admin" is the intended value.
    private static final String DIGEST_STR = "admin.admin";

    private static final String JOB_NAMESPACE = "elastic-job-example-lite-java";

    // MySQL settings for the job event-trace storage.
    private static final String EVENT_RDB_STORAGE_DRIVER = "com.mysql.jdbc.Driver";

    private static final String EVENT_RDB_STORAGE_URL = "jdbc:mysql://localhost:3306/elastic_job_log";

    private static final String EVENT_RDB_STORAGE_USERNAME = "root";

    private static final String EVENT_RDB_STORAGE_PASSWORD = "root";

    /**
     * Initializes the registry center and the event-trace data source, then schedules the jobs.
     *
     * @param args command-line arguments (unused)
     * @throws IOException declared for the script job set-up, which is disabled by default
     */
    public static void main(final String[] args) throws IOException {
        // Connect to the registry center.
        CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
        // Event-trace configuration backed by the MySQL data source.
        JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(setUpEventTraceDataSource());
        // Schedule the simple job.
        setUpSimpleJob(regCenter, jobEventConfig);
        // The dataflow and script examples are disabled; uncomment to schedule them as well.
        // setUpDataflowJob(regCenter, jobEventConfig);
        // setUpScriptJob(regCenter, jobEventConfig);
    }

    /**
     * Creates and initializes the ZooKeeper registry center for {@link #JOB_NAMESPACE}.
     *
     * @return the initialized registry center
     */
    private static CoordinatorRegistryCenter setUpRegistryCenter() {
        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
        zkConfig.setDigest(DIGEST_STR);
        CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
        result.init();
        return result;
    }

    /**
     * Builds the DBCP data source used to store job event-trace records.
     *
     * @return a data source configured with the {@code EVENT_RDB_STORAGE_*} constants
     */
    private static DataSource setUpEventTraceDataSource() {
        BasicDataSource result = new BasicDataSource();
        result.setDriverClassName(EVENT_RDB_STORAGE_DRIVER);
        result.setUrl(EVENT_RDB_STORAGE_URL);
        result.setUsername(EVENT_RDB_STORAGE_USERNAME);
        result.setPassword(EVENT_RDB_STORAGE_PASSWORD);
        return result;
    }

    /**
     * Schedules {@link JavaSimpleJob} with three sharding items, firing on the cron expression
     * {@code 0 0/2 * * * ?}, with a monitor port and a job listener attached.
     *
     * @param regCenter registry center the job registers with
     * @param jobEventConfig event-trace configuration for the job
     */
    private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder("javaSimpleJob", "0 0/2 * * * ?", 3)
                .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
                // Job-wide parameter shared by all sharding items.
                .jobParameter("jobParameter=jobParameter")
                // Custom exception handler class.
                // NOTE(review): TestHandler is not shown here; verify it exists on the classpath.
                .jobProperties("job_exception_handler", "com.dangdang.ddframe.job.example.TestHandler")
                // Custom executor-service (thread pool) handler class.
                // NOTE(review): this looks like a fixture class from another module; verify it is on the classpath.
                .jobProperties("executor_service_handler", "com.dangdang.ddframe.job.lite.spring.fixture.handler.SimpleExecutorServiceHandler")
                .build();
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration
                .newBuilder(simpleJobConfig)
                // Monitoring port for this job.
                .monitorPort(9888)
                .build();
        // Arguments: registry center, job configuration, event-trace configuration, job listener.
        new JobScheduler(regCenter, liteJobConfiguration, jobEventConfig, new MyElasticJobListener()).init();
    }

    /**
     * Schedules {@link JavaDataflowJob} with three sharding items, firing on the cron expression
     * {@code 0 0/3 * * * ?}; the third {@code DataflowJobConfiguration} argument is {@code false}.
     *
     * @param regCenter registry center the job registers with
     * @param jobEventConfig event-trace configuration for the job
     */
    private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder("javaDataflowElasticJobfalse", "0 0/3 * * * ?", 3)
                .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
                .build();
        DataflowJobConfiguration dataflowJobConfig =
                new DataflowJobConfiguration(coreConfig, JavaDataflowJob.class.getCanonicalName(), false);
        new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(dataflowJobConfig).build(), jobEventConfig).init();
    }

    /**
     * Schedules a script job with three sharding items that runs the bundled demo script.
     *
     * @param regCenter registry center the job registers with
     * @param jobEventConfig event-trace configuration for the job
     * @throws IOException if the demo script cannot be located or prepared
     */
    private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0 0/2 * * * ?", 3).build();
        ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
        new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
    }

    /**
     * Returns the file-system path of the demo script for the current operating system.
     *
     * <p>On Windows this is {@code /script/demo.bat}. Otherwise it is {@code /script/demo.sh},
     * whose permissions are set to {@code rwxr-xr-x} before the path is returned.
     *
     * @return the script path as a string
     * @throws IOException if the script resource is missing, its location is not a valid URI,
     *     or its permissions cannot be changed
     */
    private static String buildScriptCommandLine() throws IOException {
        if (System.getProperty("os.name").contains("Windows")) {
            return resolveScript("/script/demo.bat").toString();
        }
        Path result = resolveScript("/script/demo.sh");
        Files.setPosixFilePermissions(result, PosixFilePermissions.fromString("rwxr-xr-x"));
        return result.toString();
    }

    /**
     * Resolves a classpath resource to a file-system path.
     *
     * <p>Goes through the resource's URI rather than {@code URL.getPath()}: the URL path is
     * percent-encoded (and on Windows starts with an extra {@code /}), so directories containing
     * spaces or non-ASCII characters would otherwise produce a path that does not exist.
     *
     * @param resourceName absolute classpath resource name, e.g. {@code /script/demo.sh}
     * @return the path of the resource
     * @throws IOException if the resource is not found or its location is not a valid URI
     */
    private static Path resolveScript(final String resourceName) throws IOException {
        URL resource = JavaMain.class.getResource(resourceName);
        if (null == resource) {
            throw new FileNotFoundException("Script resource not found on classpath: " + resourceName);
        }
        try {
            return Paths.get(resource.toURI());
        } catch (final URISyntaxException ex) {
            throw new IOException("Invalid script resource location: " + resource, ex);
        }
    }
}
修改 com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob
package com.dangdang.ddframe.job.example.job.simple;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.example.fixture.entity.Foo;
import com.dangdang.ddframe.job.example.fixture.repository.FooRepository;
import com.dangdang.ddframe.job.example.fixture.repository.FooRepositoryFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class JavaSimpleJob implementsSimpleJob {
private FooRepository fooRepository =FooRepositoryFactory.getFooRepository();
@Override
public void execute(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s| %s",
shardingContext.getShardingItem(),new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId(), "SIMPLE"));
System.out.println("----------------------------" +shardingContext.getShardingParameter());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------------------------------------------------------------------------"+ shardingContext.getShardingParameter());
List data =fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
}
}
创建类 com.dangdang.ddframe.job.example.MyElasticJobListener
package com.dangdang.ddframe.job.example;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
publicclass MyElasticJobListener implements ElasticJobListener {
@Override
publicvoid afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println("-----------------afterJobExecuted---------");
}
@Override
publicvoid beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println("-----------------beforeJobExecuted---------");
}
}
运行JavaMain.java
4.运行结果分析
如图三个分片都给一个实例运行
运行成功。
我们来试一下开两个JavaMain,看看它是怎么分配的
运行实例变成了两个
作业维度分片状态中的分片也改变了,把分片1分给了另一个实例。
在历史状态中可以看到,0,2片分给一个实例,1分给一个实例,他们的状态如图。可以看到分片1是先运行中,然后等待运行,时间是一样的,可能是后台响应的问题吧。
看到这里也就告一段落了,第一次写这样的文档,还请大家指教,有什么不懂的下面留言,有时间我就会答复。
070721更新
刚才遇到一个朋友导入项目之后项目报错,看了一下是log.xxx或者.setXxx这类方法报错。试着重装了一下lombok,就可以了。原来是他的lombok装到另一个eclipse里面了。。。。
双击lombok.jar文件,选取你用的eclipse,安装重启后clean一下工程即可。