python-分布式任务队列

celery 分布式任务队列工具

Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布式消息传递

基本

  • Broker: 消息队列使用的中间人 有RabbiMQ redis mongodb 等一系列数据库
  • Task: 用来定义任务
  • backend: 用来保存结果
  • Worker: 执行单元, 用来从中间人取出任务 , 并把结果发给backen

数据保存

各个sqalchemy数据库


CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname'

# sqlite (filename) CELERY_RESULT_BACKEND = ‘db+sqlite:///results.sqlite’

# mysql CELERY_RESULT_BACKEND = ‘db+mysql://scott:tiger@localhost/foo’

# postgresql CELERY_RESULT_BACKEND = ‘db+postgresql://scott:tiger@localhost/mydatabase’

# oracle CELERY_RESULT_BACKEND = ‘db+oracle://scott:tiger@127.0.0.1:1521/sidname’

worker 设置

CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
# 设置队列 feeds
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
BROKER_URL = 'amqp://'      # 指定 Broker
CELERY_RESULT_BACKEND = ''  # 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai'                     # 指定时区,默认是 UTC
# CELERY_TIMEZONE='UTC'
CELERY_IMPORTS = (                                  # 指定导入的任务模块
    'celery_app.service_monitor',
    'celery_app.service_diagnose'
)
消息机制    支持 RabbitMQ, Redis, Beanstalk, MongoDB, CouchDB,
以及 SQL 数据库.
容错      与 RabbitMQ配合可完美实现错误恢复
分布式     运行于一台或多台服务器。支持Broker群集和HA,可任意添加worker而无需在服务器中心节点配置。
并发      通过Python的multiprocessing, Eventlet, gevent 或者他们的混合实现并发执行.
Scheduling  支持cron类的递归式任务,或者指定时间、倒数等任务执行方式.
延迟      极低延迟.
返回值     任务运行结果可储存在指定的结果存储后台,你可以等待结果或忽略运算结果
返回值存储   支持SQL数据库, MongoDB, Redis, Tokyo
Tyrant, Cassandra, 或 AMQP (消息通知).
Webhooks    用户跨语言/平台任务分配。
Rate limiting   Supports rate limiting by using the token bucket algorithm, which accounts for bursts of traffic. Rate limits can be set for each task type, or globally for all.  
消息路由     通过AMQP灵活的路由模型你可以将任务路由到任意worker服务器,可配置或运行时指定。
远程控制     可通过广播消息远程控制worker节点。Celery内置了大量的相关命令,也可以轻松实现自定义命令(只适用AMQP和Redis)
监控      可实时获得workers的一切信息
对象序列化   支持 Pickle, JSON, YAML,或自定义序列化程序. 
错误追踪     
UUID    每个任务都有一个UUID用于查询该任务的运行状态以及返回值。
出错重试    当任务执行失败时可根据配置重试。配置内容包括最大重试次数,重试时间间隔。
任务集 任务集由多个子任务构成,可以获得子任务的数量,执行情况,以及各个子任务的运算结果。
Made for Web    可通过Ajax查询任务运行状态和运行结果。
出错通知    当任务出错是可通过邮件通知管理员

任务调用

service_monitor 为注册任务

延迟调用

add.s(2, 2)()

简单调用

service_monitor(url)

可设置回调调用

service_monitor.apply_async(args=["http://www.baidu.com"], link=res.s())

所有组任务执行完后 所有结果发送到res任务中去 也就是回调res任务

chord((service_monitor.s("http://www.baidu.com") for i in range(3)) , res.s())()

设置每个任务执行后的任务 .s 前任务作为参数的一部分

add.apply_async((2, 2), link=add.s(16))

链接任务 结果发给后任务

chain(add.s(4, 4) | mul.s(8))().get()
另一种用法
g = chain(add.s(4) | mul.s(8))
g(4).get()
还有一种
(add.s(4, 4) | mul.s(8))().get()

任务可设置参数

add.apply_async((10, 10), serializer='json')
add.apply_async((2, 2), compression='zlib')
add.apply_async((2,2), countdown=10, debug=True)
s = add.subtask((2, 2), {'debug': True}, countdown=10)# args kwargs options

应用

task.py

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

运行

celery -A tasks worker --loglevel=info

此时 一个worker 已经开始运行

单机多个worker

$ celery worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery worker --loglevel=INFO --concurrency=10 -n worker3.%h

下发任务

test.py

# -*- coding: utf-8 -*-
from celery_app.service_monitor import service_monitor, res
from celery import chord
result = chord((service_monitor.s("http://www.baidu.com") for i in range(3)) ,  res.s())()
# 查看任务
result.ready()
result.get(timeout=1)

交换与路由

enter description here
enter description here

三种交换方式

Direct Exchange
直接交换,也就是指定一个消息被其中一个队列接收,这个消息被celerybeat定义一个routing key,如果你发送给交换机并且那个队列且绑定的bingdingkey,那么就会直接被转给这个Queue.
Topic Exchange
这种交换方式可以根据类型的属性进行统配,然后根据统配的类型进行交换到指定的Queue.

Fanout Exchange
广播交换,如果你有某个task,可能处理时间比较长,但是却要求很高的实时性,那么你可能需要多台服务器的多个worker进行处理,每个worker负责其中一部分工作,但是celerybeat 只会生成一个任务,被某个worker取走就没有了,所以你需要让每个服务器的队列都要收到这个消息,这里很需要注意的是:你的fanout类型的消息在生成的时候要有多份,每个队列一份,而不是一个消息发送给单一队列的次数。

celery -A proj worker -Q feeds,celery
为该worker指定一或多个消息队列, worker只取该队列中的任务。可以指定多个队列.
调用

service_monitor.apply_async(args=['http://cnn.com/rss'],queue='feeds',routing_key='feeds')

优先级

单个任务优先级


from kombu import Exchange, Queue
app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10},
]

统一优先级

app.conf.task_queue_max_priority = 10

发送广播

current_app.control.broadcast('rate_limit',
arguments={'task_name': 'celery_app.service_monitor.service_monitor',
'rate_limit': '200/m'})
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容