概述
生产开发过程经常遇到数据加工场景,如果处理不好很容易引起各种问题,比如:加工慢、漏加工等,本文针对常见的无序加工场景进行分析介绍。
案例
先看一段目前主流的加工代码,以下代码存在一些坑。
题外话:还有些童鞋使用了死循环取数加工,取不到数据时跳出循环或者休眠指定时间,普通业务场景不推荐。
public void doExecute() {
...
try {
long count = getBillRepository().count(store.getCode(), latestProcessTime, executeTime);
int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
for (int i = 0; i < loop; i++) {
//list方法一般会按照时间或某个字段排序,保证分页取数不会乱
List<Bill> list = getBillRepository().list(store.getCode(),
latestProcessTime, executeTime, i, pageSize);
for (Bill bill : list) {
// 加工
processOne(bill);
}
}
} catch (Exception e) {
log.error("加工XX流水发生错误:", e);
}
}
private void processOne(Bill bill) throws SwallowsServiceException {
//实际加工内容 TODO
}
问题分析
- processOne方法中如果抛出异常,那么会导致加工异常中断,影响到下一批数据的加工,如果错误数据得不到解决,那么加工程序就一起卡在这条错误数据。
- 影响级别:高,生产事故。
- 解决方案:processOne内部捕获异常,不对外抛出,同时一般会提供记录失败重试次数以及失败原因,示例如下所示
private void processOne(Bill bill) {
try{
//实际加工内容 TODO
} catch (Exception e) {
log.error("加工单据XX发生错误:", e);
try{
BillFailure failure = new BillFailure();
failure.setBillId(bill.getId());
//有童鞋不截取消息,导致数据库存储超长报错,当然有些童鞋已经在fail方法中处理,此处特别列出坑。
failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
//fail中会累加重试次数update XX set retries=retries+1 where ...
getBillRepository().fail(failure);//此处代码还可以进一步优化,比如累积一批失败记录,批量更新记录日志
}catch(Exception e){
log.error("", e);
}
}
}
- 如果processOne会影响list查询的数据,比如删除或修改了待加工数据为已加工,会导致查询的数据范围发生变化,那么再取第二个分页数据时,实际上已经不是未加工前的第二个分页数据;原第二个分页数据部分数据已跑到第一个分页中,如此导致本次加工作业漏加工数据。
- 影响级别:中,目前大部分是定时加工,错过的数据,下一次定时作业还能加工到,但是加工处理过程会被拉长。
- 解决方案:如果不同数据间的加工无关联,无需保证顺序的话,那么可以从最后一个分页往前加工,如下所示
// doExecute方法中的代码
for (int i = loop-1; i >=0; i--) {
//list方法一般会按照时间或某个字段排序,保证分页取数不会乱
List<Bill> list = getBillRepository().list(store.getCode(),
latestProcessTime, executeTime, i, pageSize);
for (Bill bill : list) {
// 加工
processOne(bill);
}
}
- 如果数据加工失败没得到及时修复,那么这部分数据会一直重复加工,纯粹浪费资源。
- 影响级别:低
- 解决方案:
- 增加最大重试次数,超过最大重试次数之后不再进行加工;
- list待加工数据时限制重试次数,processOne失败增加重试次数。
- 如果查询的待加工表数据量异常庞大,比如百万级别数据量以上,那么使用count统计数据随数据量的增加对应查询耗时增加。
- 影响级别:低,目前较少遇到大数据加工场景,如果有那么当前的代码框架也得换了。
- 解决方案:(只适用于待加工数据范围不会变化的场景,比如全量同步某种资料,该场景较少)去掉count语句,最外层for改用while
// doExecute方法中的代码
int page = 0;
List<Bill> list=null;
while(CollectionUtils.isNotEmpty(list=getBillRepository().list(store.getCode(),
latestProcessTime, executeTime,page, pageSize)){
for (Bill bill : list) {
// 加工
processOne(bill);
}
if (list.size() < pageSize) {
break;
}
page++;
}
推荐写法
public void doExecute() {
...
try {
long count = getBillRepository().count(store.getCode(), latestProcessTime, executeTime);
int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
for (int i = loop-1; i >=0; i--) {
//list方法一般会按照时间或某个字段排序,保证分页取数不会乱
List<Bill> list = getBillRepository().list(store.getCode(),
latestProcessTime, executeTime, i, pageSize);
for (Bill bill : list) {
// 加工
processOne(bill);
}
}
} catch (Exception e) {
log.error("加工XX流水发生错误:", e);
}
}
private void processOne(Bill bill) {
try{
//实际加工内容 TODO
} catch (Exception e) {
log.error("加工单据XX发生错误:", e);
try{
BillFailure failure = new BillFailure();
failure.setBillId(bill.getId());
//有童鞋不截取消息,导致数据库存储超长报错,当然有些童鞋已经在fail方法中处理,此处特别列出坑。
failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
//fail中会累加重试次数update XX set retries=retries+1 where ...
getBillRepository().fail(failure);//此处代码还可以进一步优化,比如累积一批失败记录,批量更新记录日志
}catch(Exception e){
log.error("", e);
}
}
}
基类抽象写法
可将以上通用过程抽取基类,减少开发踩坑,最终代码可能如下所示:
//如果想一次取出所有数据,total方法返回1,fetchData查询返回所有数据即可
@Slf4j
@Component
@ConditionalOnProperty(value = "demo.job.enabled", havingValue = "true")
public class DemoDataProcessJob extends DataProcessAbstractJob<PChain> {
public static final String CRON_EXPRESSION_KEY = "demo.job.cronExpression";
public static final String CRON_EXPRESSION_DEFAULT_VALUE = "0 0 0,12 * * ?";
@Autowired
private ChainRepository repository;
//方法说明:待加工的总记录数
@Override
protected long total() {
return repository.count();
}
//方法说明:获取指定页码的一页待加工数据
@Override
protected List<PChain> fetchData(int page, int pageSize) {
//特别注意查询方法中需要按照一定规则排序,一般是XX时间字段
return repository.list(page, pageSize);
}
//方法说明:加工一页数据
@Override
protected void processData(List<PChain> data) {
for (PChain chain : data) {
processOne(chain);
}
}
//方法说明(实际代码别拷贝我):加工一条数据
private void processOne(PChain chain) {
try {
//TODO 此处做实际数据加工处理
} catch (Exception e) {
log.error("加工单据XX发生错误:", e);
//TODO 此处做数据加工异常处理,比如增加最大重试次数、记录失败日志
}
}
//方法说明(实际代码别拷贝我):分页大小,默认为500,根据业务需要可重写父类此方法
// @Override
// protected int pageSize() {
// return super.pageSize();
// }
//方法说明(实际代码别拷贝我):一页数据加工过程发生异常是否忽略,继续加工下一分页数据;默认为true。
// @Override
// protected boolean ignoreException() {
// return super.ignoreException();
// }
@Override
public String getDescription() {
return "数据加工作业示例";
}
@Override
public String getCronExpression() {
Environment env = ApplicationContextUtils.getBean(Environment.class);
return env.getProperty(CRON_EXPRESSION_KEY, CRON_EXPRESSION_DEFAULT_VALUE);
}
}
其它思考
- 加工过程涉及的取数、更新等操作尽量使用批量。
- 针对频繁失败的加工数据,除了增加重试次数以外,还可将重试加工的时间往后延,避免失败数据积压影响正常数据的加工速度。思路类似Spring Retry中的重试等待策略。
- 集群场景,可考虑结合quartz、ElasticJob等实现分布式定时作业加工。
- 如数据量较大,加工时限要求高的,可引入线程池并发加工。(大部分场景无需引入并发处理)