Celery 使用

应用

需要一个celery实例,即应用。这个应用是使用所有东西的进入点,例如创建任务、管理工作进程,必须可被其他模块引入。

tasks.py

# coding: utf8
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')
# 传入的tasks参数即当前的模块名称,broker即为消息队列的地址
# ampb(RabbitMQ) redis

# 下面创建任务
@app.task
def add(x, y):
    return x + y

执行程序,启动工作进程:

celery -A tasks worker --loglevel=info

调用

from tasks import add
add.delay(4, 4)

现在task是被之前启动的工作进程来执行,返回值是一个AsyncResult,可以用来判断任务的状态、等待该任务执行完毕或是获得它的返回值。默认是不返回的,需要配置result backend,也可以在工作进程的命令行输出窗口中看到。

保存结果

可以使用很多backend 例如Django的ORM、SQLAlchemy,Redis,RabbitMQ。

app = Celery('tasks', backend='rpc://', broker='pyamqp://')
# 这边使用的backend是RabbitMQ的rpc远程调用

result = add.delay(4, 4)  # 现在就可以获得返回的result了
result.ready()  # 判断任务是否执行完成
result.get(timeout=1)  # 等待任务执行(一般不用)
# 如果任务出错了这边也会直接获得异常 或:
result.get(propagate=False)  # 不抛出
result.traceback  # 再获得异常信息

配置

app.conf.task_serializer = 'json'
# 设置task的序列化方式

# 一次设置很多选项
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
)    

使用配置模块:

app.config_from_object('celeryconfig')

celeryconfig.py

broke_url = 'pyamqp://'
task_serializer = 'json'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True

task_routes = {
    'tasks.add': 'low-priority',  # 把任务路由到某个队列
}
task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}  # 限制该任务的发送速度
}

如果想测试配置文件是否有语法问题,和普通的py文件一样,使用:

python -m celeryconfig
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容