定时任务是一个通用场景常见的功能,之前我使用django的时候,更习惯使用celery中的定时任务,现在花时间看了看
apscheduler
感觉不错,就写了demo,并集成到项目代码中了
任务调度主要就是以下几个功能
其中添加定时任务方式,有以下三种方式
crontab
表达式定时任务了Tip:
crontab
写法可以参考这个网站 https://crontab.guru/
在FastAPI异步框架中,选择 AsyncIOScheduler
调度程序
默认使用sqlite
持久化定时任务,不至于重启就失效
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger
Schedule = AsyncIOScheduler(
jobstores={
‘default‘: SQLAlchemyJobStore(url=‘sqlite:///jobs.sqlite‘)
}
)
Schedule.start()
这些就需要自己查看文档和issues
搜索了
定时任务有很多种方案,比如可以使用 arq
, fastapi-utils
, celery
说到
arq
我之前,使用过rq
并且学习的时候稍微翻译了一下文档 rq v1.0 https://codercharm.github.io/Python-rq-doc-cn/#/ 一晃过去一年了。
原文:https://www.cnblogs.com/CharmCode/p/14191009.html