DRF 之 celery 异步处理问题
发送邮件, 短信, 文件上传
1. 简介 celery -> 生产者 消费者 模型 的一种体现
client(生成任务) -> broker(任务存储队列) -> worker(执行任务)
2. 安装三步走:
- tasks ------> 任务
- broker ----> 消息队列(中间人)
- worker----> 执行者
3. 在 工程中 创建 celery_tasks 包 (存放celery所有的文件)
创建任务的一些前提条件:
- 任务 就是
函数
- 任务 需要被celery 实例出的对象的 task 方法
装饰
- 任务 需要被celery 实例出的对象
自动检测
help!: => We need a celery 实例对象 哎!!!**
4. 所以这里需要先 创建出<celery对象>
- 在celery_tasks包 下 , 新建程序执行入口模块 -> main.py
# celery_tasks/main.py
###################### 需先导入工程配置文件 #######################
# 作用: 比如 获取 Django项目的 redis 配置?!
import os
if not os.getenv('DJANGO_SETTINGS_MODULE'):
# 设置 添加 Django项目 的 setting 路径 到 os 环境变量
os.environ['DJANGO_SETTINGS_MODULE'] = 'mall.settings' # 要对应 自己的 Django 项目名
######################## 创建celery实例 ##########################
# main 其实 就是 给celery设置一个名字, 这个名字唯一 就可以
# 推荐使用 文件路径
from celery import Celery
app = Celery(main='celery_tasks')
############ 加载 config.py 配置文件(设置broker任务队列) ############
# 配置见下文
app.config_from_object('celery_tasks.config')
################## 实现 celery实例对象 自动检测任务 #################
# 参数: 列表
# 列表中 需要填写任务的包路径
app.autodiscover_tasks([celery_tasks.sms])
# end
- 上面的 main.py 中引入了一个 config配置文件
- 所以我们在这里 创建一个 config.py文件
好了, 暂停一下, 现在让我们先了解 两个概念:
broker: 是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,进行对于的程序执行。好吧,这个邮箱可以看成是一个消息队列。
backend: 通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。对于 brokers,官方推荐是rabbitmq和redis,至于backend,就是数据库啦。为了简单起见,我们都用redis。
5. 接上文, 配置 config.py ---> 即: <broker配置>
- config.py 配置文件 (设置broker任务队列相关)
# celery_tasks/config.py
# 配置一个 config.py, 存储配置信息, 实现 配置信息 存在于 单独的配置文件中
# 之后在main.py, 让实例对象 app 加载其中的配置
broker_url = 'redis://127.0.0.1/14' # 配置 borker 存储在 redis:14号 库
result_backend = 'redis://127.0.0.1/15' # 配置 backend 存储在 redis:15号 库
6. 创建 <任务>, 并用 上面的 celery 对象的task方法 装饰
在celery_tasks文件夹下, 新建 sms 包
在sms 包 下, 新建 tasks.py 模块
任务注意:
- 任务包下 所对应的py文件名, 必须为 tasks.py
- 所谓的 任务 其实 就是 函数
- 这个 函数 需要被 cekery 的实例对象的task装饰器 装饰
- 这个任务 需要 被celery实例对象 自动检测(已经在 main.py 中实现)
# celery_tasks/sms/tasks.py
from libs.yuntongxun.sms import CCP
#####################
# @app.task 需要 加 括号么???
#####################
#可以设置name参数
@app.task(name='send_sms_code') # 调用 app实例对象的task方法, 装饰这个 函数任务
def send_sms_code(mobile, sms_code):
# 发送手机号为: mobile, 发送数据为: sms_code, 有效期:5, 使用模板:1
CCP().send_templates_sms(mobile, [sms_code, 5], 1)
7. 在视图函数中 调用 task任务
# ...
from celery_tasks.sms.tasks import send_sms_code
# dalay 的参数 同 send_sms_code的参数
send_sms_code.delay(mobile, sms_code)
# ...
8. woker
# celery -A celery 实例对象所在的文件 worker -l info
celery -A celery_tasks.main worker -l info
划重点, 易错 总结:
1. 装饰器的写法错误:
@app.task
2. 调用方法 错误: delay - 延期
send_sms_code.delay(mobile, sms_code)
3. config.py 配置文件 调用
app.config_from_object("celery_tasks.config")
# 为什么也可以这样写 ???!!!
from celery_tasks import config
app.config_from_object(config)
4. redis-存储队列:
前提: 这是在 没有 woker 的情况下 积攒的 队列任务
- 存入的 元素 "celery", 是一个redis的
list 数据
-
新任务
是从左侧
插入的 - 提取时候 需要用 命令:
lrange "celery" 0 -1
127.0.0.1:6379[14]> keys *
1) "_kombu.binding.celery"
2) "celery"
127.0.0.1:6379[14]> type "celery"
list
127.0.0.1:6379[14]> lrange "celery" 0 -1
1) "{\"body\": \"W1siMTcxOTEwOTA4OTYiLCAiNjk4ODU5Il0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"celery_tasks.sms.tasks.send_sms_code\", \"id\": \"669ed6ce-b894-4fbb-bf22-074843305865\", \"eta\": null, \"expires\": null, \"group\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"669ed6ce-b894-4fbb-bf22-074843305865\", \"parent_id\": null, \"argsrepr\": \"('171xxxxx896', '698859')\", \"kwargsrepr\": \"{}\", \"origin\": \"gen1786@Dabenstone\"}, \"properties\": {\"correlation_id\": \"669ed6ce-b894-4fbb-bf22-074843305865\", \"reply_to\": \"de91a2aa-0236-3ca2-9e34-28af36442b00\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"7fae1b62-61d8-4462-88c2-7662a7e293dd\"}}"
2) "{\"body\": \"W1siMTcxOTEwOTA4OTIiLCAiMDMzMzM3Il0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"celery_tasks.sms.tasks.send_sms_code\", \"id\": \"dea15283-18a4-4275-82c4-106175ce56a3\", \"eta\": null, \"expires\": null, \"group\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"dea15283-18a4-4275-82c4-106175ce56a3\", \"parent_id\": null, \"argsrepr\": \"('171xxxxx892', '033337')\", \"kwargsrepr\": \"{}\", \"origin\": \"gen1776@Dabenstone\"}, \"properties\": {\"correlation_id\": \"dea15283-18a4-4275-82c4-106175ce56a3\", \"reply_to\": \"653a7b98-6fd1-3d12-9048-c840c894773c\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"e3773ab2-94a1-4257-a110-54dd418f5b20\"}}"
127.0.0.1:6379[14]>