选择中间人
Celery 需要一个发送和接收消息的解决方案,其通常以独立服务形式出现, 称为 消息中间人 。
RabbitMQ
sudo apt-get install rabbitmq-server
命令执行完成后,中间人就已经运行在后台,准备好传输消息: Starting rabbitmq-server: SUCCESS
。
Redis
使用 Redis
一般不推荐将数据存在数据库,对于很小的项目可能是适合的
也可以使用 Amazon SQS 、 Using MongoDB 和 IronMQ ,但是官方推荐使用Redis和RabbitMQ.
安装celery
pip install celery
应用
在项目app目录下,创建 tasks.py
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.taskdef add(x, y):
return x + y
AsyncResult实例:
result = add.delay(4,4)
result.ready() #查看任务是否完成处理
result.get(timeout=1) #等待任务完成,返回结果,但这就将异步变成同步
result.get(propagate=False) #倘若任务抛出了一个异常,get()会重新抛出异常, 但你可以指定 propagate
参数来覆盖这一行为
result.traceback #如果任务抛出了一个异常,你也可以获取原始的回溯信息
配置
config.py
## Broker设置。
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' #Redis 结果后端
'''
CELERY_RESULT_BACKEND='redis://:password@host:port/db'
host: Redis的服务器的名称或IP地址。例如 localhost:本地主机。
port: Redis的服务器。默认值是6379
db: 使用的数据库数,默认值为0
password: 密码用于连接到数据库。
缓存后端设置
使用单个memcached服务器:
CELERY_RESULT_BACKEND='cache+memcached://127.0.0.1:11211/'
cache : 缓存
'''
or
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_RESULT_EXPIRES = 18000 # 5 hours.
'''
同时,AMQP后端需要的RabbitMQ 1.1.0或更高版本以自动失效的结果。如果你正在运行的RabbitMQ的是旧版本,你应该禁用这样的结果是到期:
CELERY_TASK_RESULT_EXPIRES =无
'''
CELERY_CACHE_BACKEND = ‘memory’ #只在内存中缓存
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600} # 1 hour. Redis 的默认可见性超时时间是 1 小时。
#列出导入模块
CELERY_IMPORTS=('myapp.tasks',)
app.conf.CELERY_TASK_SERIALIZER = 'json'
#一次性添加多个配置
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TIMEZONE='Europe/Oslo',
CELERY_ENABLE_UTC=True,
)
#处理繁重任务的专用队列
CELERY_ROUTES = {
'tasks.add': 'low-priority',
}
#限制任务的速率,这样每分钟只允许处理 10 个该类型的任务
CELERY_ANNOTATIONS = {
'tasks.add': {'rate_limit': '10/m'}
}
CELERY_ANNOTATIONS={'*':{'rate_limit':'10/s'}} #限制所有任务处理
如果你使用 RabbitMQ 或 Redis 作为中间人,那么你也可以在运行时直接在职程上设置速率限制:
$ celery control rate_limit tasks.add 10/mworker@example.com: OK new rate limit set successfully
[并发设置]
CELERYD_CONCURRENCY #默认是你的cpu可用数量
你可以调用 config_from_object()
来让 Celery 实例加载配置模块:
app.config_from_object('celeryconfig')
验证配置模块:
python -m celeryconfig
配置选项的完整参考见 Configuration and defaults
注意
1.确保任务没有启用 ignore_result 。启用这个选项会强制所有职程跳过状态更行。