简介
Celery是一个用于执行异步任务的框架。这个框架使用python编写,使用这个框架,很容易就能将Web应用中的一些耗时的作业转交给工作池,让工作池中的worker以异步的方式执行这些作业。
架构
一般通过启动一个或多个worker进程来部署Celery。这些worker进程连接上消息代理(以下称之为broker)来获取任务请求。broker随机将任务请求分发给worker。通过调用Celery的API,用户生成一个任务请求,并且将这个请求发布给broker。在worker完成任务后,将完成的任务信息发送给broker。
通过启动新的worker进程并让这些进程连上broker,可以很方便的扩展worker池。每个worker可以和其他的worker同步执行任务。
broker的选择
官方支持的broker列表如下
Name | Status | Monitoring | Remote Control |
---|---|---|---|
RabbitMQ | Stable | Yes | Yes |
Redis | Stable | Yes | Yes |
Amazon SQS | Stable | No | No |
Zookeeper | Experimental | No | No |
存储结果
如果需要存储任务的结果,需要给Celery配置一个result store。Redis和memcache是很好的选择。
创建Celery应用
首先,安装Celery
pip install celery
然后,为celery应用创建一个模块。对于小的应用,通常的做法是把所有代码放在一个叫tasks.py的文件中
import celery
app = celery.Celery('example')
编写任务
task 是Celery中最基本的单元。Celery有很多装饰器来定义task,只需要编写一个函数并且加上一个装饰器,就能注册一个能异步执行的任务,比如下面这个函数
@app.task
def add(x, y):
return x+y
通过下面的代码,就能让这个任务异步执行
add.delay(1,2)
最佳实践
管理异步任务
Celery中的任务都是异步执行的,也就是说,在主进程中调用Celery函数后,这个函数会将任务信息发送给broker后立刻返回。
有两种方式可以获取任务执行的结果。一种方式是将执行的结果持久化,比如写入数据库。
在大多数情况下,将异步任务的结果写入数据库是很低效的。另外一种方式是使用Celery中结果存储(result store)的功能。这个功能是可选的,需要配置相关的参数。
选择序列化格式
Celery任务的输入和输出都要经过序列化和反序列化。序列化会带来一系列的问题,最好在设计任务的时候就将这点考虑到。
Celery默认会使用Pickle来对消息进行序列化。Pickle的好处是简单易用,但是在使用的过程中会有一些坑。当代码发生变动时,已经序列化的对象,反序列化后依然是变更前的代码。
好的实践是使用JSON作为序列化格式,使用JSON,不仅可以强迫开发者认真地设计参数,还可以避免使用pickle带来的安全隐患。
使用下面的配置
CELERY_TASK_SERIALIZER=json
短任务
在Celery的worker池中,worker并发地执行任务。因此,将任务设置成多线程是有意义的。每个任务应该尽可能做最小的有用工作量,以便尽可能高效地分配工作。
在发布任务的过程中,我们会将任务通过网络发给broker,broker通过网络发给worker,在worker上对任务进行反序列化,这个过程中的开销比线程之间传递任务信息要大的多。我们在设计任务的时候要考虑到这一点。如果把向数据库插入一条数据作为一个Celery任务,对资源的利用率将是不高的。但是,在同一个任务中进行1次API调用而不是几次,将会有很大的区别。
短任务会让部署和重启变得容易些。并且会比那些长任务更加不容易出错。
为任务设置超时
当执行一个任务成百上千次时,由于网络问题可能导致一个任务卡住,导致队列被阻塞而不能处理更多的任务。可以通过设置hard timeouts和soft timeouts来解决这个问题。
相关文档见这里。
创建幂等的任务
任务可能会由于各种各样的原因报错或者被打断。在分布式系统中,与其想办法处理所有可能导致出错的情景,不如在设计任务时实现幂等性。在任务开始时,永远不要预设系统的状态。尽可能不要改变外部的状态。
任务在重跑时带来的副作用越小,一个分布式的系统就越能够自我修复。举个例子,当一个预想不到的错误发生时,幂等的任务只要告诉Celery去重跑就行。如果错误是短暂的,任务的幂等性能使得系统在没有人为干涉的情况下能很快自我修复。
使用acks_late
当worker收到broker发来的任务时,worker会向broker回复一个确认信息(acknowledgement)。通常情况下,broker在收到这个ack后,会将这个任务从队列中移除。但是,加入worker在执行任务的时候突然挂掉,并且已经向broker发送的确认信息,这个任务将不会再次执行。Celery在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息。但是在一些极端情况下,比如网络挂掉,硬件错误或者其他的场景下,Celery不能正确地处理这一情况。
可以通过配置acks_late=True
,使worker只有在任务完成(成功/失败)的情况下,才向broker发送确认信息。在任务信息不能丢失的场景中,这个功能是及其有用的。
但是,只有在任务被设计成幂等,以及短任务的情况(broker在将任务发送给新的worker前,会保留任务信息一段时间)下,才有用的一个配置项。
自定义task类
虽然任务看起来像一个函数,但是在用Celery装饰器装饰那个函数时,实际上是返回了一个类,这个类实现了__call__方法。(这也是为什么task可以被绑定到self,并且可以使用delay和apply_async方法的原因)。这个装饰器使用起来很方便,但是有些情况下,一些任务可能会有同样的特性,或者执行同样的流程,如果继续沿用函数式的写法,可能不会很好地表达这种特性。
通过创建celery.Task的抽象子类,可以通过继承,来构建一套其他任务所需的工具和行为。子类的常见行为包括,设置rate和重试行为,初始化,甚至是一些配置项。
比如下面这个例子
class SubTask(Task):
abstract = True
default_retry_delay = 1
max_retries = 3
ignore_result = True
task_time_limit = 15
acks_late = True
Canvas
Celery的文档中提到了一些可以将多个任务组成一个工作流的方法。现在就开始熟悉这些方法吧。不损害上面原则的前提下,它们提供了一些方法来完成复杂的任务。尤其是chord,这个方法让任务并发执行,并且在任务完成的时候将结果传递给其他的任务。当然,这样的特性要求配置一个result store。