Celery
Celery是一个分布式的任务队列,负责任务的执行与调度。
Celery的架构由三部分组成:
- 消息中间件(message broker);
- 任务执行单元(worker);
- 任务执行结果存储(task result store)组成。
安装:
$ pip install celery
框架集成:
框架 | 集成 |
---|---|
Pyramid | pyramid_celery |
Pylons | celery-pylons |
Flask | not needed |
web2py | web2py-celery |
Tornado | tornado-celery |
对于Flask项目:
- 可以用Redis作为Celery的Broker,负责传递通讯消息。
- 用
pip install flask-celery-helper
安装Celery和它的Flask扩展 - 用
pip install redis
安装Redis的python扩展。
Celery First Step
选择消息中间件(Message Broker)
Celery是通过消息(Message)来沟通的,通常使用一个消息中间件来连接客户端和工作端。初始化一个任务后,客户端通过添加一个消息到队列(Queue),消息中间件(Broker)会将消息发送到工作端处理
Name | Status | Monitoring | Remote Control |
---|---|---|---|
RabbitMQ | Stable | Yes | Yes |
Redis | Stable | Yes | Yes |
Amazon SQS | Stable | No | No |
Zookeeper | Experimental | No | No |
应用
简单应用
创建一个task.py
文件
from celery import Celery
celery = Celery('tasks', broker='redis://localhost:6379/0')
@celery.task
def add(x, y):
return x + y
Flask应用
将Celery配置加入到配置项中:
CELERY_IMPORTS = (
"app.tasks.mail",
)
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
DEBUG = True
创建并初始化Celery:
from flask import Flask
from flask_celery import Celery
app = Flask(__name__)
celry = new Celery()
# 初始化Celery
celery.init_app(app)
运行Worker服务
$ celery -A task.celery worker --loglevel=info
可以通过命令查看更多帮助文档:
$ celery worker --help
$ celery help
调用任务
>>> from task import add
>>> add.delay(4, 4)
<AsyncResult: 7031f593-7166-49dd-8c4b-3717e163de16>
同样你会看到log中已经接收到并且处理了任务
[2016-11-17 12:44:32,642: INFO/MainProcess] Received task: task.add[7031f593-7166-49dd-8c4b-3717e163de16]
[2016-11-17 12:44:32,645: INFO/PoolWorker-2] Task task.add[7031f593-7166-49dd-8c4b-3717e163de16] succeeded in 0.000571212000068s: 8
获取结果
如果想要持久化或者在某些地方使用任务反回的结果,有一些内建的后端服务可以使用。
使用Redis作为后台的话可以使用如下配置创建celery实例:
celery = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
当然也可以搭配使用:
- RabbitMQ 作为消息中间件
- Redis 作为后端
- more info
-
现在我们重启Celery的Worker服务
$ celery -A task.celery worker --loglevel=info
-
发送任务并接收任务结果
>>> from task import add >>> result = add.delay(3, 4)
这里返回了一个 AsyncResult实例。
-
查看任务是否执行:
>>> result.ready() True
-
等待之行结果:
>>> result.get(timeout=1) 7
get()
有很多参数:
如果task在之行过程中抛出了异常,可以通过指定get参数让异常重新抛出,并进行跟踪:>>> result.get(propagate=True) >>> result.traceback …
更多信息参考celery.result
配置
Celery有输入和输出,可以将输入连接到消息中间件,将输出连接到结果后端。通畅情况下默认配置已经够用,不需要做过多的更动。
现在可以先参考官方文档