spring batch 纯注解学习笔记(八)--多job运行与优化

前序文章陆续已经将spring batch所有组件模块介绍完毕,并一一演示了作用,本文将对前面业务做一个补充和优化工作。

1.多Job运行

对于业务复杂的应用,往往是多个Job同时存在的运行,其实处理非常简单,通过以下示例展示如果存在多个Job运行,这里为了直接凸显作用,部分代码没有公开:

@EnableBatchProcessing
@Slf4j
@Import(ScopeConfiguration.class)
public class BatchConfig {
    private static JobContext jobContext = JobContext.getInstance();


    @Bean("synchroJob")
    public Job ceateSynchroJob(@Qualifier("getDataToCacheStep") Step getDataToCacheStep,
                               @Qualifier("camDataListener") CamDataListener camDataListener,
                               @Qualifier("backupDataStep") Step backupDataStep,
                               @Qualifier("truncateDataStep") Step truncateDataStep,
                               @Qualifier("saveDataToDestDataBaseStep") Step saveDataToDestDataBaseStep,
                               @Qualifier("synchroIsFinishedDecider") JobExecutionDecider synchroIsFinishedDecider,
                               @Qualifier("saveDataIsFinishedDecider") JobExecutionDecider saveDataIsFinishedDecider) {
        return jobBuilderFactory.get("synchroJob")
                .incrementer(new RunIdIncrementer())
                .listener(camDataListener)
                .start(getDataToCacheStep)
                .next(synchroIsFinishedDecider)
                .on("GOON")
                .to(getDataToCacheStep)
                .from(synchroIsFinishedDecider)
                .on("COMPLETED")
                .to(backupDataStep)
                .next(truncateDataStep)
                .next(saveDataToDestDataBaseStep)
                .from(saveDataIsFinishedDecider)
                .on("GOON")
                .to(saveDataToDestDataBaseStep)
                .end()
                .build();
    }
}


and

@Configuration
@EnableBatchProcessing
@Slf4j
public class BackUpConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean("backUpJob")
    public Job ceateSynchroJob(@Qualifier("backupDataStep") Step backupDataStep, @Qualifier("backupDataListener") BackupDataListener backupDataListener) {
        return jobBuilderFactory.get("backupJob").incrementer(new RunIdIncrementer()).listener(backupDataListener).start(backupDataStep).build();
    }

    @Bean("backupDataStep")
    public Step backupDataStep(@Qualifier("backUpDataTasklet") Tasklet backUpDataTasklet) {
        return stepBuilderFactory.get("backupDataStep").tasklet(backUpDataTasklet).build();
    }

    @Bean("backUpDataTasklet")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public BackUpDataTasklet createBackUpDataTasklet() {
        return new BackUpDataTasklet();
    }
}

运行第一个任务

JobParametersBuilder jobParametersBuilder = new JobParametersBuilder().addString("keepLatest", "false").addDate("date", new Date()).addLong("currentTime", new Long(System.currentTimeMillis()));
            JobExecution result = jobLauncher.run(synchroJob,
                    jobParametersBuilder.toJobParameters());
            try {
                this.wait();
                ExitStatus exitStatus = result.getExitStatus();
                if (exitStatus.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
                    log.info("任务正常完成");
                    return true;
                } else {
                    log.error("任务失败,exitCode=" + exitStatus.getExitCode());
                    return false;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.error("任务失败,exitCode=" + e.getMessage());
                return false;
            }

运行第二个任务

try {
                JobContext.getInstance().setVersion(new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
                synchronized (this) {


                    JobExecution result = launcher.run(job,
                            new JobParametersBuilder().addString("keepLatest", "true").addDate("date", new Date()).addLong("currentTime", new Long(System.currentTimeMillis())).toJobParameters());
                        this.wait();
                        if(!result.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode())){
                            return 100;
                        }
                }
            } catch (JobExecutionAlreadyRunningException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("有任务正在同步中,任务失败" + e.getMessage());
                return 2;
            } catch (JobRestartException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("任务重启失败" + e.getMessage());
                return 3;
            } catch (JobInstanceAlreadyCompleteException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("任务实例已完成");
                return 4;
            } catch (JobParametersInvalidException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                log.info("任务启动失败,参数异常" + e.getMessage());
                return 5;
            }catch (InterruptedException e) {
                e.printStackTrace();
                return 7;
            }

加锁的目的是为了防止多次启动造成参数重复

2.线程池优化

往往Job数量比较多时就需要多线程处理,当然我们可以自己在业务中使用线程池处理,但是Spring batch提供了可以配置线程池的参数,将参数注入启动器jobLauncher 中即可实现多线程运行,最大线程数量配置规则为:
如果任务是计算密集型的,线程池大小建议设置为Ncpu + 1
  其中N是CPU数量,

+1 是为了在某一个线程处于暂停阶段时,有新的线程可以用来执行,减少CPU中断时间。

如果是IO密集型,则需要增大线程数大小,避免IO操作占用过多的CPU时间

Nthreads = Ncpu x Ucpu x (1 + W/C),其中

Ncpu = CPU核心数

Ucpu = CPU使用率,0~1

W/C = 等待时间与计算时间的比率

通产而言对于单机器部署的单个服务:
IO密集型配置线程数经验值是:2N,其中N代表CPU核数。
CPU密集型配置线程数经验值是:N + 1,其中N代表CPU核数。

 //配置线程池执行job
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(200);
        return taskExecutor;
    }

    @Bean("cssJobLauncher")
    public SimpleJobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, DataSource dataSource,
                                         PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容