首页 > 编程语言 > 详细

python2--djnago xadmin 定时任务,手把手

时间:2020-08-28 23:24:48      阅读:86      评论:0      收藏:0      [点我收藏+]

settings.py

INSTALLED_APPS = (
    djcelery,

)


import djcelery

djcelery.setup_loader()
BROKER_URL = redis://localhost:6379
CELERYBEAT_SCHEDULER = djcelery.schedulers.DatabaseScheduler
CELERY_RESULT_BACKEND = djcelery.backends.database:DatabaseBackend
CELERY_ACCEPT_CONTENT = [application/json]
CELERY_TASK_SERIALIZER = json
CELERY_RESULT_SERIALIZER = json
CELERY_TIMEZONE = Asia/Shanghai
CELERY_LOG_FILE = os.path.join(os.path.join(os.path.join(BASE_DIR, logs), celery), celery.log)
CELERYBEAT_LOG_FILE = os.path.join(os.path.join(os.path.join(BASE_DIR, logs), celery), beat.log)
CELERY_TASK_RESULT_EXPIRES = 10

项目同名目录下的__init__.py

# -*- coding:utf-8 -*-
from __future__ import absolute_import,unicode_literals

from .celery import app as celery_app

__all__ = [celery_app]

import pymysql
pymysql.install_as_MySQLdb()

项目同名目录下新建celery.py

# -*- coding:utf-8 -*-

from __future__ import absolute_import,unicode_literals

import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the ‘celery‘ program.
os.environ.setdefault(DJANGO_SETTINGS_MODULE, WebSite.settings)

# WebSite主应用名
app = Celery(WebSite)
platforms.C_FORCE_ROOT = True

app.config_from_object(django.conf:settings)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print(Request: {0!r}.format(self.request))

app.conf.timezone = UTC

# from datetime import timedelta
# app.conf.update(
#     CELERYBEAT_SCHEDULE = {
#         ‘task_add‘: {
#             ‘task‘: ‘apps.tasks.start_running‘,
#             ‘schedule‘: timedelta(seconds=10)
#         },
#     },
# )

执行定时任务的app下的adminx.py

# -*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals
from djcelery.models import (
  TaskState, WorkerState,
  PeriodicTask, IntervalSchedule, CrontabSchedule,
)
import xadmin

xadmin.site.register(IntervalSchedule) # 存储循环任务设置的时间
xadmin.site.register(CrontabSchedule) # 存储定时任务设置的时间
xadmin.site.register(PeriodicTask) # 存储任务
xadmin.site.register(TaskState) # 存储任务执行状态
xadmin.site.register(WorkerState) # 存储执行任务的worker

执行定时任务的app下新建tasks.py

# -*- coding:utf-8 -*-
from __future__ import absolute_import
from WebSite.celery import app
from celery import task, shared_task


@app.task
def start_running(info):
    print(info)
    print(--->>开始执行任务<<---)
    print(比如发送短信或邮件)
    print(>---任务结束---<)
@shared_task
def mul(x, y):
    print(乘法,x*y)
    return x * y

 

版本要求,一定安要求来,否则莫名其妙的各种问题百度都查不到
pip2 install celery==3.1.23
pip2 install django==1.9
pip2 install django-celery==3.1.17
pip2 install flower==0.9.2
pip2 install redis==2.10.6

 

python2--djnago xadmin 定时任务,手把手

原文:https://www.cnblogs.com/lutt/p/13579898.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!