ScheduledThreadPoolExecutor的任务调度问题排查

使用ScheduledThreadPoolExecutor执行任务调度

为了解决项目中大消息量的发送问题,对项目的消息发送模块进行了处理。之前的消息发送,是直接把消息扔投递出去,但是由于项目中的集成的消息发送客户端存在限制问题,一旦所发送的消息并发量达到一定数量,消息发送客户端就会报错。其次,由于在我们的项目中,需要将消息推送到web端,然后wed端进行消息处理,如果此时,消息投递的过快,就会造成web端一时的消息量太高。综合上面俩个方面的原因,决定对消息的发送逻辑进行缓冲处理。

利用生产者-消费者模式进行消息处理

在项目中,我们对项目中的消息发送模块进行统一封装,所有的需要执行消息发送的模块或开发者,都直接调用我们暴露的接口进行消息发送,这样的话,我们就把消息发送的逻辑统一处理在消息发送处理器中。

image.png

代码实现

以下是伪代码实现:

 @PostConstruct
    private void init(){
        logger.info("【sender对象执行初始化】");
        handleTenantQueue(tenantQueueTable);
        sendSchedulerExecutor= Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("mqtt-msg-delay-schedule"));
        monitorScheduleExecutor=Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("mqtt-queue-monitor"));
        webLogScheduleExecutor=Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("mqtt-web-log-queue-monitor"));
        queueMonitorExcetor=Executors.newSingleThreadScheduledExecutor(HerculesThreadFactory.forName("queue-monitor-schedule"));

        blockingQueue=new LinkedBlockingQueue<MqttMessageSendCommon>(maxFlightSize);

        sendSchedulerExecutor.scheduleAtFixedRate(()->{
           // logger.info("【mqtt消息监视器: 队列大小: {}】",blockingQueue.size());
            //固定调度
            //If any execution of the given task throws an exception, the task is no longer executed.
            // If no exceptions are thrown, the task will continue to be executed until the ScheduledExecutorService is shut down.
            doActualSend();
        },1,100, TimeUnit.MILLISECONDS);//定时调度


        monitorScheduleExecutor.scheduleAtFixedRate(()->{
            //logger.info("【web端mqtt消息监视器: 队列大小: {}】",webMsgblockingQueue.size());
            doActualSendWeb();
        },0,3,TimeUnit.SECONDS);

        webLogScheduleExecutor.scheduleAtFixedRate(()->{
            doActualSendWebLog();
        },0,10,TimeUnit.MILLISECONDS);

        queueMonitorExcetor.scheduleAtFixedRate(()->{
            logger.info("【监控队列】");
            logger.info("【blockingQueue的队列大小是:{},webLogMsgblockingQueue的队列大小是:{}】",blockingQueue.size(),webLogMsgblockingQueue.size());
        },0,5,TimeUnit.SECONDS);

    }


 private void doActualSend(){
        try{
            MqttMessageSendCommon msg = blockingQueue.take();
            String data = JSONObject.toJSONString(msg);
            //logger.info("【调度任务,执行消息发送,消息为:{}】",data);
            mqttGateway.sendToMqtt(msg.getMsgTopic(), 0, data);
            mqttMessageLogService.handleMqSendMsgLog(msg);
            MqttMessageFactory.recycleMessageSendCommon(msg);
        }catch (InterruptedException e){
            logger.error("【mqtt消息发送,发生异常】",e.getCause());
            Thread.currentThread().interrupt();
        }
    }

故障现象

上面的这块代码,在一开始的时候,运行的是良好的,后面增加了消息日志之后,设备端突然反映说,现在无法接收消息了,这时候,我以为是blockingQueue阻塞导致的,因为blockingQueue当消息满了之后,后续的put操作会被阻塞住,但是如果说,当前发送的消息被阻塞住了,过了1秒钟之后,队列中的空间应该就释放出来了,新的put操作还是应该可以放入到blockingQueue中的才是,一顿问号之后,我怀疑是不是调度线程出现了问题。

故障排查

为了查看程序的线程状态,同时保持现场信息,决定使用Arthas来查看线程状态。

image.png
处于waiting状态的调度线程

可以看到,目前的消息调度线程处于waiting状态。这就很奇怪,明明任务是周期性任务,为什么调度线程处于waiting状态呢,根据以下的线程堆栈:


"mqtt-msg-delay-schedule-3-thread-1" #39 prio=5 os_prio=0 tid=0x00007f0bc113e000 nid=0x4900 waiting on condition [0x00007f0b8c3c9000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000087c708c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我决定去源码中找下答案。

查看源码:

image.png

可以看到调度线程会从把任务放入到queue队列中,然后每次从队列中取出任务执行,如果没有可以执行的任务的话,调度线程就会处于waiting状态。

但是,我明明有一个周期性任务提交给调度线程了啊。为了确保,使用的是否正确,我再次查看了相关的API,发现了一个隐藏的问题。

If any execution of the given task throws an exception, the task is no longer executed.If no exceptions are thrown, the task will continue to be executed until the ScheduledExecutorService is shut down.

到目前就已经清楚了,如果在执行任务的过程中,发生了异常的话,该任务就会被丢弃。因此,当我们的任务发生异常的时候,任务被抛弃了,这时,任务调度队列中是没有任务可被执行的,所以当前的调度线程处于waiting状态。

总结

API文档说明

在使用SchedulerExecutor.scheduleAtFixedRate时,要注意: 要防止异常的产生,一旦有异常产生,当前任务就会被丢弃,任务调度池中不再有任务,就会造成调度线程状态处于:waiting状态。任务也不再被周期性执行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345