基础
自动路由
需要启用 [task_create_missing_queues](默认是启用的)这样在发现某队列不存在时会创建。
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
这样feed.tasks.import_feed任务会发送到feeds队列中,其他默认会发送到celery队列中(也可以改变 app.conf.task_default_queue = 'default'
)。
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
task_routes = ([
('feed.tasks.*', {'queue': 'feeds'}),
('web.tasks.*', {'queue': 'web'}),
(re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
启动工作进程只负责某队列
celery -A proj worker -Q feeds
celery -A proj worker -Q feeds,celery
特殊选项
支持优先级:
from kombu import Exchange, Queue
app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks', queue_arguments={'x-max-priority': 10},
]
设置默认优先级
app.conf.task_queue_max_priority = 10
因为非amqp得中间人(redis)不支持exchange,所以需要exchange和队列名称相同。
AMQP
消息
消息包含消息头和消息体,celery使用头部来存储消息类型和消息编码格式,消息体中包含要执行的任务名称、UUID、参数和其他信息。
{'task': 'myapp.tasks.add',
'id': '54086c5e-6193-4575-8308-dbab76798756',
'args': [4, 4],
'kwargs': {}}
生产者、消费之、中间人
exchange 队列 routingkey
消息先发送给exchange,exchange把消息发给队列,消息一直等在队列里直到有消费者取出,被ack后从队列中删除。
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('videos', Exchange('media'), routing_key='media.video'),
Queue('images', Exchange('media'), routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'
exchange类型
已定义好的有direct、topic、fanout、headers,此外还可以通过对rabbitmq的插件来增加exchange类型。
Direct
通过routing key来匹配,绑定了routing key的队列只接受带该routing key的消息。
Topic
可以加通配符的Direct
API
exchange.declare(exchange_name, type, passive, durable, auto_delete, internal)
passive:表明不会创建该exchange,使用该选项来判断该exchange是否已存在。
durable:是否保存消息到磁盘
auto_delete:是否在没有工作进程使用它时就删除
queue.declare(*queue_name*, *passive*, *durable*, *exclusive*, *auto_delete*)
声明队列,如果加了exclusive则该队列只能被该创建者来使用(同时声明了该选项就意味着auto_delete)
queue.bind(*queue_name*, *exchange_name*, *routing_key*)
绑定队列和exchange
queue.delete(*name*, *if_unused=False*, *if_empty=False*)
删除队列
exchange.delete(*name*, *if_unused=False*)
删除exchange
路由器
专门用于确定一个任务的路由的函数。
def route_task(name, args, kwargs, options, task=None, **kw):
if name == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
task_routes = (route_task,)
广播
from kombu.common import Broadcast
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}
这样tasks.reload_cache任务会被发送到每个消费该队列的工作进程上。注意celery result不能处理这种多个task id相同的结果,所以最好是设置ignore_result。