1.timeloop 定时执行任务,可运用于多周期任务
#!/usr/bin/env python
# encoding: utf-8
import time
import requests
from timeloop import Timeloop
from datetime import timedelta
def run_api(timeout=5):
    """Send one GET request to the local ``/indexNew`` endpoint.

    On success a line with the current time is printed. On failure the
    error is printed instead, so the exception does not propagate into
    whichever scheduler thread invoked this job.

    Args:
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests.get`` may block indefinitely.

    Returns:
        The response body as text, or ``None`` if the request failed.
    """
    url = "http://127.0.0.1:5000/indexNew"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print("request to {} failed: {}".format(url, exc))
        return None
    # NOTE: the "5s job" label is kept verbatim from the original output; it
    # does not match the 2-second interval this snippet schedules the job at.
    print("5s job current time:{}".format(time.ctime()))
    return response.text
# Timeloop instance on which the periodic job below is registered.
cron = Timeloop()


# Run once every 2 seconds.
@cron.job(interval=timedelta(seconds=2))
def run_job_every_2S():
    """Periodic job body: delegate to ``run_api``."""
    run_api()


if __name__ == "__main__":
    # block=True is passed through unchanged from the original call.
    cron.start(block=True)
2.threading.Timer()方法,此方法只能执行一次,多次执行需要添加循环
#!/usr/bin/env python
# encoding: utf-8
import time
import requests
import threading
def run_api(timeout=5):
    """Send one GET request to the local ``/indexNew`` endpoint.

    On success a line with the current time is printed. On failure the
    error is printed instead, so the exception does not escape into the
    timer thread that runs this function.

    Args:
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests.get`` may block indefinitely, which would
            also block every ``join()`` on the timer running it.

    Returns:
        The response body as text, or ``None`` if the request failed.
    """
    url = "http://127.0.0.1:5000/indexNew"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print("request to {} failed: {}".format(url, exc))
        return None
    # NOTE: the "5s job" label is kept verbatim from the original output; it
    # does not match the 10-second timer delay used in this snippet.
    print("5s job current time:{}".format(time.ctime()))
    return response.text
def main(count=3, delay=10):
    """Start ``count`` timers that each call ``run_api`` and wait for them.

    ``threading.Timer(interval, function, args=None, kwargs=None)`` runs
    ``function`` once after ``interval`` seconds, so each timer fires a
    single time.

    Args:
        count: Number of timers to start. Defaults to 3, the value that was
            previously hard-coded.
        delay: Seconds each timer waits before calling ``run_api``.
            Defaults to 10, the value that was previously hard-coded.

    Returns:
        The number of timers that were started (also printed).
    """
    timers = []
    for _ in range(count):
        timer = threading.Timer(delay, run_api)
        timer.start()
        timers.append(timer)
    # Block until every timer has fired and its callback has returned.
    for timer in timers:
        timer.join()
    started = len(timers)
    print(started)
    return started
if __name__ == "__main__":
    # Run four rounds of main(); num ends at 4, as with the manual counter.
    for num in range(1, 5):
        print("Main threading is run")
        main()
3.apscheduler 定时任务
import time
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def run_api(timeout=5):
    """Send one GET request to the local ``/indexNew`` endpoint.

    On success a line with the current time is printed. On failure the
    error is printed instead of being raised to the scheduler.

    Args:
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests.get`` may block indefinitely.

    Returns:
        The response body as text, or ``None`` if the request failed.
    """
    url = "http://127.0.0.1:5000/indexNew"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print("request to {} failed: {}".format(url, exc))
        return None
    # NOTE: the "5s job" label is kept verbatim from the original output; it
    # does not match the 10-second interval this snippet schedules the job at.
    print("5s job current time:{}".format(time.ctime()))
    return response.text
# Scheduler whose start() call is the last statement of this snippet.
cheduler = BlockingScheduler()
# Register run_api on an interval trigger: once every 10 seconds.
cheduler.add_job(func=run_api, trigger="interval", seconds=10)
cheduler.start()
4.flask_apscheduler 定时任务
from flask import Flask,redirect,url_for,request,render_template,make_response
from flask import escape,session,abort,flash
from werkzeug.utils import secure_filename
from flask_apscheduler import APScheduler
import os
class Config(object):
    """Configuration object loaded into the Flask app; holds the job list."""

    # Job list. Each entry gives the job id, the function to execute
    # ("module:function"), its positional arguments, and the trigger with
    # that trigger's own fields.
    JOBS = [
        dict(
            id='job1',
            func='__main__:task1',  # name of the function to execute
            args=("张三", "12"),  # arguments passed to the function
            # Trigger: "interval" runs at a fixed period; "cron" can be used
            # instead and works similarly to the Linux cron scheduler.
            trigger='interval',
            # hour=1,
            seconds=60,
        ),
        dict(
            id='job2',
            func='__main__:task2',
            args=("李四", "14"),
            trigger='cron',
            hour=11,
            minute=23,
            second=1,  # note: this field name has no trailing "s"
        ),
    ]
# Create the Flask application, then load the settings defined on Config
# into the instantiated app.
app = Flask(__name__)
app.config.from_object(Config)
def task1(name, age):
    """Print the task-1 banner followed by a line with ``name`` and ``age``."""
    print("执行任务1")
    details = "姓名:%s,年龄:%s" % (name, age)
    print(details)
def task2(name, age):
    """Print the task-2 banner followed by a line with ``name`` and ``age``."""
    print("执行任务2")
    details = "姓名:%s,年龄:%s" % (name, age)
    print(details)
if __name__ == '__main__':
    # Attach the scheduler to the Flask app and start it before serving.
    cheduler = APScheduler()
    cheduler.init_app(app)
    cheduler.start()
    # debug=True turns on the Werkzeug reloader, which re-executes this
    # script in a child process. Both processes would then start their own
    # scheduler and every job would fire twice, so the reloader is disabled
    # while the debugger stays on.
    app.run(host='127.0.0.1', debug=True, port=5000, use_reloader=False)