最近在使用APScheduler做作业调度的时候碰到一个问题,就是有时候我的作业用一个cron trigger不能完全包含所有的触发条件,同时这个作业不能被创建多个实例。
比如期望作业在下面时间运行
此时我们可能就不能仅仅通过一个cron来描述我所有的触发条件,对于这种问题,我期望能对一个作业指定多个cron trigger,当任何一个trigger满足时就触发作业允许。
看了一下APScheduler,它可以支持用户自定trigger,实现起来也比较简单,就是要继承apscheduler.triggers.base.BaseTrigger类,并实现get_next_fire_time(previous_fire_time, now)方法。
好了,说干就干,先实现我的MultiCronTrigger类
下面是MultiCronTrigger的代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from apscheduler.triggers.base import BaseTrigger
class MultiCronTrigger(BaseTrigger):
triggers = []
def __init__(self, triggers):
self.triggers = triggers
def get_next_fire_time(self, previous_fire_time, now):
min_next_fire_time = None
for trigger in self.triggers:
next_fire_time = trigger.get_next_fire_time(previous_fire_time, now)
if min_next_fire_time is None:
min_next_fire_time = next_fire_time
if next_fire_time < min_next_fire_time:
min_next_fire_time = next_fire_time
return min_next_fire_time
整个测试类试试,测试类定义了两个CronTrigger对象,第一个trigger每三秒运行一次作业,第二个trigger每五秒运行一次作业。
import time
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
def myjob():
print(‘job run at %s‘ % datetime.now())
if __name__ == ‘__main__‘:
scheduler = BackgroundScheduler()
trigger1 = CronTrigger(hour=‘*‘, minute=‘*‘, second=‘*/3‘)
trigger2 = CronTrigger(hour=‘*‘, minute=‘*‘, second=‘*/5‘)
job = scheduler.add_job(myjob, MultiCronTrigger([trigger1, trigger2]))
scheduler.start()
try:
while True:
time.sleep(5)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
运行一下测试,可以看到job在每个3的倍数和5的倍数秒上都会执行作业。
转载请以链接形式标明本文地址
本文地址:http://blog.csdn.net/kongxx/article/details/50508398
APScheduler Multi CronTrigger 实现
原文:http://blog.csdn.net/kongxx/article/details/50508398