Celery 部署小记
参考版本: 4.0.2
概念
以下摘自官方文档的翻译:
Celery - 分布式任务队列
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具
它是一个专注于实时处理的任务队列,同时也支持任务调度
Celery
是Python
的一个“开箱即用”的任务队列模块,易用且提供与多语言对接的集成方案
角色列表:
- task(任务)
- broker(中间人)
- worker(消费者)
- result backend(结果后端),可选
- celerybeat(任务调度器),用于定时任务
使用场景
- 处理高功耗、高延时的并发实时操作。可进入celery任务队列,由workers去执行,属于应用层的任务调度
- 可以在应用层(通过任务调度器
celerybeat
进程)执行定时任务
实时任务的建立&执行的过程:
- 创建
task
文件,定义一些任务方法(@app.task
),这些是任务实际执行的代码 - 在配置文件中定义
broker
和result backend
的实体(用于存储信息,可以是RabbitMQ
,Redis
, 数据库等),以及一些workers
执行时需要遵循的规定 - 启动
workers
进程,它会加载配置、绑定task
文件中的任务方法,然后会监控broker
中的每个请求数据包 - 客户端将任务方法名、参数列表等参数包装好,投递到
broker
中,然后不等待执行结果返回,就继续往下执行主程序 -
workers
进程检测到broker
中有任务需要执行,故从中取得数据传递到相应的方法执行,执行完将结果存储到result backend
中(如果有设置结果后端) - 在执行每个任务的时候,
workers
中还维护了该任务的执行状态、执行结果、错误信息等数据项,便于客户端随时调用getStatus
等方法来查询结果
上述的
workers
进程泛指一个监控中间人、执行实际任务的消费者、存数据至结果后端、维护任务信息的进程集合,实际的实现可能是多个子进程或多个子线程共同完成
定时任务的过程: 通过启动额外的进程celerybeat
(任务调度器),每当有到时间执行的任务,就通知workers
执行,其他过程与实时任务大致相同
快速开始
参考: http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps
环境:ubuntu 16.04 virtualenv python3
【1】 创建mytasks.py
文件,键入如下代码:
from celery import Celery
import celeryconfig
app = Celery('mytasks')
# 分离配置到celeryconfig文件
app.config_from_object(celeryconfig)
@app.task
def add(x, y):
return x + y
【2】 创建celeryconfig.py
配置文件,键入如下代码:
broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
# result_backend = 'db+mysql://{user}:{pass}@{host}:{port}/{database}'
include = ['mytasks']
task_serializer = 'json'
result_serializer = 'json'
timezone = 'Asia/Shanghai'
enable_utc = False
除了基本的broker
, result backend
之外,还有一些其他配置,规定workers
的执行
【3】 在项目目录下执行celery -A mytasks worker --loglevel=info
(如提示缺少redis
类库,使用pip
安装即可)
(env) tyruschin@tyruschin-B85M-DS3H-A:~/Desktop/celerytask$ celery -A mytasks worker --loglevel=info
-------------- celery@tyruschin-B85M-DS3H-A v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-87-generic-x86_64-with-debian-stretch-sid 2017-09-06 11:54:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: mytasks:0x7fbbf5bbedd8
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. mytasks.add
[2017-09-06 11:54:49,511: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-09-06 11:54:49,520: INFO/MainProcess] mingle: searching for neighbors
[2017-09-06 11:54:50,538: INFO/MainProcess] mingle: all alone
[2017-09-06 11:54:50,546: INFO/MainProcess] celery@tyruschin-B85M-DS3H-A ready.
[2017-09-06 11:54:52,887: INFO/MainProcess] Events of group {task} enabled by remote.
可以看到,进程启动的时候,加载了[config]
,建立了队列[queues]
,绑定了任务列表[tasks]
,然后执行了主进程MainProcess
【4】 在另一终端创建客户端程序client.py
,键入如下代码:
import mytasks
mytasks.add.delay(1, 3)
python client.py
,此时查看celery
进程执行的终端,多了两行log:
[2017-09-06 12:03:01,874: INFO/MainProcess] Received task: mytasks.add[bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07]
[2017-09-06 12:03:01,884: INFO/ForkPoolWorker-2] Task mytasks.add[bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07] succeeded in 0.007293057162314653s: 4
可以知道主进程MainProcess
接收到任务(task id
为bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07
),从进程池中选择一个worker
,即ForkPoolWorker-2
来执行任务,得到结果为4
,经过了约0.007s
【5】 打开redis-cli
查看result backend
,task id
与redis key
相对应,具体如下:
127.0.0.1:6379> keys *
...
6) "celery-task-meta-bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07"
127.0.0.1:6379> get celery-task-meta-bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07
"{\"children\": [], \"result\": 4, \"status\": \"SUCCESS\", \"task_id\": \"bf3ea8c0-a053-48fb-ad14-25f0f1cc8c07\", \"traceback\": null}"
使用的细节
celery
的配置
4.0
版本之后,配置建议采用小写,与旧版有些变化,文档摸我
一些用到的配置:
- broker_url: 中间人配置(默认使用
RabbitMQ
) - result_backend: 结果后端(默认不保存结果)
- include:
list
结构,通常用于task
实体方法与配置不在同一个文件的情况,与Python
的import
类似,且支持用parent_mod.module_name
作为值来引入子模块
简单证明调用任务.delay()
是不等待结果直接返回的(异步非阻塞)
之前使用到了两个文件client.py
和mytasks.py
,修改它们:
# client.py
import mytasks
res = mytasks.add.delay(1, 3)
print("继续执行,不会阻塞,返回值为", res)
获取了任务返回的结果,并输出了一行文字
# mytasks.py
from celery import Celery
import celeryconfig
import time
app = Celery('mytasks')
app.config_from_object(celeryconfig)
@app.task
def add(x, y):
time.sleep(10)
return x + y
在任务方法体内,先模拟一个高延时的情况,10秒后再处理返回结果
重启celery,使其重新绑定任务方法
(env) tyruschin@tyruschin-B85M-DS3H-A:~/Desktop/celerytask$ python client.py
继续执行,不会阻塞,返回值为 564d1adb-9c87-4a29-9c9c-a8a1ba116b0b
执行client.py
发现文字马上输出了,且返回值是task id
而不是计算结果(此时结果还没算出来)
上述步骤证明了非阻塞
[2017-09-06 16:09:33,936: INFO/MainProcess] Received task: mytasks.add[564d1adb-9c87-4a29-9c9c-a8a1ba116b0b]
[2017-09-06 16:09:43,947: INFO/ForkPoolWorker-2] Task mytasks.add[564d1adb-9c87-4a29-9c9c-a8a1ba116b0b] succeeded in 10.00852725515142s: 4
注意到,在celery
进程中,过了约10.009s
才返回计算结果,期间客户端可以通过获取状态来检出结果(此处略去)
上述两个步骤证明了异步
使用监控管理程序
最简单的实践是flower
,它可以监控任务的执行过程、统计相关信息、控制和修改workers等(但是flower
进程断开,相关数据会丢失)
# install
pip install flower
# execute
flower -A mytasks --port=5555
特别注意:必须配置result backend,方可启动flower,且flower中的数据与result backend不关联
其他的一些
1. 自定义状态
...
# 增加引入这两个模块
from celery.exceptions import Ignore
from celery import current_task
@app.task
def test():
...
# 改变当前状态
current_task.update_state(state='WRONG_CODE', meta=result)
# 使当前状态成为最终的状态
raise Ignore()
# 如果return的话,状态会转变成success
# return result
根据我的测试,celery
默认只有两种最终状态,即success
和failure
,如果要增加最终状态,则必须更新状态之后抛出Ignore
异常
update_state
中的meta
参数为记录到结果后端的元数据,记录的格式以配置result_serializer
为准,4.0
版本之后默认为json
,之前为pickle
此处有一个坑:
flower
不认为Ignore
是一个结束状态,所以一直处于“处理中”的状态,故自定义状态不会被flower
记录到,只能记录在result backend
中
2. 使用MySQL作为结果后端
result_backend = 'db+mysql://{user}:{pass}@{host}:{port}/{database}
数据库需要事先建立,在第一次跑的时候,会默认生成两张表celery_taskmeta
和celery_tasksetmeta
,以上的案例只会涉及到第一张表
此处有两个坑:
之前提到配置result_serializer
,发现即使修改为json
(并且4.0
默认就是),MySQL
中仍然使用了pickle
来序列化。配置timezone
与enable_utc
在MySQL
的存储中也同样是失效的
正因上面的坑,有些客户端没有做MySQL
结果的获取适配(如:celery-php
),导致结果后端可以写入但不能读取
可能的解决方案:自定义数据表来存储相应的结果到MySQL
中
3. 其他客户端
支持node
, PHP
等