celery的建议

按业务场景划分队列

一定一定要更具业务特点来划分出不同的队列,不能讲所有的任务都让同一队列消费。场景,工程当中有统计类的异步任务,也有发送红包给微信用户的异步任务。若不划分队列,那么当统计类的异步任务很多很多的时候,发送红包给微信用户的异步任务就要等待一段时间才能被执行到(任务多可能会等待十几分钟甚至一两个小时),这对用户体验来说是非常不友好的。此时就可以将队列划分为low, middle, high这几种队列,对于统计类的这些任务可以扔去low的队列中,而对于发送红包这种重要的任务就扔去high队列中,确保尽可能快的被执行到。具体划分,需要更加业务场景来划分。

通过-Q参数来指定队列名:

python celery worker -A test_project -Q low;
python celery worker -A test_project -Q middle;
python celery worker -A test_project -Q high;


# django代理中设置
CELERY_QUEUES = (
    Queue('low', Exchange('low', type='direct')),
    Queue('middle', Exchange('middle', type='direct')),
    Queue('high', Exchange('high', type='direct')),
)

对任务指定队列运行

方式1
@app.task(bind=True, queue='middle', name='send_wx_text')
def send_wx_text(self, target_origin_id, to_user, txt):
    ......
    
方式2
send_wx_text.apply_async((target_origin_id, to_user, txt), queue='middle')

方式3
CELERY_ROUTES = {
    'send_wx_text': {
        'queue': 'middle',
    },
}

处理结果需要关心的,不要返回处理结果

大部分异步任务都是不需要关心处理结果的,此时不需要将处理结果返回。由于返回处理结果,需要存储,这意味着有IO操作,会降低性能,若将结果存储在redis或者RabbitMQ 中,随着任务数的增加,消耗的内存也会增加(曾经因为这个原因爆过内存.....)。若非要存储结果,也要设置好结果的保存时间,以免内存泄漏。

设置例子

# 设置结果的保存时间
CELERY_TASK_RESULT_EXPIRES = 10*60

# 设置默认不存结果
CELERY_IGNORE_RESULT = True


# 任务单独指定, 会覆盖全局配置
@app.task(bind=True, ignore_result=True)
def send_wx_text(self, target_origin_id, to_user, txt):
    ......


并发数设置

对开启的进程数做过测试,发现当进程数等于cpu核心数时,性能是最好的,少了不能很好的利用多核,多了性能下降估计是因为开多的进程带来的性能的提升小于因上下文切换带来的性能损失。对于协程池的大小设置,需要进行测试来得出最佳数量。

例子:

[program:test_project-high-jobs]
command=/opt/.virtualenvs/test_project/bin/celery worker -A test_project -P gevent -c 100 -l INFO -Q high -n %%h-test_project-high --without-gossip --without-mingle --without-heartbeat
environment=PATH="/opt/.virtualenvs/test_project/bin"
directory=/srv/test_project.com/application
user=uwsgi
priority=10
autorestart=true
process_name=TEST_PROJECT_%(process_num)s
;进程数==核心数
numprocs=4
stdout_logfile=/var/log/supervisor/test_project-high-jobs.log
stderr_logfile=/var/log/supervisor/test_project-high-jobs.log

使用gevent时注意task中是否使用gevent不支持的库

利用gevent来部署worker, 若task使用了那些不支持gevent的库(例如requests的普通模式),会使到异步变同步,极其影响性能。这一点对于其它的协程库也是一样的例如eventlet。

对失败进行重试处理

重试需要注意的点

  • 明确哪些失败或错误是需要重试的,不要无条件的重试
  • 设定最大重试次数(max_retries),避免无限重试
  • 重试的延时最好是更具重试次数进行增量延时,给更多的时间让等待服务恢复正常,以提高成功率

简单的例子

def backoff(attempts):
    """
    1, 2, 4, 8, 16, 32, ...
    """
    return 2 ** attempts

# 设置max_retries, 限定重试次数
@app.task(bind=True, max_retries=3)
def send_wx_text(self, target_origin_id, to_user, txt):
    try:
        r, err = wx_api.send_text(to_user, txt)
        if err and int(err.code) in (45047):
            # 更具重试次数增加重试的延时的时间
            raise self.retry(args=[target_origin_id, to_user, txt], 
                             countdown=backoff(self.request.tretries))
    except BaseException as e:
        raise e
        

尽快失败,而不阻塞

对于一些爬虫类的异步任务,我们常常是使用代理去爬,一般来说代理服务商提供的代理能用的概率比较低,容易导致timeout,对于这些可能会失败的任务应该尽快让它失败而不是在等待它失败,避免做无畏的等待,进而提高性能。预估一下任务的正常处理时间,根据这个时间来规定一个任务的最大执行时间。

简单的例子

@app.task(bind=True, max_retries=3, soft_time_limit=5)
def send_wx_text(self, target_origin_id, to_user, txt):
    .....

对任务进行通用处理

任务失败或者重试,一般来说是需要进行记录,若每一个任务都写一遍相关的代码,这非常麻烦。此时,通过执行任务的base参数对任务进行统一处理。

例子

from myproject.tasks import app

# 具体的可以看看Task类中的方法
class BaseTask(app.Task):
    abstract = True

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        sentrycli.captureException(exc)
        super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        sentrycli.captureException(exc)
        super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)
        
        
@app.task(bind=True, max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)
def send_wx_text(self, target_origin_id, to_user, txt):
    .......

使用RabbitMQ作为broker

理由:

  • 更稳定
  • 消耗内存比redis要小,用redis做broker当任务量很多时,内存极剧上升,估计有内存泄漏问题
  • rabbitmq-plugins这监控工具非常好用(用flower作为监控工具,在测试服务器部署跑,内存会随着时间的推移而不断增加,估计有内存泄漏问题)

跨应用的task生产

场景:工程A利用tornado接收微信转发过来的用户信息,但信息的处理是在使用Django的工程B中。也就是说,由工程A做消息的生产者,工程B作为消息的消费者。若工程A,B的broker都是一样的,那么只需要在工程A设置一个同名的异步任务并指定exchange, routing_key即可。若broker不一致, 则先指定broker,指定exchange, 指定routing_key。原理就是,amqp协议中,只要指定了exchange和routing_key,就可以将任务分发到绑定的队列当中了。celery库做了封装,会更具queue来指定exchange和routing_key。

相同broker的例子

#工程B中的代码
@app.task(bind=True, name='send_wx_text',
max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)
def send_wx_text(self, target_origin_id, to_user, txt):
    # 含有具体实现
    ......


# 工程A中的代码
from celery.app.task import Task
send_wx_text = Task()
send_wx_text.name='send_wx_text'
send_wx_text.apply_async([target_origin_id, to_user, txt],exchange='test', routing_key='test')


不同broker的例子

#工程B中的代码
@app.task(bind=True, name='send_wx_text',
max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)
def send_wx_text(self, target_origin_id, to_user, txt):
    # 含有具体实现
    ......


# 工程A中的代码
from celery.app.task import Task
from celery import Celery
app = Celery(borker='redis://127.0.0.1:6379/15')
send_wx_text = Task()
send_wx_text.bind(app)
send_wx_text.name='send_wx_text'
send_wx_text.apply_async([target_origin_id, to_user, txt], exchange='test', routing_key='test')
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • RabbitMQ详解 本文地址:http://www.host900.com/index.php/articles...
    嘉加家佳七阅读 2,511评论 0 9
  • “ 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列...
    落羽成霜丶阅读 3,984评论 1 41
  • 就在最近,田亮晒出和总教头郎平以及队员朱婷、张常宁、袁心玥合影。 字里行间充斥着愉悦与激动。网友们也难掩对女排姑娘...
    身高管理师阅读 829评论 0 0
  • 题目1: DOM0 事件和DOM2级在事件监听使用方式上有什么区别? 在DOM0级处理程序,事件名以on开头,比如...
    ShawnRong阅读 230评论 0 0