使用ScheduledThreadPoolExecutor执行任务调度
为了解决项目中大消息量的发送问题,对项目的消息发送模块进行了处理。之前的消息发送,是直接把消息扔投递出去,但是由于项目中的集成的消息发送客户端存在限制问题,一旦所发送的消息并发量达到一定数量,消息发送客户端就会报错。其次,由于在我们的项目中,需要将消息推送到web端,然后wed端进行消息处理,如果此时,消息投递的过快,就会造成web端一时的消息量太高。综合上面俩个方面的原因,决定对消息的发送逻辑进行缓冲处理。
利用生产者-消费者模式进行消息处理
在项目中,我们对项目中的消息发送模块进行统一封装,所有的需要执行消息发送的模块或开发者,都直接调用我们暴露的接口进行消息发送,这样的话,我们就把消息发送的逻辑统一处理在消息发送处理器中。
代码实现
以下是伪代码实现:
@PostConstruct
private void init(){
logger.info("【sender对象执行初始化】");
handleTenantQueue(tenantQueueTable);
sendSchedulerExecutor= Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("mqtt-msg-delay-schedule"));
monitorScheduleExecutor=Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("mqtt-queue-monitor"));
webLogScheduleExecutor=Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("mqtt-web-log-queue-monitor"));
queueMonitorExcetor=Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("queue-monitor-schedule"));
blockingQueue=new LinkedBlockingQueue<MqttMessageSendCommon>(maxFlightSize);
sendSchedulerExecutor.scheduleAtFixedRate(()->{
// logger.info("【mqtt消息监视器: 队列大小: {}】",blockingQueue.size());
//固定调度
//If any execution of the given task throws an exception, the task is no longer executed.
// If no exceptions are thrown, the task will continue to be executed until the ScheduledExecutorService is shut down.
doActualSend();
},1,100, TimeUnit.MILLISECONDS);//定时调度
monitorScheduleExecutor.scheduleAtFixedRate(()->{
//logger.info("【web端mqtt消息监视器: 队列大小: {}】",webMsgblockingQueue.size());
doActualSendWeb();
},0,3,TimeUnit.SECONDS);
webLogScheduleExecutor.scheduleAtFixedRate(()->{
doActualSendWebLog();
},0,10,TimeUnit.MILLISECONDS);
queueMonitorExcetor.scheduleAtFixedRate(()->{
logger.info("【监控队列】");
logger.info("【blockingQueue的队列大小是:{},webLogMsgblockingQueue的队列大小是:{}】",blockingQueue.size(),webLogMsgblockingQueue.size());
},0,5,TimeUnit.SECONDS);
}
private void doActualSend(){
try{
MqttMessageSendCommon msg = blockingQueue.take();
String data = JSONObject.toJSONString(msg);
//logger.info("【调度任务,执行消息发送,消息为:{}】",data);
mqttGateway.sendToMqtt(msg.getMsgTopic(), 0, data);
mqttMessageLogService.handleMqSendMsgLog(msg);
MqttMessageFactory.recycleMessageSendCommon(msg);
}catch (InterruptedException e){
logger.error("【mqtt消息发送,发生异常】",e.getCause());
Thread.currentThread().interrupt();
}
}
故障现象
上面的这块代码,在一开始的时候,运行的是良好的,后面增加了消息日志之后,设备端突然反映说,现在无法接收消息了,这时候,我以为是blockingQueue阻塞导致的,因为blockingQueue当消息满了之后,后续的put操作会被阻塞住,但是如果说,当前发送的消息被阻塞住了,过了1秒钟之后,队列中的空间应该就释放出来了,新的put操作还是应该可以放入到blockingQueue中的才是,一顿问号之后,我怀疑是不是调度线程出现了问题。
故障排查
为了查看程序的线程状态,同时保持现场信息,决定使用Arthas来查看线程状态。
可以看到,目前的消息调度线程处于waiting状态。这就很奇怪,明明任务是周期性任务,为什么调度线程处于waiting状态呢,根据以下的线程堆栈:
"mqtt-msg-delay-schedule-3-thread-1" #39 prio=5 os_prio=0 tid=0x00007f0bc113e000 nid=0x4900 waiting on condition [0x00007f0b8c3c9000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000087c708c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我决定去源码中找下答案。
查看源码:
可以看到调度线程会从把任务放入到queue队列中,然后每次从队列中取出任务执行,如果没有可以执行的任务的话,调度线程就会处于waiting状态。
但是,我明明有一个周期性任务提交给调度线程了啊。为了确保,使用的是否正确,我再次查看了相关的API,发现了一个隐藏的问题。
If any execution of the given task throws an exception, the task is no longer executed.If no exceptions are thrown, the task will continue to be executed until the ScheduledExecutorService is shut down.
到目前就已经清楚了,如果在执行任务的过程中,发生了异常的话,该任务就会被丢弃。因此,当我们的任务发生异常的时候,任务被抛弃了,这时,任务调度队列中是没有任务可被执行的,所以当前的调度线程处于waiting状态。
总结
在使用SchedulerExecutor.scheduleAtFixedRate时,要注意: 要防止异常的产生,一旦有异常产生,当前任务就会被丢弃,任务调度池中不再有任务,就会造成调度线程状态处于:waiting状态。任务也不再被周期性执行。