按业务场景划分队列
一定一定要更具业务特点来划分出不同的队列,不能讲所有的任务都让同一队列消费。场景,工程当中有统计类的异步任务,也有发送红包给微信用户的异步任务。若不划分队列,那么当统计类的异步任务很多很多的时候,发送红包给微信用户的异步任务就要等待一段时间才能被执行到(任务多可能会等待十几分钟甚至一两个小时),这对用户体验来说是非常不友好的。此时就可以将队列划分为low, middle, high这几种队列,对于统计类的这些任务可以扔去low的队列中,而对于发送红包这种重要的任务就扔去high队列中,确保尽可能快的被执行到。具体划分,需要更加业务场景来划分。
通过-Q参数来指定队列名:
python celery worker -A test_project -Q low;
python celery worker -A test_project -Q middle;
python celery worker -A test_project -Q high;
# django代理中设置
CELERY_QUEUES = (
Queue('low', Exchange('low', type='direct')),
Queue('middle', Exchange('middle', type='direct')),
Queue('high', Exchange('high', type='direct')),
)
对任务指定队列运行
方式1
@app.task(bind=True, queue='middle', name='send_wx_text')
def send_wx_text(self, target_origin_id, to_user, txt):
......
方式2
send_wx_text.apply_async((target_origin_id, to_user, txt), queue='middle')
方式3
CELERY_ROUTES = {
'send_wx_text': {
'queue': 'middle',
},
}
处理结果需要关心的,不要返回处理结果
大部分异步任务都是不需要关心处理结果的,此时不需要将处理结果返回。由于返回处理结果,需要存储,这意味着有IO操作,会降低性能,若将结果存储在redis或者RabbitMQ 中,随着任务数的增加,消耗的内存也会增加(曾经因为这个原因爆过内存.....)。若非要存储结果,也要设置好结果的保存时间,以免内存泄漏。
设置例子
# 设置结果的保存时间
CELERY_TASK_RESULT_EXPIRES = 10*60
# 设置默认不存结果
CELERY_IGNORE_RESULT = True
# 任务单独指定, 会覆盖全局配置
@app.task(bind=True, ignore_result=True)
def send_wx_text(self, target_origin_id, to_user, txt):
......
并发数设置
对开启的进程数做过测试,发现当进程数等于cpu核心数时,性能是最好的,少了不能很好的利用多核,多了性能下降估计是因为开多的进程带来的性能的提升小于因上下文切换带来的性能损失。对于协程池的大小设置,需要进行测试来得出最佳数量。
例子:
[program:test_project-high-jobs]
command=/opt/.virtualenvs/test_project/bin/celery worker -A test_project -P gevent -c 100 -l INFO -Q high -n %%h-test_project-high --without-gossip --without-mingle --without-heartbeat
environment=PATH="/opt/.virtualenvs/test_project/bin"
directory=/srv/test_project.com/application
user=uwsgi
priority=10
autorestart=true
process_name=TEST_PROJECT_%(process_num)s
;进程数==核心数
numprocs=4
stdout_logfile=/var/log/supervisor/test_project-high-jobs.log
stderr_logfile=/var/log/supervisor/test_project-high-jobs.log
使用gevent时注意task中是否使用gevent不支持的库
利用gevent来部署worker, 若task使用了那些不支持gevent的库(例如requests的普通模式),会使到异步变同步,极其影响性能。这一点对于其它的协程库也是一样的例如eventlet。
对失败进行重试处理
重试需要注意的点
- 明确哪些失败或错误是需要重试的,不要无条件的重试
- 设定最大重试次数(max_retries),避免无限重试
- 重试的延时最好是更具重试次数进行增量延时,给更多的时间让等待服务恢复正常,以提高成功率
简单的例子
def backoff(attempts):
"""
1, 2, 4, 8, 16, 32, ...
"""
return 2 ** attempts
# 设置max_retries, 限定重试次数
@app.task(bind=True, max_retries=3)
def send_wx_text(self, target_origin_id, to_user, txt):
try:
r, err = wx_api.send_text(to_user, txt)
if err and int(err.code) in (45047):
# 更具重试次数增加重试的延时的时间
raise self.retry(args=[target_origin_id, to_user, txt],
countdown=backoff(self.request.tretries))
except BaseException as e:
raise e
尽快失败,而不阻塞
对于一些爬虫类的异步任务,我们常常是使用代理去爬,一般来说代理服务商提供的代理能用的概率比较低,容易导致timeout,对于这些可能会失败的任务应该尽快让它失败而不是在等待它失败,避免做无畏的等待,进而提高性能。预估一下任务的正常处理时间,根据这个时间来规定一个任务的最大执行时间。
简单的例子
@app.task(bind=True, max_retries=3, soft_time_limit=5)
def send_wx_text(self, target_origin_id, to_user, txt):
.....
对任务进行通用处理
任务失败或者重试,一般来说是需要进行记录,若每一个任务都写一遍相关的代码,这非常麻烦。此时,通过执行任务的base参数对任务进行统一处理。
例子
from myproject.tasks import app
# 具体的可以看看Task类中的方法
class BaseTask(app.Task):
abstract = True
def on_retry(self, exc, task_id, args, kwargs, einfo):
sentrycli.captureException(exc)
super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo)
def on_failure(self, exc, task_id, args, kwargs, einfo):
sentrycli.captureException(exc)
super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@app.task(bind=True, max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)
def send_wx_text(self, target_origin_id, to_user, txt):
.......
使用RabbitMQ作为broker
理由:
- 更稳定
- 消耗内存比redis要小,用redis做broker当任务量很多时,内存极剧上升,估计有内存泄漏问题
- rabbitmq-plugins这监控工具非常好用(用flower作为监控工具,在测试服务器部署跑,内存会随着时间的推移而不断增加,估计有内存泄漏问题)
跨应用的task生产
场景:工程A利用tornado接收微信转发过来的用户信息,但信息的处理是在使用Django的工程B中。也就是说,由工程A做消息的生产者,工程B作为消息的消费者。若工程A,B的broker都是一样的,那么只需要在工程A设置一个同名的异步任务并指定exchange, routing_key即可。若broker不一致, 则先指定broker,指定exchange, 指定routing_key。原理就是,amqp协议中,只要指定了exchange和routing_key,就可以将任务分发到绑定的队列当中了。celery库做了封装,会更具queue来指定exchange和routing_key。
相同broker的例子
#工程B中的代码
@app.task(bind=True, name='send_wx_text',
max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)
def send_wx_text(self, target_origin_id, to_user, txt):
# 含有具体实现
......
# 工程A中的代码
from celery.app.task import Task
send_wx_text = Task()
send_wx_text.name='send_wx_text'
send_wx_text.apply_async([target_origin_id, to_user, txt],exchange='test', routing_key='test')
不同broker的例子
#工程B中的代码
@app.task(bind=True, name='send_wx_text',
max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)
def send_wx_text(self, target_origin_id, to_user, txt):
# 含有具体实现
......
# 工程A中的代码
from celery.app.task import Task
from celery import Celery
app = Celery(borker='redis://127.0.0.1:6379/15')
send_wx_text = Task()
send_wx_text.bind(app)
send_wx_text.name='send_wx_text'
send_wx_text.apply_async([target_origin_id, to_user, txt], exchange='test', routing_key='test')