安装
pipenv install django-celery
主配置文件导入celery配置
# settings.py
from .celeryconfig import *
BROKER_BECKEND = 'redis'
BROKER_URL = 'redis://192.168.2.128:6379/1'
CELERY_RESULT_BACKEND = 'redis://192.168.2.128:6379/2'
celery配置文件
#celeryconfig.py
from datetime import timedelta
import djcelery
# setup_loader用于扫描所有app下tasks.py文件中的task
djcelery.setup_loader()
# 设置不同的任务队列,将普通任务和定时任务分开,redis中实际key如下
# 1) "_kombu.binding.work_queue"
# 2) "_kombu.binding.beat_tasks"
CELERY_QUEUES = {
'beat_tasks': {
'exchange': 'beat_tasks',
'exchange_type': 'direct',
'binding_key': 'beat_tasks'
},
'work_queue': {
'exchange': 'work_queue',
'exchange_type': 'direct',
'binding_key': 'work_queue'
},
}
# 设置默认队列
CELERY_DEFAULT_QUEUE = 'work_queue'
CELERY_IMPORTS = (
'course.tasks',
)
# 有些情况下可以防止死锁
CELERY_FORCE_EXECV = True
# 设置并发的worker数量
CELERY_CONCURRENCY = 4
# 允许重试
CELERY_ACKS_LATE = True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERY_MAX_TASKS_PER_CHILD = 100
# 单个任务的最大运行时间
CELERY_TASK_TIME_LIMIT = 12 * 30
# 定时任务配置
CELERYBEAT_SCHEDULE = {
'task1': {
'task': 'course-task',
'schedule': timedelta(seconds=5),
'options': {
'queue': 'beat_tasks'
}
}
}
app中创建tasks.py并创建task
# (app)course->tasks.py
import time
from celery.task import Task
class CourseTask(Task):
name = 'course-task'
def run(self, *args, **kwargs):
print('start course task')
time.sleep(4)
print('args={}, kwargs={}'.format(args, kwargs))
print('end course task')
app视图函数调用task
# (app)course->views.py
from django.http import JsonResponse
from course.tasks import CourseTask
def do(request):
# 执行异步任务
print('start do request')
# CourseTask.delay()
CourseTask.apply_async(args=('hello',), queue='work_queue')
print('end do request')
return JsonResponse({'result': 'ok'})
启动命令
python manage.py celery worker -l INFO
python manage.py celery beat -l INFO
原文:https://www.cnblogs.com/Peter2014/p/11629080.html