定时任务场景
在开发过程中,根据需求和业务的不同经常会有很多场景需要用到不同特性的定时任务,针对这些场景,这里提供不同的一个实现思路。定时任务可能需要的特性如下:
- 多线程执行:即一个定时任务是需要多线程去跑的,因为一个线程太慢了
- 分布式执行:在多线程的基础上,用多台机器的算力去执行一个定时任务
- 动态时间的定时任务:即定时任务的开始时间是不确定的
- 连续上下文多线程定时任务:在使用多线程执行定时任务的时候,严格按照任务的顺序来执行,即任务ABC,多线程执行完A,然后再执行B和C,不能存在A还有没跑完的线程,B已经开始的情况。
- 可暂停继续的定时任务:即一个任务执行了一半,可以暂停后继续运行。
针对以上的几种特性,这里讲一讲相对简单的实现方式
不同定时任务特性对应的实现方式
多线程执行
多线程执行相对来说是个比较简单的需求,只需要定时任务触发的时候,使用线程池去执行任务,在springboot中,只需要使用@Async注解就行了。
//定时任务代码
@Scheduled(fixedDelay = 60000)
public void doSomeThing() {
servive.doA();
}
-------------------------------------------
//执行定时任务逻辑代码
@Service
public class Service(){
@Async("A-config")
public void doA(){
//这里执行逻辑省略
}
}
-------------------------------------------
@Bean("updateById")
public Executor updateById() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(64);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(128);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(500);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("do-somthing-");
// 缓冲队列满了之后的拒绝策略:这里调用主线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
分布式执行
一般分布式定时任务主要是两个问题,一个是触发,一个是分发。触发需要考虑到只能有一个触发成功,需要根据定时任务的间隔,指定合理的竞争策略,比如每日定时任务,那就用当日凌晨当key,利用redis的setnx原子操作来竞争。
然后就是分发问题,一般任务触发后,利用多台机分布式执行定时任务,由一台机触发后,把任务分发到中间件中,然后多台机器的服务去消费这些任务,这个中间件一般是队列或者redis或者数据库。
动态时间的定时任务
当需要在一个不确定的时间去执行任务,比如我需要定时任务开启一场折扣活动,而这个开启时间取决于业务的其他条件出发,这个开始时间会在某一个业务时刻存入数据库或者reids,那就需要一个任务触发检测器了,可以跑一个1s为间隔的定时任务,每秒钟都检查一次这个任务是否需要执行或者是否到了执行时间,如果需要执行就立刻执行相应的任务逻辑。
连续上下文多线程定时任务
实际业务场景经常会有这种情况,我需要连续的执行A->B->C这三个任务,一定要按严格的顺序执行,如果我给这三个任务分别设置三个执行时间,那后续任务如果超时或者执行时间超过预期,就有可能出现B任务在A没结束就执行。所以只能按同一个定时任务触发来执行。而这时候我又需要多线程来提高效率。所以这个场景的重点在于,子任务结束的判定也就是线程执行完毕的判断。这里有种很简单的实现方式。
实现这个场景的基本思路就是,在执行任务A时候创建线程池去执行A任务逻辑,然后分发完之后,shutdown线程池,调用线程池的awaitTermination方法去阻塞等待线程所有执行完毕。这个方法结束后,自然所有A任务就执行完了,销毁线程池后执行任务B,再创建新的线程池来执行任务B,以此类推,这样既用了多线程执行速度的优点又保证了任务的执行顺序。当然分布式用队列和消费者来做的话,就更简单,只需要监控队列的已消费和待发送以及消费完成数量,就知道任务是否完全结束。以下是多线程上下文任务的一个实现代码
//创建线程池A,这里线程池改造过可以指定名字
NamedThreadPoolExecutor APool =
new NamedThreadPoolExecutor(
32,
128,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
new ThreadPoolExecutor.CallerRunsPolicy(),
this.getClass().getSimpleName());
-------------------------------------------------------------
//这里拿到任务后,把任务都丢到线程池里去执行
List<Object> taskA = getTask();
for(Object task:taskA){
APool.execute(new CommonRunable(task));
}
//这里调用shutdown和awaitTermination,并设置最大超时,在方法执行完后就表示任务A已经完全执行完了
APool.shutdown();
APool.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
//然后这里A已经全部执行完了,开始执行B
NamedThreadPoolExecutor BPool =
new NamedThreadPoolExecutor(
32,
128,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
new ThreadPoolExecutor.CallerRunsPolicy(),
this.getClass().getSimpleName());
List<Object> taskB = getTask();
for(Object task:taskB){
BPool.execute(new CommonRunable(task));
}
//这里调用shutdown和awaitTermination,并设置最大超时,在方法执行完后就表示任务B已经完全执行完了,继续执行后面的任务
BPool.shutdown();
BPool.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
//继续执行C任务
...
可暂停并重新开始的定时任务
实现一个可暂停并重新开始的定时任务,首先得考虑场景,如果这个暂停是主动暂停,那需要把这个任务执行的进度和一些中间数据存入中间件或者内存,然后再次开始的时候读取这些中间数据就行了。如果这个暂停的场景包含被动暂停,比如任务中断,或者进程挂了,那就需要在任务一开始就把中间数据和执行进度都使用中间件存储,再次开始的时候会直接从中间件读取任务。一般来说,这里的中间件会选择redis,像java中一些list或者set数据都是很方便存储和读取的。