概述
celery是一个python实现的分布式任务执行框架,本文为学习笔记:
- python 3.8.6
- celery 5.1.2
- 操作系统 win10
- redis 3.0.504
安装
- pip install celery
- pip install flower (任务监控平台)
- pip install eventlet (win10 环境需要)
参考官网上面的例子,将celery 当成一个独立工程来维护,方便将已有的业务按统一规范写成任务入口函数
目录结构如下:
celery.py 文件代码:
from celery import Celery
app = Celery('proj',
broker='redis://:123456@10.2.13.167:6379/1',
backend='redis://:123456@10.2.13.167:6379/2',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
tasks.py 文件代码如下:
from .celery import app
@app.task
def add(x, y):
print("call add")
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
启动服务
- 启动 celery celery -A proj worker -l INFO -P eventlet
- 启动 flower celery -A proj flower --address=127.0.0.1 --port=5566 # web监控页面打开方式 http://127.0.0.1:5566
直接ctrl+c 退出程序
在liunx上面 支持以 守护进程方式启动 celery multi start w1 -A proj -l INFO multi 其它参数:重启是 restart 停止是stop,还支持启动多个worker 这里是只启动了一个,具体参数参考官方文档
集成到flask框架中
celery集成到flask框架中不需要安装任何扩展,网上有其它的方式进行集成,本人采用的是,直接调用 tasks.py 下面的任务函数的方式实现
代码如下:
from flask import Flask, request
from proj.tasks import app, add, mul
from celery.result import AsyncResult
flask_app = Flask(__name__)
@flask_app.route('/add', methods=["POST"])
def celery_add():
"""
执行add任务
POST http://127.0.0.1:5000/add
Body 为表单 参数为 args1,args2
:return:
"""
try:
args1, args2 = request.form.values()
print(f"获取到的参数:{args1},{args2}")
except Exception as e:
return str(e)
result = add.delay(int(args1), int(args2))
result_id = result.id
print(f"任务id:{result_id}")
return result_id
@flask_app.route('/get_result', methods=["GET"])
def get_result_id():
"""
根据id获取任务结果
GET http://127.0.0.1:5000/get_result?id=fd7cb7dc-5b1b-4e07-8199-dd89a0c08a2a
:return:
"""
result_id = request.args.get('id')
async_result = AsyncResult(id=result_id, app=app)
result = ""
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.status == 'PENDING':
print('任务等待被执行')
elif async_result.status == 'RETRY':
print('任务异常后重试')
elif async_result.status == 'STARTED':
print('任务执行中')
elif async_result.failed():
print('任务执行失败')
return str(result)
if __name__ == "__main__":
flask_app.run("127.0.0.1", port=5000)
启动flask程序
通过postman进行调用测试:
-
调用add函数,获取任务id:
-
获取对应id的结果:
-
打开flower 查看调用过程 如下图: