celery 分布式任务队列工具
Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布式消息传递
基本
- Broker: 消息队列使用的中间人 有RabbiMQ redis mongodb 等一系列数据库
- Task: 用来定义任务
- backend: 用来保存结果
- Worker: 执行单元, 用来从中间人取出任务 , 并把结果发给backen
数据保存
各个sqalchemy数据库
CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname'
# sqlite (filename) CELERY_RESULT_BACKEND = ‘db+sqlite:///results.sqlite’
# mysql CELERY_RESULT_BACKEND = ‘db+mysql://scott:tiger@localhost/foo’
# postgresql CELERY_RESULT_BACKEND = ‘db+postgresql://scott:tiger@localhost/mydatabase’
# oracle CELERY_RESULT_BACKEND = ‘db+oracle://scott:tiger@127.0.0.1:1521/sidname’
worker 设置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
# 设置队列 feeds
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
BROKER_URL = 'amqp://' # 指定 Broker
CELERY_RESULT_BACKEND = '' # 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,默认是 UTC
# CELERY_TIMEZONE='UTC'
CELERY_IMPORTS = ( # 指定导入的任务模块
'celery_app.service_monitor',
'celery_app.service_diagnose'
)
消息机制 支持 RabbitMQ, Redis, Beanstalk, MongoDB, CouchDB,
以及 SQL 数据库.
容错 与 RabbitMQ配合可完美实现错误恢复
分布式 运行于一台或多台服务器。支持Broker群集和HA,可任意添加worker而无需在服务器中心节点配置。
并发 通过Python的multiprocessing, Eventlet, gevent 或者他们的混合实现并发执行.
Scheduling 支持cron类的递归式任务,或者指定时间、倒数等任务执行方式.
延迟 极低延迟.
返回值 任务运行结果可储存在指定的结果存储后台,你可以等待结果或忽略运算结果
返回值存储 支持SQL数据库, MongoDB, Redis, Tokyo
Tyrant, Cassandra, 或 AMQP (消息通知).
Webhooks 用户跨语言/平台任务分配。
Rate limiting Supports rate limiting by using the token bucket algorithm, which accounts for bursts of traffic. Rate limits can be set for each task type, or globally for all.
消息路由 通过AMQP灵活的路由模型你可以将任务路由到任意worker服务器,可配置或运行时指定。
远程控制 可通过广播消息远程控制worker节点。Celery内置了大量的相关命令,也可以轻松实现自定义命令(只适用AMQP和Redis)
监控 可实时获得workers的一切信息
对象序列化 支持 Pickle, JSON, YAML,或自定义序列化程序.
错误追踪
UUID 每个任务都有一个UUID用于查询该任务的运行状态以及返回值。
出错重试 当任务执行失败时可根据配置重试。配置内容包括最大重试次数,重试时间间隔。
任务集 任务集由多个子任务构成,可以获得子任务的数量,执行情况,以及各个子任务的运算结果。
Made for Web 可通过Ajax查询任务运行状态和运行结果。
出错通知 当任务出错是可通过邮件通知管理员
任务调用
service_monitor 为注册任务
延迟调用
add.s(2, 2)()
简单调用
service_monitor(url)
可设置回调调用
service_monitor.apply_async(args=["http://www.baidu.com"], link=res.s())
所有组任务执行完后 所有结果发送到res任务中去 也就是回调res任务
chord((service_monitor.s("http://www.baidu.com") for i in range(3)) , res.s())()
设置每个任务执行后的任务 .s 前任务作为参数的一部分
add.apply_async((2, 2), link=add.s(16))
链接任务 结果发给后任务
chain(add.s(4, 4) | mul.s(8))().get()
另一种用法
g = chain(add.s(4) | mul.s(8))
g(4).get()
还有一种
(add.s(4, 4) | mul.s(8))().get()
任务可设置参数
add.apply_async((10, 10), serializer='json')
add.apply_async((2, 2), compression='zlib')
add.apply_async((2,2), countdown=10, debug=True)
s = add.subtask((2, 2), {'debug': True}, countdown=10)# args kwargs options
应用
task.py
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
运行
celery -A tasks worker --loglevel=info
此时 一个worker 已经开始运行
单机多个worker
$ celery worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery worker --loglevel=INFO --concurrency=10 -n worker3.%h
下发任务
test.py
# -*- coding: utf-8 -*-
from celery_app.service_monitor import service_monitor, res
from celery import chord
result = chord((service_monitor.s("http://www.baidu.com") for i in range(3)) , res.s())()
# 查看任务
result.ready()
result.get(timeout=1)
交换与路由
三种交换方式
Direct Exchange
直接交换,也就是指定一个消息被其中一个队列接收,这个消息被celerybeat定义一个routing key,如果你发送给交换机并且那个队列且绑定的bingdingkey,那么就会直接被转给这个Queue.
Topic Exchange
这种交换方式可以根据类型的属性进行统配,然后根据统配的类型进行交换到指定的Queue.
Fanout Exchange
广播交换,如果你有某个task,可能处理时间比较长,但是却要求很高的实时性,那么你可能需要多台服务器的多个worker进行处理,每个worker负责其中一部分工作,但是celerybeat 只会生成一个任务,被某个worker取走就没有了,所以你需要让每个服务器的队列都要收到这个消息,这里很需要注意的是:你的fanout类型的消息在生成的时候要有多份,每个队列一份,而不是一个消息发送给单一队列的次数。
celery -A proj worker -Q feeds,celery
为该worker指定一或多个消息队列, worker只取该队列中的任务。可以指定多个队列.
调用
service_monitor.apply_async(args=['http://cnn.com/rss'],queue='feeds',routing_key='feeds')
优先级
单个任务优先级
from kombu import Exchange, Queue
app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks',
queue_arguments={'x-max-priority': 10},
]
统一优先级
app.conf.task_queue_max_priority = 10
发送广播
current_app.control.broadcast('rate_limit',
arguments={'task_name': 'celery_app.service_monitor.service_monitor',
'rate_limit': '200/m'})