1 安装
pip install apscheduler
2 APScheduler最基本的用法: “定时几秒后启动job” 每个job都会以thread的方式被调度。分为BlockingScheduler与BackgroundScheduler方式
BackgroundScheduler例子(不阻塞)
from apscheduler.schedulers.background import BackgroundScheduler import time def job(): print(‘job 3s‘) if __name__==‘__main__‘: sched = BackgroundScheduler(timezone=‘MST‘) sched.add_job(job, ‘interval‘, id=‘3_second_job‘, seconds=3) sched.start() while(True): print(‘main 1s‘) time.sleep(1)
通过这个输出,我们也可以发现,调用start函数后,job()并不会立即开始执行。而是等待3s后,才会被调度执行。
3 add_job来添加作业(或装饰器模式添加作业)
其中,date表示具体的一次性任务,interval表示循环任务,cron表示定时任务
只执行一次
scheduler.add_job(job2, ‘date‘, run_date=datetime.datetime(2019, 2, 25, 19, 5, 6), args=[‘text‘], id=‘job2‘)
每5秒执行一次:
scheduler.add_job(job1, ‘interval‘, seconds=5, id=‘job1‘) # 每隔5秒执行一次
每天凌晨1点30分50秒执行一次 (装饰器模式,参考https://www.cnblogs.com/Neeo/p/10435059.html#添加作业)
@sc.scheduled_job(‘cron‘, day_of_week=‘*‘, hour=1, minute=‘30‘, second=‘50‘)
例:
from apscheduler.schedulers.blocking import BlockingScheduler # 后台运行 sc = BlockingScheduler() @sc.scheduled_job(‘interval‘, seconds=2, id=‘job1‘) # 每隔2秒执行一次 def check(): print("ok...") if __name__ == ‘__main__‘: try: sc.start() except Exception as e: print(‘定时任务执行失败‘) finally: print("end")
# 每隔2分钟执行一次, */1:每隔1分钟执行一次
scheduler.add_job(job1, ‘cron‘, minute="*/2", id=‘job1‘)
# 每小时执行一次
scheduler.add_job(job1, ‘interval‘, hours=1, id=‘job1‘)
# 每小时执行一次,上下浮动120秒区间内
# scheduler.add_job(job1, ‘interval‘, hours=1, id=‘job1‘, jitter=120)
参考:https://www.jianshu.com/p/b829a920bd33
https://www.cnblogs.com/Neeo/p/10435059.html
https://www.cnblogs.com/yueerwanwan0204/p/5480870.html
原文:https://www.cnblogs.com/pu369/p/12672789.html