前言
最近某个Flask Web项目需要定时读取数据库,并对数据进行更新,想了想还是有自己的实现办法的:
引入threading
from threading import Thread
thr = Thread(target=timed_task)
thr.start()
这样就通过一个函数构造了一个线程,每次在manager.run()
之前加上这几行代码,就大概可以实现要求了
但是存在问题,Python有GIL全局锁限制,一个Python进程实际上总是一个线程在跑,无法充分使用CPU(可能还可以使用多进程Process,但博主不太熟悉这方面,没有尝试),自然性能上会有很大问题。
并且还有一个问题是,博主使用uwsgi在服务器上部署Web应用,所以在这里写出这样的代码就太过牵强了,我想应该有一个工具能够不断的监控、分配任务、执行任务,它就是Celery
。
Celery介绍
Celery翻译为“队列”,它的工作过程也自然离不开这个概念。
Celery里有两个模块:worker和beat
worker:用于执行队列中的任务
beat:用于定时分配任务
这两个模块可以同时启动,也可以分别启动
任务可以有两个来源
代码内调用:将需要放入执行队列的任务函数import进来
beat定时派发:在配置文件里设置好需要调用的任务函数和调用周期,beat就会自动派发任务到队列里了
Celery安装
Celery使用之前需要配置消息中间件
一般是用Redis数据库来通讯更加方便,需要本地安装redis服务并启动,具体操作请参考其他博客和官方文档。
Python也需要安装redis支持
pip install redis
用pip可以非常简单地安装
pip install celery
Celery最简单的Demo
我们先来看看这个代码,它是一个最简单的celery程序
celery_demo.py
import celery
import time
worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")
@worker.task
def hello():
return "hello,{}".format(time.time())
这样每次处理hello这个任务的时候,就会返回“hello,”加上一个时间戳
如此而来,我们只是定义好了任务函数和worker(celery对象)
我们还需要创建一个py来调用这个模块(当然你也可以直接在命令行把这个模块import进去)
celery_schedule.py
from celery_demo import hello
hello.delay()
每运行一次celery_schedule.py,一个hello任务就会被放入任务队列,等待worker执行
现在我们已经将它运行了一次,我们需要开启worker来执行它
在命令行运行如下代码来启动worker:
celery worker -A celery_demo.worker -l info
// 后面的-l info参数意思是开启日志模式,所有消息将会打印在命令行
可以发现命令行已经出现结果了
[2018-09-01 19:02:32,218: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-09-01 19:02:32,224: INFO/MainProcess] mingle: searching for neighbors
[2018-09-01 19:02:33,238: INFO/MainProcess] mingle: all alone
[2018-09-01 19:02:33,274: INFO/MainProcess] celery@exqlnet-PC ready.
[2018-09-01 19:02:33,275: INFO/MainProcess] Received task: celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7]
[2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29]
[2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f]
[2018-09-01 19:02:33,285: INFO/ForkPoolWorker-1] Task celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7] succeeded in 0.00840658400557004s: 'hello,1535799753.2767727'
[2018-09-01 19:02:33,285: INFO/ForkPoolWorker-3] Task celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f] succeeded in 0.007285808002052363s: 'hello,1535799753.278022'
[2018-09-01 19:02:33,290: INFO/ForkPoolWorker-2] Task celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29] succeeded in 0.013728965997870546s: 'hello,1535799753.2767704'
Celery定时任务
那么既然已经可以简单实现任务分配和执行了,那么如何定时分配任务呢?
我们在celery_demo.py里加一些东西:
import celery
import time
from datetime import timedelta
worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")
class Config:
CELERYBEAT_SCHEDULE = {
'update_info': {
'task': 'celery_demo.hello',
"schedule": timedelta(seconds=3),
}
}
worker.config_from_object(Config)
@worker.task
def hello():
return "hello,{}".format(time.time())
类Config是一个配置类,CELERYBEAT_SCHEDULE是用来配置定时任务的,具体的直接看代码
worker.config_from_object()传入一个配置类或对象即可加载配置
一开始就说到Celery的beat才是定时安排任务的工具,所以我们需要用beat来启动定时,在命令行运行以下代码:
celery beat -A celery_demo.worker -l info
启动beat,现在应该可以在命令行看到如下信息:
LocalTime -> 2018-09-01 19:12:53
Configuration ->
. broker -> redis://localhost:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2018-09-01 19:12:53,821: INFO/MainProcess] beat: Starting...
[2018-09-01 19:12:53,839: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:12:56,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:12:59,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:13:02,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:13:05,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
说明任务已经分配了,由于停留时间,我这里已经分配了5个任务到队列里
再次运行启动worker命令
celery worker -A celery_demo.worker -l info
可以在屏幕上看到以下信息了
[tasks]
. celery_demo.hello
[2018-09-01 19:14:40,146: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-09-01 19:14:40,152: INFO/MainProcess] mingle: searching for neighbors
[2018-09-01 19:14:41,163: INFO/MainProcess] mingle: all alone
[2018-09-01 19:14:41,172: INFO/MainProcess] celery@exqlnet-PC ready.
[2018-09-01 19:14:41,336: INFO/MainProcess] Received task: celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a]
[2018-09-01 19:14:41,338: INFO/MainProcess] Received task: celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3]
[2018-09-01 19:14:41,340: INFO/MainProcess] Received task: celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e]
[2018-09-01 19:14:41,347: INFO/MainProcess] Received task: celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69]
[2018-09-01 19:14:41,350: INFO/MainProcess] Received task: celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30]
[2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72]
[2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78]
[2018-09-01 19:14:41,353: INFO/MainProcess] Received task: celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958]
[2018-09-01 19:14:41,354: INFO/MainProcess] Received task: celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc]
[2018-09-01 19:14:41,355: INFO/MainProcess] Received task: celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682]
[2018-09-01 19:14:41,359: INFO/ForkPoolWorker-2] Task celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69] succeeded in 0.011140925002109725s: 'hello,1535800481.348562'
[2018-09-01 19:14:41,360: INFO/ForkPoolWorker-4] Task celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a] succeeded in 0.021171232001506723s: 'hello,1535800481.3396409'
[2018-09-01 19:14:41,361: INFO/ForkPoolWorker-2] Task celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30] succeeded in 0.0003823369988822378s: 'hello,1535800481.3609214'
[2018-09-01 19:14:41,362: INFO/ForkPoolWorker-3] Task celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3] succeeded in 0.02279239599738503s: 'hello,1535800481.33969'
[2018-09-01 19:14:41,362: INFO/ForkPoolWorker-4] Task celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72] succeeded in 0.0008120330021483824s: 'hello,1535800481.3620644'
[2018-09-01 19:14:41,363: INFO/ForkPoolWorker-2] Task celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78] succeeded in 0.000599899998633191s: 'hello,1535800481.362508'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-4] Task celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958] succeeded in 0.0004470089988899417s: 'hello,1535800481.3638816'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-3] Task celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc] succeeded in 0.000407947001804132s: 'hello,1535800481.363944'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-2] Task celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682] succeeded in 0.00035140299587510526s: 'hello,1535800481.3644059'
[2018-09-01 19:14:41,365: INFO/ForkPoolWorker-1] Task celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e] succeeded in 0.016904400996281765s: 'hello,1535800481.3485875'