These are notes I took after working through the 慕课网 course "Python 异步任务队列 Celery 使用". Since I am running Python 3.7, the instructor's code needs a few modifications before it works in this environment.

Environment:

- python: 3.7
- django: 2.1.5
- celery: 4.2.0
- django-celery: 3.2.2
- flower: 0.9.2
- kombu: 4.3.0
- tornado: 5.1.1
## What is Celery?

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.
## Use cases

- Asynchronous tasks: hand time-consuming work off to Celery to run asynchronously, e.g. sending SMS/email, pushing notifications, audio/video processing.
- Scheduled tasks: similar to crontab, e.g. daily statistics jobs.
## Installation and configuration

- Python virtual environment tools:
  - virtualenv & virtualenvwrapper
  - pyenv
  - pipenv
  - venv
- Install Celery: `pip install celery[redis]`
- Message brokers for Celery:
  - RabbitMQ
  - Redis
- Create a Celery app: `app = Celery('xxx', backend='xxxxx', broker='xxxxx')`
## Using Celery

- Configure Celery:

  ```python
  broker = 'redis://localhost:6379/1'
  backend = 'redis://localhost:6379/2'

  app = Celery('my_task', broker=broker, backend=backend)
  ```

- Register a task:

  ```python
  @app.task  # turn `add` into an asynchronous task
  def add(x, y):
      print('enter call function ...')
      time.sleep(5)
      return x + y
  ```

- Submit a task to Celery (see the result-handling sketch after this list):
  - 64-bit Windows users: add the following to the task script first:

    ```python
    import os
    os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
    ```

  - Submit the task:

    ```python
    result = add.delay(2, 15)
    # or use apply_async with a tuple of arguments
    # result = add.apply_async((2, 15))
    ```
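Both `delay` and `apply_async` return an `AsyncResult` immediately. The sketch below shows how that result can be inspected once a worker has processed the task; it assumes the task module is named `tasks` (as in the worker log later in these notes) and that a result backend such as the Redis backend above is configured:

```python
# sketch: inspecting the AsyncResult returned by delay()/apply_async()
# assumes the `add` task above lives in a module named `tasks` and that
# a result backend (e.g. the Redis backend shown earlier) is configured
from tasks import add

result = add.delay(2, 15)       # returns an AsyncResult right away
print(result.ready())           # False until a worker has finished the task
print(result.get(timeout=10))   # blocks until the result arrives -> 17
print(result.status)            # e.g. 'SUCCESS'
```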
## Organizing Celery tasks in a dedicated package

- Create a `celery_app` package in the project to hold the Celery tasks and configuration, and configure the Celery instance in its `__init__.py`:

  ```python
  from celery import Celery
  import os

  os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

  app = Celery('demo')
  # load the configuration module through the `Celery` instance
  app.config_from_object('celery_app.celery_config')
  ```
- Add the Celery settings in `celery_config.py`:

  ```python
  # celery_config.py
  from datetime import timedelta

  from celery.schedules import crontab

  BROKER_URL = 'redis://localhost:6379/1'
  CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
  CELERY_TIMEZONE = 'Asia/Shanghai'  # defaults to UTC

  # import the specified task modules
  CELERY_IMPORTS = {
      'celery_app.task1',
      'celery_app.task2',
  }

  # tasks that `celery beat` should schedule
  CELERYBEAT_SCHEDULE = {
      'task1': {
          'task': 'celery_app.task1.add',
          'schedule': timedelta(seconds=500),
          'args': (2, 8)
      },
      'task2': {
          'task': 'celery_app.task2.multiply',
          'schedule': crontab(hour=16, minute=32),
          'args': (4, 6)
      }
  }
  ```
- Write the Celery tasks, e.g. `celery_app/task1.py` (a matching sketch for `task2.py` follows this list):

  ```python
  import time

  from celery_app import app


  @app.task
  def add(x, y):
      time.sleep(3)
      return x + y
  ```
- Submit tasks to Celery from an external script:

  ```python
  # app.py
  from celery_app import task1, task2

  task1.add.delay(2, 5)
  # extra options can be passed here, e.g. which queue to use
  task2.multiply.apply_async(args=(2, 5))
  ```
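The beat schedule above refers to `celery_app.task2.multiply`, which is not shown in the course notes; a minimal sketch of what `task2.py` could look like, mirroring `task1.py`:

```python
# celery_app/task2.py -- sketch of the second task referenced in CELERYBEAT_SCHEDULE
import time

from celery_app import app


@app.task
def multiply(x, y):
    time.sleep(3)  # simulate a slow operation, as in task1
    return x * y
```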
## Starting Celery

- Run the Celery worker: `celery worker -A [celery_project_name] -l INFO`
  - `-A`: location of the Celery instance
  - `-l`: log level

Note: running the command above on 64-bit Windows raises an error:

```
(venv) c:\Users\jzw\Desktop\dj_celery>celery worker -A tasks -l INFO

 -------------- celery@DESKTOP-5DO1L05 v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0 2019-02-10 15:17:01
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         my_task:0x1a932e708d0
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     redis://localhost:6379/2
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . app.add

[2019-02-10 15:17:01,905: INFO/SpawnPoolWorker-1] child process 11932 calling self.run()
[2019-02-10 15:17:01,913: INFO/SpawnPoolWorker-2] child process 10864 calling self.run()
[2019-02-10 15:17:01,924: INFO/SpawnPoolWorker-3] child process 5544 calling self.run()
[2019-02-10 15:17:01,937: INFO/SpawnPoolWorker-4] child process 5236 calling self.run()
[2019-02-10 15:17:01,953: INFO/SpawnPoolWorker-6] child process 8684 calling self.run()
[2019-02-10 15:17:01,963: INFO/SpawnPoolWorker-5] child process 2660 calling self.run()
[2019-02-10 15:17:01,964: INFO/SpawnPoolWorker-7] child process 15968 calling self.run()
[2019-02-10 15:17:01,979: INFO/SpawnPoolWorker-8] child process 15808 calling self.run()
[2019-02-10 15:17:02,606: INFO/MainProcess] Connected to redis://localhost:6379/1
[2019-02-10 15:17:03,614: INFO/MainProcess] mingle: searching for neighbors
[2019-02-10 15:17:07,641: INFO/MainProcess] mingle: all alone
[2019-02-10 15:17:12,664: INFO/MainProcess] celery@DESKTOP-5DO1L05 ready.
[2019-02-10 15:18:13,158: INFO/MainProcess] Received task: app.add[b7c0cbcc-c9a3-40e5-a7f8-e629bdc6d7e8]
[2019-02-10 15:18:14,174: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
Traceback (most recent call last):
  File "c:\users\jzw\desktop\dj_celery\venv\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "c:\users\jzw\desktop\dj_celery\venv\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
```
Solution reference:

Solution: set an environment variable for the Celery runtime by adding the following code to the task script:

```python
# task.py
import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
```
Error when running the Celery worker under Python 3.7, and how to fix it:

```
    from . import async, base
                  ^
SyntaxError: invalid syntax
```

Issue and discussion:

Fix:
https://github.com/celery/celery/pull/4852/commits/d737dec3c943632f21f73a2235409c29e3fe63e3
- Run celery beat: `celery beat -A [celery_project_name] -l INFO`
  - `-A`: location of the Celery instance
  - `-l`: log level
- Run the Celery worker and celery beat together (not supported on Windows): `celery -B -A celery_app worker -l INFO` (`-l` is short for `--loglevel`)
- View the Celery help: `celery worker --help`
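Once a worker is up, it can be poked from a Python shell to confirm that it registered the tasks; a small sketch using Celery's inspection API, assuming the `celery_app` package from the previous section is importable:

```python
# sketch: querying running workers over the broker;
# assumes the `celery_app` package defined above
from celery_app import app

insp = app.control.inspect()
print(insp.ping())        # e.g. {'celery@hostname': {'ok': 'pong'}} when a worker is alive
print(insp.registered())  # tasks each worker has registered
print(insp.active())      # tasks currently being executed
```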
## Using Celery with Django

Current environment:

- Windows 64-bit
- Python 3.7
- Python packages:

```
Package       Version
------------- -------
amqp          2.4.1
anyjson       0.3.3
billiard      3.5.0.5
celery        4.2.0
Django        2.1.5
django-celery 3.2.2
kombu         4.3.0
```
Running `python manage.py celery worker -l INFO` raises an error:
```
Traceback (most recent call last):
  File "manage.py", line 15, in <module>
    execute_from_command_line(sys.argv)
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 381, in execute_from_command_line
    utility.execute()
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 375, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 224, in fetch_command
    klass = load_command_class(app_name, subcommand)
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 36, in load_command_class
    module = import_module('%s.management.commands.%s' % (app_name, name))
  File "D:\Programs\Python\Python37\lib\importlib\__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\djcelery\management\commands\celery.py", line 11, in <module>
    class Command(CeleryCommand):
  File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\djcelery\management\commands\celery.py", line 15, in Command
    base.get_options() +
TypeError: can only concatenate tuple (not "NoneType") to tuple
```
References:

- https://stackoverflow.com/questions/49085230/django-celery-typeerror-can-only-concatenate-tuple-not-nonetype-to-tuple
- http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
Solution:

- Comment out the `options` block in `djcelery\management\commands\celery.py`:

  ```python
  # celery.py
  from __future__ import absolute_import, unicode_literals

  from celery.bin import celery

  from djcelery.app import app
  from djcelery.management.base import CeleryCommand

  base = celery.CeleryCommand(app=app)


  class Command(CeleryCommand):
      """The celery command."""
      help = 'celery commands, see celery help'
      # options = (CeleryCommand.options +
      #            base.get_options() +
      #            base.preload_options)

      def run_from_argv(self, argv):
          argv = self.handle_default_options(argv)
          base.execute_from_commandline(
              ['{0[0]} {0[1]}'.format(argv)] + argv[2:],
          )
  ```
- After this change, running `python manage.py celery worker -l INFO` fails with a different error:

  ```
  [2019-02-11 01:11:40,836: CRITICAL/MainProcess] Unrecoverable error: SyntaxError('invalid syntax', ('C:\\Users\\jzw\\Desktop\\celery_learn\\dj_celery\\venv\\lib\\site-packages\\celery\\backends\\redis.py', 22, 19, 'from . import async, base\n'))
  Traceback (most recent call last):
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\objects.py", line 42, in __get__
      return obj.__dict__[self.__name__]
  KeyError: 'backend'

  During handling of the above exception, another exception occurred:

  Traceback (most recent call last):
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\worker\worker.py", line 205, in start
      self.blueprint.start(self)
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\bootsteps.py", line 115, in start
      self.on_start()
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 139, in on_start
      self.emit_banner()
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 154, in emit_banner
      ' \n', self.startup_info(artlines=not use_image))),
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 217, in startup_info
      results=self.app.backend.as_uri(),
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\objects.py", line 44, in __get__
      value = obj.__dict__[self.__name__] = self.__get(obj)
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\base.py", line 1196, in backend
      return self._get_backend()
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\base.py", line 914, in _get_backend
      self.loader)
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\backends.py", line 70, in by_url
      return by_name(backend, loader), url
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\backends.py", line 50, in by_name
      cls = symbol_by_name(backend, aliases)
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\imports.py", line 56, in symbol_by_name
      module = imp(module_name, package=package, **kwargs)
    File "D:\Programs\Python\Python37\lib\importlib\__init__.py", line 127, in import_module
      return _bootstrap._gcd_import(name[level:], package, level)
    File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
    File "<frozen importlib._bootstrap>", line 983, in _find_and_load
    File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
    File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
    File "<frozen importlib._bootstrap_external>", line 724, in exec_module
    File "<frozen importlib._bootstrap_external>", line 860, in get_code
    File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
    File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\backends\redis.py", line 22
      from . import async, base
                    ^
  SyntaxError: invalid syntax
  ```
Explanation: `async` became a reserved keyword in Python 3.7, so a statement like `from . import async, base` is no longer valid syntax and Python raises a SyntaxError when it compiles the file.

Fix:

Following the upstream Celery proposal, the `async.py` module is renamed to `asynchronous`. So we only need to rename `celery\backends\async.py` to `celery\backends\asynchronous.py` and change every `async` in `celery\backends\redis.py` to `asynchronous`.
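If you prefer not to edit the installed files by hand, the same rename can be done with a throwaway script; a hypothetical sketch, where the `BACKENDS` path is an assumption that must be adjusted to the `celery/backends` directory inside your own virtualenv:

```python
# fix_async_rename.py -- hypothetical one-off helper for the rename described above
import pathlib
import re

# assumption: adjust this to your own virtualenv's celery/backends directory
BACKENDS = pathlib.Path('venv/Lib/site-packages/celery/backends')

# 1. rename async.py -> asynchronous.py
old, new = BACKENDS / 'async.py', BACKENDS / 'asynchronous.py'
if old.exists():
    old.rename(new)

# 2. replace the module name inside redis.py
#    (\b keeps longer identifiers such as 'asynchronous' untouched)
redis_py = BACKENDS / 'redis.py'
source = redis_py.read_text(encoding='utf-8')
redis_py.write_text(re.sub(r'\basync\b', 'asynchronous', source), encoding='utf-8')
```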
- Re-run `python manage.py celery worker`; the Celery worker now starts successfully!
- Install django-celery: `pip install django-celery`
- Start a worker through the `manage.py` script: `python manage.py celery worker -l INFO -Q [queue_name]`
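`-Q [queue_name]` makes the worker consume only from that queue; for a task to end up there, name the queue when submitting it. A sketch, where the queue name `my_queue` is an arbitrary example:

```python
# sketch: routing a task to a named queue; `my_queue` is an arbitrary example
from celery_app import task2

# a worker started with `-Q my_queue` will pick this up;
# workers listening on other queues will not
task2.multiply.apply_async(args=(2, 5), queue='my_queue')
```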
## Celery monitoring tool: flower

- Install: `pip install flower`
- Start:
  - Standalone: `celery flower --address=0.0.0.0 --port=5555 --broker=xxx --basic_auth=finlu:finlu`
    - `--address`: address the flower service listens on
    - `--port`: port of the flower service
    - `--broker`: broker URL
    - `--basic_auth`: HTTP basic-auth credentials for flower
  - Through the `manage.py` script (so it can read the configuration in `settings.py`): `python manage.py celery flower`
## Worker crashes on Windows while executing a task

- Error message:

  ```
  Traceback (most recent call last):
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\billiard\pool.py", line 358, in workloop
      result = (True, prepare_result(fun(*args, **kwargs)))
    File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
      tasks, accept, hostname = _loc
  ValueError: not enough values to unpack (expected 3, got 0)
  ```

Reference:

Solution:

- Set the `FORKED_BY_MULTIPROCESSING` environment variable to `'1'` when starting the worker, for example by adding the following to `settings.py`:

  ```python
  # settings.py
  import os
  os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
  ```

- The worker then starts and runs normally.
## Deploying and managing Celery with supervisor

- Install supervisor:
  - The supervisor package on pip only supports Python 2.x, so install it from source on GitHub instead.
  - supervisor project: https://github.com/Supervisor/supervisor
- Configuration:
  - Set up the basic supervisor configuration:

    ```
    # keep the supervisor configuration in a `conf` directory
    mkdir conf
    # redirect supervisor's default configuration into `supervisord.conf`
    echo_supervisord_conf > conf/supervisord.conf
    ```

    Then add the following to `supervisord.conf`:

    ```ini
    [unix_http_server]
    file=/tmp/supervisor.sock             ; the path to the socket file

    [inet_http_server]                    ; inet (TCP) server disabled by default
    port=127.0.0.1:9001                   ; ip_address:port specifier, *:port for all iface

    [supervisorctl]
    serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket

    [include]
    files = *.ini
    ```
  - Add per-process configuration files in the `conf` directory:
    - `supervisor_celery_worker.ini` (celery worker):

      ```ini
      [program:celery-worker]
      command=python manage.py celery worker -l INFO                           ; command to run
      directory=/home/finlu/celery_learn/dj_celery                             ; working directory
      environment=PATH="/home/finlu/celery_learn/dj_celery/venv/bin"           ; Python virtualenv
      stdout_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.worker.log ; stdout log
      stderr_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.worker.log ; error log
      autostart=true                                                           ; start automatically
      autorestart=true                                                         ; restart automatically
      startsecs=10                                                             ; startup grace period
      stopwaitsecs=60                                                          ; stop wait time
      priority=998                                                             ; process priority
      ```

    - `supervisor_celery_flower.ini` (celery flower):

      ```ini
      [program:celery-flower]
      command=python manage.py celery flower
      directory=/home/finlu/celery_learn/dj_celery
      environment=PATH="/home/finlu/celery_learn/dj_celery/venv/bin"
      stdout_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.flower.log
      stderr_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.flower.log
      autostart=true
      autorestart=true
      startsecs=10
      stopwaitsecs=60
      priority=1000
      ```

    - `supervisor_celery_beat.ini` (celery beat):

      ```ini
      [program:celery-beat]
      command=python manage.py celery beat -l INFO
      directory=/home/finlu/celery_learn/dj_celery
      environment=PATH="/home/finlu/celery_learn/dj_celery/venv/bin"
      stdout_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.beat.log
      stderr_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.beat.log
      autostart=true
      autorestart=true
      startsecs=10
      stopwaitsecs=60
      priority=997
      ```
运行
supervisor
- 指定配置文件并运行
supervisor
supervisord -c conf/supervisord.conf
- 使用
supervisorctl
命令进行管理-
update
: 更新配置信息(相当于restart)