最近在写一个分布式微博爬虫,主要就是使用celery做的分布式任务调度。celery确实比较好用,但是也遇到一些问题,我遇到的问题主要集中在定时任务和任务路由这两个部分。本文不会讲解celery的基本使用,如果需要看celery入门教程的话,请点击这里跳转。
celery worker -A app_name -l info
必须推荐在项目的根目录运行而且,这里的app_name必须是项目中的Celery实例的完整引用路径*。如果不在项目根目录运行,那么相关的调用也得切换到app同级目录下,这一点可以通过命令行进行佐证celery的定时任务会有一定时间的延迟。比如,我规定模拟登陆新浪微博任务每隔10个小时执行一次,那么定时任务第一次执行就会在开启定时任务之后的10个小时后才会执行。而我抓取微博需要马上执行,需要带上cookie,所以不能等那1个小时。这个没有一个比较好的解决方法,可以使用celery的
crontab()
来代替schdule
做定时,它会在启动的时候就执行。我采用的方法是第一次手动执行该任务,然后再通过schedule
执行。celery的定时任务可能会让任务重复。定时器一定只能在一个节点启动,否则会造成任务重复。另外,如果当前worker节点都停止了,而beat在之后才停止,那么下一次启动worker的时候,它还会执行上一次未完成的任务,可能会有重复。
由于抓取用户和抓取用户关注、粉丝的任务耗时和工作量不同,所以需要使用任务路由,将任务按比重合理分配到各个分布式节点上,这就需要使用到celery提供的task queue。如果单独使用task queue还好,但是和定时任务一起使用,就可能出现问题。我遇到的问题就是定时任务压根就不执行!开始我的配置大概就是这样
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERYBEAT_SCHEDULE={
'user_task': {
'task': 'tasks.user.excute_user_task',
'schedule': timedelta(minutes=3),
},
'login_task': {
'task': 'tasks.login.excute_login_task',
'schedule': timedelta(hours=10),
},
},
CELERY_QUEUES=(
Queue('login_queue', exchange=Exchange('login', type='direct'), routing_key='for_login'),
Queue('user_crawler', exchange=Exchange('user_info', type='direct'), routing_key='for_user_info'),
Queue('fans_followers', exchange=Exchange('fans_followers', type='direct'), routing_key='for_fans_followers')
)
)
结果过了一天发现定时任务并没有执行,后来把task
加上了一个option
字段,指定了任务队列,就可以了,比如
'user_task': {
'task': 'tasks.user.excute_user_task',
'schedule': timedelta(minutes=3),
'options': {'queue': 'fans_followers', 'routing_key': 'for_fans_follwers'}
},
- 部分分布式节点一直出现
Received task
,但是却不执行其中的任务的情况。这种情况下重启worker节点一般就可以恢复。但是最好查查原因。通过查看flower的失败任务信息,才发现是插入数据的时候有的异常未被处理。这一点严格说来并不是celery的bug,不过也很令人费解。所以推荐在使用celery的时候配合使用flower做监控。