首页 > 其他 > 详细

Django 使用django-celery-beat实现动态添加周期性任务

时间:2020-09-28 21:42:08      阅读:492      评论:0      收藏:0      [点我收藏+]

 

前期准备

1.beat插件安装

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS = [
    ....   
    ‘django_celery_beat‘,
]

3.数据库变更

python3 manage.py migrate django_celery_beat

配置工作

目录结构请参考:https://www.cnblogs.com/-wenli/p/13723910.html

1.配置celerypro.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from django.utils import timezone

# set the default Django settings module for the ‘celery‘ program.
# 为celery设置环境变量
os.environ.setdefault(DJANGO_SETTINGS_MODULE, voice_quality_assurance_configure.settings)
# 创建celery app
app = Celery(voice_quality_assurance_configure)
# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 从单独的配置模块中加载配置
app.config_from_object(voice_quality_assurance_configure.celeryconfig)
# 设置app自动加载任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now

2.配置celeryconfig.py

from __future__ import absolute_import
from kombu import Queue
from django.conf import settings

# 设置代理人broker
CELERY_BROKER_URL = redis://127.0.0.1:6379/2
# 指定 Backend
CELERY_RESULT_BACKEND = redis://127.0.0.1:6379/1
# 指定时区,默认是 UTC
CELERY_TIMEZONE=Asia/Shanghai
# celery 序列化与反序列化配置
CELERY_TASK_SERIALIZER = pickle
CELERY_RESULT_SERIALIZER = pickle
CELERY_ACCEPT_CONTENT = [pickle, json]
CELERY_IGNORE_RESULT = True
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 10
# 任务预取功能,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
CELERYD_PREFETCH_MULTIPLIER = 20
# 有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True

# celery beat配置(周期性任务设置)
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = django_celery_beat.schedulers:DatabaseScheduler

3.分别启动woker和beta

 项目根目录终端执行(voice_quality_assurance_configure为项目名称,简单来说,和manage.py文件同级)

celery -A voice_quality_assurance_configure beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler #启动beta 调度器使用数据库
celery worker -A voice_quality_assurance_configure --loglevel=info -n worker1 #启动celery worker

4.创建周期性任务

from datetime import datetime, timedelta
import json
import os,django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "voice_quality_assurance_configure.settings")# project_name 项目名称
django.setup()
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)

# 带参数的创建方法,如下:
PeriodicTask.objects.create(
     interval=schedule,         # 上面创建10秒的间隔 interval 对象
     name=test_task,          # 设置任务的name值
     task=mission.tasks.my_task,  # 指定需要周期性执行的任务
     args=json.dumps([10, 2, 76]),
    expires=datetime.utcnow() + timedelta(seconds=30)
)

 

 

详解创建周期性任务的方法

创建基于interval的周期性任务

第一步创建间隔对象

schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
IntervalSchedule.DAYS 固定间隔天数
IntervalSchedule.HOURS 固定间隔小时数
IntervalSchedule.MINUTES 固定间隔分钟数
IntervalSchedule.SECONDS 固定间隔秒数
IntervalSchedule.MICROSECONDS 固定间隔微秒

第二步创建任务

无参数的创建方法:

PeriodicTask.objects.create(
     interval=schedule,                  # we created this above.
     name=test_task,          # simply describes this periodic task.
     task=app名.tasks.任务函数名,  # name of task.
)

有参数的创建方法:

PeriodicTask.objects.create(
     interval=schedule,                  # we created this above.
     name=test‘_task,          # simply describes this periodic task.
     task=app名.tasks.任务函数名, # name of task. 
   args=json.dumps([arg1, arg2]), 
     kwargs=json.dumps({ be_careful: True, }), 
     expires=datetime.utcnow() + timedelta(seconds=30) )
class MonitorDeviceTask(object):
    """
    设备创建,增加周期性任务
    """

    def __init__(self, device_obj):
        self.device_obj = device_obj
        self.periodic_task = PeriodicTask.objects.create(
            interval=schedule,
            name=test_task,
            task=mission.tasks.my_task,
            args=json.dumps([self.device_obj.ip])
        )

    def starttask(self):
        """
        启动任务
        """
        self.periodic_task.enabled = True
        self.periodic_task.save()

    def stoptask(self):
        """
        停止任务
        """
        self.periodic_task.enabled = False
        self.periodic_task.save()

    def deltask(self):
        """
        删除任务
        """
        self.periodic_task.delete()    
        self.periodic_task.save()

创建基于 crontab 的周期性任务

from django_celery_beat.models import CrontabSchedule, PeriodicTask
schedule, _ = CrontabSchedule.objects.get_or_create( minute=30, hour=*, day_of_week=*, day_of_month=*, month_of_year=*, timezone=pytz.timezone(Canada/Pacific) )

 

Django 使用django-celery-beat实现动态添加周期性任务

原文:https://www.cnblogs.com/-wenli/p/13746509.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!