定时作业数据加工如何写?

概述

生产开发过程经常遇到数据加工场景,如果处理不好很容易引起各种问题,比如:加工慢、漏加工等,本文针对常见的无序加工场景进行分析介绍。

案例

先看一段目前主流的加工代码,以下代码存在一些坑。
题外话:还有些童鞋使用了死循环取数加工,取不到数据时跳出循环或者休眠指定时间,普通业务场景不推荐。

  public void doExecute() {
     ...
    try {
          long count = getBillRepository().count(store.getCode(), latestProcessTime,  executeTime);
          int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
          for (int i = 0; i < loop; i++) {
            //list方法一般会按照时间或某个字段排序,保证分页取数不会乱
            List<Bill> list = getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime, i, pageSize);
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
          }
      } catch (Exception e) {
      log.error("加工XX流水发生错误:", e);
    }
  }
      
     
 private void processOne(Bill bill) throws SwallowsServiceException {
        //实际加工内容 TODO
 }

问题分析

  1. processOne方法中如果抛出异常,那么会导致加工异常中断,影响到下一批数据的加工,如果错误数据得不到解决,那么加工程序就一起卡在这条错误数据。
    • 影响级别:高,生产事故。
    • 解决方案:processOne内部捕获异常,不对外抛出,同时一般会提供记录失败重试次数以及失败原因,示例如下所示
private void processOne(Bill bill)  {
    try{
        //实际加工内容 TODO
     } catch (Exception e) {
       log.error("加工单据XX发生错误:", e);
       try{
           BillFailure failure = new BillFailure();
           failure.setBillId(bill.getId());
           //有童鞋不截取消息,导致数据库存储超长报错,当然有些童鞋已经在fail方法中处理,此处特别列出坑。
           failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
           //fail中会累加重试次数update XX set retries=retries+1 where ...
           getBillRepository().fail(failure);//此处代码还可以进一步优化,比如累积一批失败记录,批量更新记录日志
       }catch(Exception e){
           log.error("", e);
       }
    }
 }
  1. 如果processOne会影响list查询的数据,比如删除或修改了待加工数据为已加工,会导致查询的数据范围发生变化,那么再取第二个分页数据时,实际上已经不是未加工前的第二个分页数据;原第二个分页数据部分数据已跑到第一个分页中,如此导致本次加工作业漏加工数据。
    • 影响级别:中,目前大部分是定时加工,错过的数据,下一次定时作业还能加工到,但是加工处理过程会被拉长。
    • 解决方案:如果不同数据间的加工无关联,无需保证顺序的话,那么可以从最后一个分页往前加工,如下所示
// doExecute方法中的代码

         for (int i = loop-1; i >=0; i--) {
            //list方法一般会按照时间或某个字段排序,保证分页取数不会乱
            List<Bill> list = getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime, i, pageSize);
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
          }
  1. 如果数据加工失败没得到及时修复,那么这部分数据会一直重复加工,纯粹浪费资源。
    • 影响级别:低
    • 解决方案:
      • 增加最大重试次数,超过最大重试次数之后不再进行加工;
      • list待加工数据时限制重试次数,processOne失败增加重试次数。
  2. 如果查询的待加工表数据量异常庞大,比如百万级别数据量以上,那么使用count统计数据随数据量的增加对应查询耗时增加。
    • 影响级别:低,目前较少遇到大数据加工场景,如果有那么当前的代码框架也得换了。
    • 解决方案:(只适用于待加工数据范围不会变化的场景,比如全量同步某种资料,该场景较少)去掉count语句,最外层for改用while
// doExecute方法中的代码
        int page = 0;
        List<Bill> list=null;
        while(CollectionUtils.isNotEmpty(list=getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime,page, pageSize)){ 
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
            
            if (list.size() < pageSize) {   
                break;
            }
            page++;
          }
          

推荐写法

  public void doExecute() {
     ...
    try {
          long count = getBillRepository().count(store.getCode(), latestProcessTime,  executeTime);
          int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
         for (int i = loop-1; i >=0; i--) {
            //list方法一般会按照时间或某个字段排序,保证分页取数不会乱
            List<Bill> list = getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime, i, pageSize);
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
          }
      } catch (Exception e) {
      log.error("加工XX流水发生错误:", e);
    }
  }
      
     
private void processOne(Bill bill)  {
    try{
        //实际加工内容 TODO
     } catch (Exception e) {
       log.error("加工单据XX发生错误:", e);
       try{
           BillFailure failure = new BillFailure();
           failure.setBillId(bill.getId());
           //有童鞋不截取消息,导致数据库存储超长报错,当然有些童鞋已经在fail方法中处理,此处特别列出坑。
           failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
           //fail中会累加重试次数update XX set retries=retries+1 where ...
           getBillRepository().fail(failure);//此处代码还可以进一步优化,比如累积一批失败记录,批量更新记录日志
       }catch(Exception e){
           log.error("", e);
       }
    }
 }
 

基类抽象写法

可将以上通用过程抽取基类,减少开发踩坑,最终代码可能如下所示:

//如果想一次取出所有数据,total方法返回1,fetchData查询返回所有数据即可
    @Slf4j
    @Component
    @ConditionalOnProperty(value = "demo.job.enabled", havingValue = "true")
    public class DemoDataProcessJob extends DataProcessAbstractJob<PChain> {
        public static final String CRON_EXPRESSION_KEY = "demo.job.cronExpression";
        public static final String CRON_EXPRESSION_DEFAULT_VALUE = "0 0 0,12 * * ?";
    
        @Autowired
        private ChainRepository repository;
    
        //方法说明:待加工的总记录数
        @Override
        protected long total() {
            return repository.count();
        }
    
        //方法说明:获取指定页码的一页待加工数据
        @Override
        protected List<PChain> fetchData(int page, int pageSize) {
            //特别注意查询方法中需要按照一定规则排序,一般是XX时间字段
            return repository.list(page, pageSize);
        }
    
        //方法说明:加工一页数据
        @Override
        protected void processData(List<PChain> data) {
            for (PChain chain : data) {
                processOne(chain);
            }
        }
    
        //方法说明(实际代码别拷贝我):加工一条数据
        private void processOne(PChain chain) {
            try {
                //TODO 此处做实际数据加工处理
    
            } catch (Exception e) {
            log.error("加工单据XX发生错误:", e);
                //TODO 此处做数据加工异常处理,比如增加最大重试次数、记录失败日志
            }
        }
    
        //方法说明(实际代码别拷贝我):分页大小,默认为500,根据业务需要可重写父类此方法
        // @Override
        // protected int pageSize() {
        //     return super.pageSize();
        // }
    
        //方法说明(实际代码别拷贝我):一页数据加工过程发生异常是否忽略,继续加工下一分页数据;默认为true。
        // @Override
        // protected boolean ignoreException() {
        //     return super.ignoreException();
        // }
    
        @Override
        public String getDescription() {
            return "数据加工作业示例";
        }
    
        @Override
        public String getCronExpression() {
            Environment env = ApplicationContextUtils.getBean(Environment.class);
            return env.getProperty(CRON_EXPRESSION_KEY, CRON_EXPRESSION_DEFAULT_VALUE);
        }
    }

其它思考

  1. 加工过程涉及的取数、更新等操作尽量使用批量。
  2. 针对频繁失败的加工数据,除了增加重试次数以外,还可将重试加工的时间往后延,避免失败数据积压影响正常数据的加工速度。思路类似Spring Retry中的重试等待策略。
  3. 集群场景,可考虑结合quartz、ElasticJob等实现分布式定时作业加工。
  4. 如数据量较大,加工时限要求高的,可引入线程池并发加工。(大部分场景无需引入并发处理)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容