APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库)。
APScheduler有三个内置的调度系统,其中包括:
APScheduler可以任意混合和匹配调度系统和作业存储的后端,其中支持后端存储作业包括:
APScheduler内继承了几个常见的Python框架:
使用pip安装:
pip install apscheduler
pip install apscheduler==3.6.3
如果超时或者出现别的情况,可以选择:
# 方法1:使用豆瓣源下载 pip install -i https://pypi.doubanio.com/simple/ apscheduler # 方法2:使用清华源下载 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple apscheduler
要是再不行,点击该链接或者pypi官网下载了。下载并解压缩,进入跟setup.py文件同级的目录,打开cmd,使用命令进行下载:
python setup.py install
APScheduler共有4种组件,分别是:
如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)
对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。
当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:
当你需要调度作业的时候,你需要为这个作业选择一个触发器,用来描述该作业将在何时被触发,APScheduler有3中内置的触发器类型:
有两种方式可以添加一个新的作业:
import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job2(text): print(‘job2‘, datetime.datetime.now(), text) scheduler = BlockingScheduler() scheduler.add_job(job2, ‘date‘, run_date=datetime.datetime(2019, 2, 25, 19, 5, 6), args=[‘text‘], id=‘job2‘) scheduler.start() 上例中,只在2010-2-25 19:05:06执行一次,args传递一个text参数。
下面来个简单的例子,作业每个5秒执行一次:
import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job1(): print(‘job1‘, datetime.datetime.now()) scheduler = BlockingScheduler() scheduler.add_job(job1, ‘interval‘, seconds=5, id=‘job1‘) # 每隔5秒执行一次 scheduler.start()
每天凌晨1点30分50秒执行一次
from apscheduler.schedulers.blocking import BlockingScheduler # 后台运行 sc = BlockingScheduler() f = open(‘t1.text‘, ‘a‘, encoding=‘utf8‘) @sc.scheduled_job(‘cron‘, day_of_week=‘*‘, hour=1, minute=‘30‘, second=‘50‘) def check_db(): print(111111111111) if __name__ == ‘__main__‘: try: sc.start() f.write(‘定时任务成功执行‘) except Exception as e: sc.shutdown() f.write(‘定时任务执行失败‘) finally: f.close()
每几分钟执行一次:
import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job1(): print(‘job1‘, datetime.datetime.now()) scheduler = BlockingScheduler() # 每隔2分钟执行一次, */1:每隔1分钟执行一次 scheduler.add_job(job1, ‘cron‘, minute="*/2", id=‘job1‘) scheduler.start()
每小时执行一次:
import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job1(): print(‘job1‘, datetime.datetime.now()) scheduler = BlockingScheduler() # 每小时执行一次 scheduler.add_job(job1, ‘interval‘, hours=1, id=‘job1‘) # 每小时执行一次,上下浮动120秒区间内 # scheduler.add_job(job1, ‘interval‘, hours=1, id=‘job1‘, jitter=120) scheduler.start()
原文:https://www.cnblogs.com/Mr-shen/p/12664029.html