python 定时任务组件的实现

背景

很多时候我们需要控制在某个时间,或者时间间隔执行我们的代码。例如N天后结束商品打折;例如每半个小时给用户推送一条消息等等。

思路

把定时的任务相关信息以及应该执行时刻记录在一个池子(池子需要能持久化,重启我们的服务时,任务不会丢失。这里我们选择mysql)里,定时扫描池子,找出到了执行时间的任务,还原场景,执行即可。

运行环境

Ubuntu 14.04.2 LTS
mysql 5.6及以上
Django 1.10.4 及以上
python 3.6

实现
目录结构
image.png
创建池子 model.py
SQL语句如下
# 任务存储表
  CREATE TABLE t_task (
  id       INT PRIMARY KEY AUTO_INCREMENT ,
  biz_num  VARCHAR(100) NOT NULL, # 业务关联代码
  biz_code VARCHAR(50)  NOT NULL, # 任务场景编码
  `when`     DATETIME     NOT NULL, # 执行时间点
  biz_ext VARCHAR(200), # 扩展信息
  create_time DATETIME NOT NULL, # 创建时间
  update_time DATETIME NOT NULL, # 修改时间
  status TINYINT NOT NULL, # 状态
  version INT NOT NULL DEFAULT 0 # 版本
);

# 索引
CREATE UNIQUE INDEX idx_task_biz ON t_task(biz_num, biz_code);
CREATE INDEX idx_task_when ON t_task(`when`);

#对应django  model
class Task(models.Model):
    biz_code = models.CharField(max_length=50)
    biz_num = models.CharField(max_length=100)
    when = models.DateTimeField()
    biz_ext = models.CharField(max_length=3000, null=True)
    create_time = models.DateTimeField(auto_now_add=True)
    update_time = models.DateTimeField(auto_now=True)
    status = models.SmallIntegerField(default=0)
    version = models.IntegerField(default=0)

    class Meta:
        db_table = 't_task'
        unique_together = ('biz_code', 'biz_num')

    def __str__(self):
        return 'biz_code: %s, biz_num: %s, when: %s, biz_ext: %s' % \
               (self.biz_code, self.biz_num, self.when, self.biz_ext)

mysql 对任务的管理 store.py
# -*- coding: utf-8 -*-

from utils import logging
from datetime import datetime

from django.db.models import F

from .models import *

# 每次批量处理的任务数量
batch_undo_rows = 20
logger = logging.getLogger(__name__)


class MySqlTaskStore:
    def add_task(self, task):
        # biz_code和biz_num有唯一联合索引,注意这里是update_or_create,
        _task, created = Task.objects.update_or_create(biz_code=task.biz_code, biz_num=task.biz_num,
                                                       defaults=dict(when=task.when, biz_ext=task.biz_ext))
        return _task.id

    def finished_task(self, biz_code, biz_num):
        # 执行完成后再mysql中删除任务
        Task.objects.filter(biz_code=biz_code, biz_num=biz_num).delete()

    def get_undo_tasks(self):
        now = datetime.now()
        # 扫描出早于当前时间的定时任务,每次扫描batch_undo_rows行去执行
        undo_tasks = Task.objects.filter(status=0, when__lte=now).order_by('when')[:batch_undo_rows]

        lock_tasks = []

        for task in undo_tasks:
            # 乐观锁
            rows = Task.objects.filter(id=task.id, status=0, version=task.version).update(
                status=1, version=F('version') + 1, update_time=now)
            if rows == 1:
                lock_tasks.append(task)

        return lock_tasks

    def delete_tasks(self, biz_code, biz_num):
        Task.objects.filter(biz_code=biz_code, biz_num__contains=biz_num).delete()

    def close(self):
        pass


def get_store():
    return MySqlTaskStore()


def get_tasks_by_code(biz_code=None, biz_num=None, when=None):
    tasks = Task.objects.filter(status=0)
    if biz_code:
        tasks = tasks.filter(biz_code=biz_code)
    if biz_num:
        tasks = tasks.filter(biz_num__icontains=biz_num)
    if when:
        if tasks.filter(when=when).exists():
            tasks = tasks.filter(when=when)
        else:
            tasks = tasks.filter(when__gt=when)
    return tasks


store_engine = get_store()

结合业务接口 api.py
# -*- coding: utf-8 -*-
from .store import store_engine, get_tasks_by_code as get_tasks
from .executor import run, register_handler, send_shutdown_signal, handler_map


def add_task(task):
    """
    添加新任务
    :param task:
    :return:
    """
    store_engine.add_task(task)


def cancel_task(biz_code, biz_num):
    """
    取消未执行的任务
    :param biz_code: 任务执行处理器编码
    :param biz_num: 任务编码
    :return:
    """
    store_engine.finished_task(biz_code, biz_num)


def delete_tasks(biz_code, biz_num):
    store_engine.delete_tasks(biz_code, biz_num)


def get_tasks_by_code(biz_code=None, biz_num=None, when=None):
    """
    获取业务对应任务, biz_num为模糊查询
    :param biz_code: 任务类型
    :param biz_num: 任务编码
    :param when: 定时时间
    :return:
    """
    return get_tasks(biz_code=biz_code, biz_num=biz_num, when=when)


def register_task_handler(task_handler):
    """
    注册任务处理器
    :param task_handler: 任务处理器
    :return:
    """
    register_handler(task_handler)


def get_task_handlers():
    return handler_map.values()


def start():
    """
    开启服务
    :return:
    """
    run()


def shutdown():
    """
    关闭服务
    :return:
    """
    send_shutdown_signal()


class TaskException(Exception):
    pass


class TaskHandler:
    def handle(self, task):
        """
        子类必需重写此方法处理业务逻辑
        :param task:
        :return:
        """
        raise TaskException('must ovrride process')

    def get_biz_code(self):
        """
        子类返回业务场景编码
        :return:
        """
        raise TaskException('must override get_biz_code')

执行任务 executor.py
# -*- coding: utf-8 -*-

import time
from utils import logging
import threading

from .store import store_engine

logger = logging.getLogger(__name__)

# 空闲时线程休眠时间(秒)
second_of_wait_task = 10
# 任务处理器
handler_map = {}
# 结束任务信号通道
shutdown_signal = False
is_running = False


def register_handler(task_handler):
    handler_map[task_handler.get_biz_code()] = task_handler


def send_shutdown_signal():
    global shutdown_signal, is_running
    shutdown_signal = True
    is_running = False


class TaskProcessThread(threading.Thread):
    def run(self):
        # 为什么这里要sleep一段时间呢? 因为不这样的话, 事个程序加载会有问题,导致服务不可用.初步估计是uwsgi加载
        # 程序初始化的过程中, 整体资源初始化和django project初始化有死锁冲突导致.
        # time.sleep(10)

        logger.info('start the task processor.....')
        global is_running
        is_running = True
        while not shutdown_signal:
            undo_tasks = store_engine.get_undo_tasks()
            logger.info('get undotask size: %s' % len(undo_tasks))

            if not undo_tasks:
                time.sleep(second_of_wait_task)
                continue

            for task in undo_tasks:
                try:
                    logger.info('begin process the task: %s' % task)
                    next_time = handler_map.get(task.biz_code).handle(task)
                    if next_time:
                        # 再次执行此任务
                        logger.info('retry the task: %s' % task)
                        store_engine.retry_task(task.biz_code, task.biz_num, next_time)
                    else:
                        # 标识此任务已完成
                        logger.info('finished the task: %s' % task)
                        store_engine.finished_task(task.biz_code, task.biz_num)
                except Exception:
                    logger.exception('fail process task: %s' % task)
        else:
            logger.info('the task executor had shutdown')


def run():
    if not is_running:
        for i in range(2):
            TaskProcessThread().start()
    else:
        logger.info('task is running, not allow start again')
注册成django应用 app.py
# -*- coding: utf-8 -*-
from utils import logging
import importlib
from django.apps import AppConfig
from django.conf import settings
from .api import register_handler

log = logging.getLogger(__name__)


class TaskConfig(AppConfig):
    name = 'utils.task'

    def ready(self):
        log.info('prepare registry all task handlers ...')
        if hasattr(settings, 'TASK_HANDLERS') and settings.TASK_HANDLERS:
            for handler in settings.TASK_HANDLERS:
                mod_path, sep, cls_name = handler.rpartition('.')
                mod = importlib.import_module(mod_path)
                cls = getattr(mod, cls_name)()
                register_handler(cls)

                log.info('registry success handler: {}'.format(cls))
        else:
            log.info('project has no any task handlers')

#在django的setting文件中配置INSTALLED_APPS 中加入 utils.task
启动task starttask.py
# -*- coding: utf-8 -*-

import signal
import importlib
from django.core.management.base import BaseCommand
from django.conf import settings
from utils.task.api import run as run_task, shutdown
from utils import logging
from ...api import register_handler

log = logging.getLogger(__name__)


def kill_task_when_signal(signum, frame):
    shutdown()


class Command(BaseCommand):
    help = "Start task command."

    def handle(self, *args, **options):

        if hasattr(settings, 'TASK_HANDLERS') and settings.TASK_HANDLERS:
            for handler in settings.TASK_HANDLERS:
                mod_path, sep, cls_name = handler.rpartition('.')
                mod = importlib.import_module(mod_path)
                cls = getattr(mod, cls_name)()
                register_handler(cls)

                log.info('registry success handler: {}'.format(cls))
        else:
            log.info('project has no any task handlers')

        run_task()
        signal.signal(signal.SIGINT, kill_task_when_signal)
        signal.signal(signal.SIGTERM, kill_task_when_signal)
        self.stdout.write("task启动成功......")
# python3 manage.py starttask 即可
应用示例
# 三天后结束商品打折
TASK_CODE_END_PRODUCT_PROMOTION = 'end_product_promotion' # 一种义务场景一个code,唯一
#添加任务
add_task(
        Task(biz_code=TASK_CODE_END_PRODUCT_PROMOTION, biz_num=product_type, when=executor_time))
#注册handler

class EndProductPromotionHandler(TaskHandler):
    """
    结束商品打折
    """

    def handle(self, task):
        product_type = task.biz_num
        # 自定义函数
        end_product_promotion(product_type)
        # return time (下一次执行时间)

    def get_biz_code(self):
        return TASK_CODE_END_PRODUCT_PROMOTION

# 将handler写入配置文件
setting.TASK_HANDLERS 中增加 EndProductPromotionHandler 的绝对路径,用.做分割,例如core_mall.service.promotionservice.EndProductPromotionHandler

重新运行starttask 

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,290评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,107评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,872评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,415评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,453评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,784评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,927评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,691评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,137评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,472评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,622评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,289评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,887评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,741评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,316评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,490评论 2 348

推荐阅读更多精彩内容