首页 > 其他 > 详细

Django + celery +redis使用

时间:2019-07-04 18:58:02      阅读:81      评论:0      收藏:0      [点我收藏+]

1.安装包

pip install celery

pip install django-celery

pip install pymysql

 

2.创建一个django项目

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
- manage.py 

3.修改__init__.py

import pymysql
pymysql.install_as_MySQLdb()

4.修改settings.py,加celery配置

# 是否启用celery任务
IS_USE_CELERY = True
# 本地开发的 celery 的消息队列(RabbitMQ)信息
# BROKER_URL_DEV = ‘amqp://guest:guest@127.0.0.1:5672/‘
BROKER_URL_DEV = redis://127.0.0.1:6379/1
# TOCHANGE 调用celery任务的文件路径, List of modules to import when celery starts.
CELERY_IMPORTS = (
    celerytest.task,
)
# ===============================================================================
# CELERY 配置
# ===============================================================================
if IS_USE_CELERY:
    try:
        import djcelery
        import sys
        INSTALLED_APPS += (
            djcelery,            # djcelery
        )
        djcelery.setup_loader()
        CELERY_ENABLE_UTC = False
        CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
        if "celery" in sys.argv:
            DEBUG = False
        # celery 的消息队列(redis)信息
        BROKER_URL = os.environ.get(BK_BROKER_URL, BROKER_URL_DEV)
        #BROKER_URL = ‘amqp://guest:guest@127.0.0.1:5672/‘
       
    except:
        pass

5.修改settings.py,改数据库配置

# ===============================================================================
# 数据库设置, 本地开发数据库设置
# ===============================================================================
DATABASES = {
    default: {
        ENGINE: django.db.backends.mysql,  # 默认用mysql
        NAME: celerytest,                 # 数据库名 (默认与APP_ID相同)
        USER: root,                       # 你的数据库user
        PASSWORD: 123456,                  # 你的数据库password
        HOST: 127.0.0.1,               # 开发的时候,使用localhost
        PORT: 3306,                       # 默认3306
    },
}

6.修改urls.py,添加链接

from django.conf.urls import include, url
from django.contrib import admin
import test
urlpatterns = [
    url(r^admin/, include(admin.site.urls)),
    url(r^aa/$, test.test),
    url(r^add/$, test.addtest),
    url(r^del/$, test.deltest),
    url(r^tt/$, test.startandstop),
]

7.添加task.py,celery任务

# -*- coding: utf-8 -*-
import django     #测试不加如这两行启动worker要报错,而且需要加在最前
django.setup()

import time
from celery import task,shared_task

#直接执行
@task()
def sayhello():
    print(hello ...)
    time.sleep(2)
    print(world ...)

#定时任务
@shared_task(name="celerytest")
def everysay(**kwargs):
    """定时任务调用执行作业"""
    sayadd(**kwargs)

def sayadd(**kwargs):
    print(add1 ...)
    time.sleep(2)
    print(int(kwargs[x])+int(kwargs[y]))

8.添加test.py,调用任务

# -*- coding: utf-8 -*-
from django.shortcuts import render,HttpResponse
from task import *
from djcelery import models as djmodels
import json
def getcrontabtime(minute,hour,day_of_month,month_of_year,day_of_week): mycrontab = {} mycrontab[minute] = minute mycrontab[hour] = hour mycrontab[day_of_month] = day_of_month mycrontab[month_of_year] = month_of_year mycrontab[day_of_week] = day_of_week return mycrontab def test(request): sayhello.delay()#如果有参数sayhello.delay(x,y,z) return HttpResponse("hello world") def addtest(request): ‘‘‘添加定时任务‘‘‘ task, created = djmodels.PeriodicTask.objects.get_or_create(name=123456, task=celerytest) #task对应的是task文件里的函数方法 mycrontab = getcrontabtime(5,*,*,*,*) crontab = djmodels.CrontabSchedule.objects.filter(**mycrontab).first() if crontab is None: crontab = djmodels.CrontabSchedule.objects.create(**mycrontab) task.crontab = crontab task.enabled = True param_dic = {} param_dic[x] = 5 param_dic[y] = 2 # print param_dic # if type(param_dic) == list: # task.args = param_dic if type(param_dic) == dict: task.kwargs = json.dumps(param_dic) task.save() return HttpResponse("添加任务") def deltest(request): ‘‘‘删除定时任务‘‘‘ try: djmodels.PeriodicTask.objects.filter(name=123456).delete() except: return HttpResponse("删除定时任务失败") return HttpResponse("删除定时任务成功") def startandstop(request): ‘‘‘启停定时作业‘‘‘ status = request.GET.get(status,0) task, created = djmodels.PeriodicTask.objects.get_or_create(name=123456, task=celerytest) if status == 0: task.enabled = False task.save() return HttpResponse("停用成功") elif status == 1: task.enabled = True task.save() return HttpResponse("启用成功") else: return HttpResponse("系统错误")

9.项目架构

技术分享图片

10.运行前准备

#配置数据库迁移,生成celery需要的数据表
python manage.py migrate
#启动Redis
sudo redis-server /etc/redis/redis.conf
#启动worker
python manage.py celery worker --loglevel=info
#启动心跳
python manage.py celery beat --max-interval=10 --loglevel=INFO
--max-interval=10 :每十秒侦测一次任务 --loglevel=INFO:日志等级是INFO

11.启动测试

#启动web
python manage.py runserver 0.0.0.0:9999
#直接执行
http://127.0.0.1:9999/aa/
#添加定时服务
http://127.0.0.1:9999/add/
#删除定时服务
http://127.0.0.1:9999/add/
#暂停执行定时服务
http://127.0.0.1:9999/tt/
http://127.0.0.1:9999/tt/?status=0
#启动执行定时服务
http://127.0.0.1:9999/tt/?status=1

 

Django + celery +redis使用

原文:https://www.cnblogs.com/CGCong/p/10191182.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!