celery 简介

一. celery 简介

Celery 是一个专注于实时处理和任务调度的分布式任务队列, 同时提供操作和维护分布式系统所需的工具.. 所谓任务就是消息, 消息中的有效载荷中包含要执行任务需要的全部数据.

Celery 是一个分布式队列的管理工具, 可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列.

Celery 本身不是任务队列, 是管理分布式任务队列的工具. 它封装了操作常见任务队列的各种操作, 我们使用它可以快速进行任务队列的使用与管理.

Celery 特性 :

方便查看定时任务的执行情况, 如 是否成功, 当前状态, 执行任务花费的时间等.

使用功能齐备的管理后台或命令行添加,更新,删除任务.

方便把任务和配置管理相关联.

可选 多进程, Eventlet 和 Gevent 三种模型并发执行.

提供错误处理机制.

提供多种任务原语, 方便实现任务分组,拆分,和调用链.

支持多种消息代理和存储后端.

Celery 是语言无关的.它提供了python 等常见语言的接口支持.

二. celery 组件

1. Celery 扮演生产者和消费者的角色,

Celery Beat : 任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列.

Celery Worker : 执行任务的消费者, 通常会在多台服务器运行多个消费者, 提高运行效率.

Broker : 消息代理, 队列本身. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(通常是消息队列或者数据库).

Producer : 任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.

Result Backend : 任务处理完成之后保存状态信息和结果, 以供查询.

Celery架构图



2. 产生任务的方式 :

   1.发布者发布任务(WEB 应用)

   2.任务调度按期发布任务(定时任务)

3. celery 依赖三个库: 这三个库, 都由 Celery 的开发者开发和维护.

    billiard : 基于 Python2.7 的 multisuprocessing 而改进的库, 主要用来提高性能和稳定性.

    librabbitmp : C 语言实现的 Python 客户端,

    kombu : Celery 自带的用来收发消息的库, 提供了符合 Python 语言习惯的, 使用 AMQP 协议的高级借口.

三. 选择消息代理

        使用于生产环境的消息代理有 RabbitMQ 和 Redis, 官方推荐 RabbitMQ.

四. Celery 序列化

    在客户端和消费者之间传输数据需要 序列化和反序列化. Celery 支出的序列化方案如下所示:



五. 安装,配置与简单示例

Celery 配置参数汇总


代码示例 :

# 安装$ pip install celery, redis, msgpack

# 配置文件 celeryconfig.py   

CELERY_BROKER_URL = 'redis://localhost:6379/1'   

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'   

CELERY_TASK_SERIALIZER = 'json'   

CELERY_RESULT_SERIALIZER = 'json'   

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 

# 任务过期时间    CELERY_ACCEPT_CONTENT = ["json"]           

# 指定任务接受的内容类型.

# 初始化文件 celery.py       

from __future__ import absolute_import   

from celery import Celery   

app = Celery('proj', include=["proj.tasks"])   

app.config_from_object("proj.celeryconfig")   

if __name__ == "__main__":       

app.start()     

# 任务文件 tasks.py   

from __future__ import absolute_import   

from proj.celery import app   

@app.task    def add(x, y):       

return x + y    # 启动消费者   

$ celery -A proj worker -l info

# 在终端中测试    > from proj.tasks import add   

  > r = add.delay(2,4)   

  > r.result      6   

  > r.status      u"SUCCESS"   

  > r.successful()      True   

  > r.ready()   

# 返回布尔值,  任务执行完成, 返回 True, 否则返回 False.    > r.wait()     

# 等待任务完成, 返回任务执行结果.    > r.get()     

# 获取任务执行结果    > r.result   

# 任务执行结果.    > r.state     

# PENDING, START, SUCCESS    > r.status     

# PENDING, START, SUCCESS   

# 使用 AsyncResult 方式获取执行结果.   

# AsyncResult 主要用来存储任务执行信息与执行结果(类似 js 中的 Promise 对象),    > from celery.result import AsyncResult    > AsyncResult(task_id).get()      4

说明:以上代码为原博客中内容,实测的话结合flask,redis 存在版本问题。后续博客处理

六. 调用任务的方法 :


1. delay

task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)


2. apply_async

delay 实际上是 apply_async 的别名, 还可以使用如下方法调用, 但是 apply_async 支持更多的参数:

task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

支持的参数 :

countdown : 等待一段时间再执行.

add.apply_async((2,3), countdown=5)

eta : 定义任务的开始时间.

add.apply_async((2,3), eta=now+tiedelta(second=10))

expires : 设置超时时间.

add.apply_async((2,3), expires=60)

retry : 定时如果任务失败后, 是否重试.

add.apply_async((2,3), retry=False)

retry_policy : 重试策略.

    max_retries : 最大重试次数, 默认为 3 次.

    interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.

    interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2

    interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .

自定义发布者,交换机,路由键, 队列, 优先级,序列方案和压缩方法:

task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')


七. 指定队列 :

Celery 默认使用名为 celery 的队列 (可以通过 CELERY_DEFAULT_QUEUE 修改) 来存放任务. 我们可以使用 优先级不同的队列 来确保高优先级的任务优先执行.

# 定义任务队列.

Queue('default', routing_key="task.#"),   

# 路由键 以 "task." 开头的消息都进入 default 队列.   

Queue('web_tasks', routing_key="web.#")   

# 路由键 以 "web." 开头的消息都进入 web_tasks 队列.)

CELERY_DEFAULT_EXCHANGE = 'tasks'             

# 默认的交换机名字为

tasksCELERY_DEFAULT_EXCHANGE_KEY = 'topic'         

# 默认的交换机类型为

topicCELERY_DEFAULT_ROUTING_KEY = 'task.default'   

# 默认的路由键是 task.default , 这个路由键符合上面的 default 队列.

CELERY_ROUTES = {    'proj.tasks.add': {        'queue': 'web_tasks',        'routing_key': 'web.add',    }}

# 使用指定队列的方式启动消费者进程.$ celery -A proj worker -Q web_tasks -l info   

# 该 worker 只会执行 web_tasks 中任务, 我们可以合理安排消费者数量, 让 web_tasks 中任务的优先级更高.

这段没试过

阅后即焚模式(transient):

from kombu import QueueQueue('transient', routing_key='transient', delivery_mode=1)


八. 使用任务调度

使用 Beat 进程自动生成任务.

# 修改配置文件,

# 下面的任务指定 tasks.add 任务 每 10s 跑一次, 任务参数为 (16,16).

from datetime import timedelta

CELERYBEAT_SCHEDULE = {    'add': {       

                                                        'task': 'proj.tasks.add',       

                                                        'schedule': timedelta(seconds=10),       

                                                            'args': (16, 16)    }} 

# crontab 风格

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {        "add": {               

                                                                "task": "tasks.add",               

                                                                "schedule": crontab(hour="*/3", minute=12),               

                                                                "args": (16, 16),                }            }

# 启动 Beat 程序$ celery beat -A proj

# 之后启动 worker 进程.$ celery -A proj worker -l info 或者$ celery -B -A proj worker -l info


使用自定义调度类还可以实现动态添加任务. 使用 Django 可以通过 Django-celery 实现在管理后台创建,删除,更新任务, 是因为他使用了自定义的 调度类 djcelery.schedulers.DatabaseScheduler .

九. 任务绑定, 记录日志, 重试

# 修改 tasks.py 文件.

from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)

@app.task(bind=True)def div(self, x, y):   

logger.info(('Executing task id {0.id},

args: {0.args!r}'                '

kwargs: {0.kwargs!r}').format(self.request))   

try:       

result = x/y   

except ZeroDivisionError as e:       

raise self.retry(exc=e, countdown=5, max_retries=3)   

# 发生 ZeroDivisionError 错误时, 每 5s 重试一次, 最多重试 3 次.   

return result


当使用 bind=True 参数之后, 函数的参数发生变化, 多出了参数 self, 这这相当于把 div 编程了一个已绑定的方法, 通过 self 可以获得任务的上下文.

日志输出目前未处理,实际问题需要后面处理,由于是与flask整合。所以需要看怎么管理日志


十. 信号系统 :

信号可以帮助我们了解任务执行情况, 分析任务运行的瓶颈. Celery 支持 7 种信号类型.

1.任务信号

    before_task_publish : 任务发布前

    after_task_publish : 任务发布后

    task_prerun : 任务执行前

    task_postrun : 任务执行后

    task_retry : 任务重试时

    task_success : 任务成功时

    task_failure : 任务失败时

    task_revoked : 任务被撤销或终止时

2.应用信号

3.Worker 信号

4.Beat 信号

5.Eventlet 信号

6.日志信号

7.命令信号

代码示例 :

# 在执行任务 add 之后, 打印一些信息.

@after_task_publish

def task_send_handler(sender=None, body=None, **kwargs): 

    print 'after_task_publish: task_id: {body[id]}; 

    sender: {sender}'.format(body=body, sender=sender)


十一. 子任务与工作流:(这块比较重要)

可以把任务 通过签名的方法传给其他任务, 成为一个子任务.

from celery import signaturetask = signature('task.add', args=(2,2), countdown=10) tasktask.add(2,2)

# 通过签名生成任务task.apply_async()

还可以通过如下方式生成子任务 :

from proj.task import   addtask = add.subtask((2,2), countdown=10)# 快捷方式 add.s((2,2), countdown-10) task.apply_async()

自任务实现片函数的方式非常有用, 这种方式可以让任务在传递过程中财传入参数.

partial = add.s(2)partial.apply_async((4,))

子任务支持如下 5 种原语,实现工作流. 原语表示由若干指令组成的, 用于完成一定功能的过程

1.chain : 调用连, 前面的执行结果, 作为参数传给后面的任务, 直到全部完成, 类似管道.

from celery import chainres = chain(add.s(2,2), add.s(4), add.s(8))()res.get() 管道式: (add.s(2,2) | add.s(4) | add.s(8))().get()


2.group : 一次创建多个(一组)任务.

from celery import group res = group(add.s(i,i)foriinrange(10))()res.get()

3.chord : 等待任务全部完成时添加一个回调任务.

res = chord((add.s(i,i)foriinrange(10)), add.s(['a']))()res.get()# 执行完前面的循环, 把结果拼成一个列表之后, 再对这个列表 添加 'a'.[0,2,4,6,8,10,12,14,16,18,u'a']

4.map/starmap : 每个参数都作为任务的参数执行一遍, map 的参数只有一个, starmap 支持多个参数.

add.starmap(zip(range(10), range(10))) 相当于: @app.taskdef temp():return[add(i,i)foriinrange(10)]

5.chunks : 将任务分块.

res = add.chunks(zip(range(50), range(50)),10)()res.get()

在生成任务的时候, 应该充分利用 group/chain/chunks 这些原语.

十二. 其他


关闭不想要的功能 :

@app.task(ignore_result=True) # 关闭任务执行结果.def func(): pass CELERY_DISABLE_RATE_LIMITS=True # 关闭限速.

根据任务状态执行不同操作 :

# tasks.py

class MyTask(Task):   

    def on_success(self, retval, task_id, args, kwargs):       

        print 'task done: {0}'.format(retval)       

        return super(MyTask, self).on_success(retval, task_id, args, kwargs)   

def on_failure(self, exc, task_id, args, kwargs, einfo):       

        print 'task fail, reason: {0}'.format(exc)       

        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

# 正确函数, 执行

MyTask.on_success() :

@app.task(base=MyTask)

    def add(x, y):   

    return x + y # 错误函数, 执行 MyTask.on_failure() :

@app.task  #普通函数装饰为

celery taskdef add(x, y):   

    raise KeyError    return x + y


十三. Celery 管理命令

任务状态回调 :


普通启动命令 :

$ celery -A proj worker -l info


十四. 在 Flask 中使用 Celery

Flask 文档: 基于 Celery 的后台任务

在 Flask 中使用 Celery

原博客://www.greatytc.com/p/027538ffb8c1

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。 Celery用于生产系统每天处理数以百万计...
    puluto阅读 3,718评论 0 2
  • Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列, 同时也支持任...
    与蟒唯舞阅读 3,247评论 1 2
  • 一. celery 简介 Celery 是一个专注于实时处理和任务调度的分布式任务队列, 同时提供操作和维护分布式...
    眼睛好酸阅读 10,470评论 1 11
  • 转https://blog.csdn.net/kk123a/article/details/74549117 一....
    你常不走的路阅读 18,369评论 2 14
  • Celery 进阶 前面已经对Celery的简单使用和配置做了介绍,本章将会展示Celery的更多细节 在自己的应...
    0ooops阅读 5,797评论 0 7