首页 > 编程语言 > 详细

斑马斑马-09-白云之上-python任务调度之celery

时间:2020-04-30 13:53:09      阅读:97      评论:0      收藏:0      [点我收藏+]

一、celery简介 

1:celery是什么

  Celery是一个python开发的异步分布式任务调度模块。

2:celery是使用场景

  异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

  定时任务:定时执行某件事情,比如每天数据统计

3:celery特点 

  简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的。
  高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务。
  快速:一个单进程的celery每分钟可处理上百万个任务。
  灵活: 几乎celery的各个组件都可以被扩展及自定制。

4:工作原理

   技术分享图片

 

 

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

二、Hello,Celery 

1:版本介绍

  在Django项目的环境下配合celery和redis使用异步调用

Django==3.0.4
celery==3.1.26.post2
django-celery==3.3.1
django-redis==4.7.0
redis==2.10.6  

安装相应的软件包

  pip install redis==2.10.6 -i https://pypi.tuna.tsinghua.edu.cn/simple
  pip install django-redis==4.7.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

2:场景介绍(异步)

  比如,想泡壶茶喝。当时的情况是:开水没有;水壶要洗,茶壶,茶杯要洗;火已生了,茶叶也有了。怎么办?

  办法甲:洗好水壶,灌上凉水,放在火上;在等待水开的时间里,洗茶壶、洗茶杯、拿茶叶;等水开了,泡茶喝。

  办法乙:先做好一些准备工作,洗水壶,灌水烧水;坐待水开了,洗茶壶茶杯,拿茶叶;一切就绪,泡茶喝。

   技术分享图片

 

 

   2.1 代码架构

  技术分享图片

 

 

  2.2 首先要保证redis是可以正常使用的,参考斑马斑马-05-白云之上-Redis初识   

技术分享图片
from redis import StrictRedis
from django_redis import get_redis_connection

sr = StrictRedis("39.99.213.203", port=6379, db=1)


def set_redis():
    res = sr.set("name", "zhangsan")
    print(res)


def get_redis():
    res = sr.get("name")
    print(res)


if __name__ == __main__:
    # 设置值
    set_redis()
    # 获取值
    get_redis()
redis测试

  技术分享图片

  2.3 设置url和views还有茶方法,至此计划B可以运行

技术分享图片
from django.contrib import admin
from django.urls import path
from course.views import planA,planB

urlpatterns = [
    path(admin/, admin.site.urls),
    path(planA/,planA,name=planA),
    path(planB/, planB, name=planB)
]
url.py
技术分享图片
from django.shortcuts import render
from django.http import JsonResponse
from course.tasks import CourseTask
from datetime import datetime
import time
from course import# Create your views here.
def planA(request):

    start_time = time.time()
    start_ = time.strftime("%H:%M:%S", time.localtime())
    print(A计划开始, start_time)
    pc = 茶.泡茶()
    pc.洗水壶()  # 1
    CourseTask.apply_async((洗茶壶,洗茶杯, 拿茶叶), queue=work_queue)
    pc.烧水()  # 15
    end_time = time.time()
    end_ = time.strftime("%H:%M:%S", time.localtime())
    spendtime = str((end_time - start_time))
    print(B计划结束, end_time)
    dict_content = {}
    dict_content.setdefault("beginTime", start_)
    dict_content.setdefault("endTime", end_)
    dict_content.setdefault("spendTime", spendtime)
    return JsonResponse(dict_content)
def planB(request):
    start_time = time.time()
    start_=time.strftime("%H:%M:%S", time.localtime())
    print(B计划开始, start_time)
    pc = 茶.泡茶()
    pc.洗水壶()    #1
    pc.烧水()      #15
    pc.洗茶壶()    #1
    pc.洗茶杯()    #1
    pc.拿茶叶()    #2
    end_time = time.time()
    end_=time.strftime("%H:%M:%S", time.localtime())
    spendtime = str((end_time - start_time))
    print(B计划结束, end_time)
    dict_content={}
    dict_content.setdefault("beginTime",start_)
    dict_content.setdefault("endTime",end_)
    dict_content.setdefault("spendTime",spendtime)
    return JsonResponse(dict_content)
view.py
技术分享图片
import time


class 泡茶():
    def 洗水壶(self):
        time.sleep(1)

    def 烧水(self):
        time.sleep(15)

    def 洗茶壶(self):
        print("洗茶壶")
        time.sleep(1)

    def 洗茶杯(self):
        print("洗茶杯")
        time.sleep(1)

    def 拿茶叶(self):
        print("拿茶叶")
        time.sleep(2)

  2.4 创建任务和配置文件  

技术分享图片
# 创建任务
import time
from celery.task import Task
from course importclass CourseTask(Task):
    name = course-task
    def run(self, *args, **kwargs):
        pc = 茶.泡茶()
        actions = list(args)
        print(actions)
        for action in actions:
            if action == "洗茶壶":
                pc.洗茶壶()
                continue
            if action == "洗茶杯":
                pc.洗茶杯()
                continue
            if action == "拿茶叶":
                pc.拿茶叶()
                continue
tasks
INSTALLED_APPS = [
‘django.contrib.admin‘,
‘django.contrib.auth‘,
‘django.contrib.contenttypes‘,
‘django.contrib.sessions‘,
‘django.contrib.messages‘,
‘django.contrib.staticfiles‘,
# ‘course.apps.App01Config‘,
‘course‘,
‘djcelery‘,
]
from .celeryconfig import *
#定义中间件:使用的对象
BROKER_BACKEND=‘redis‘
#定义中间件:数据库地址
BROKER_URL=‘redis://39.99.213.203:6379/1‘
#定义中间件:结果存储地址
CELERY_RESULT_BACKEND=‘redis://39.99.213.203:6379/2‘
技术分享图片
"""
Django settings for celeryDemo project.

Generated by ‘django-admin startproject‘ using Django 3.0.4.

For more information on this file, see
https://docs.djangoproject.com/en/3.0/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.0/ref/settings/
"""

import os

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = ^ry*-g$fr)j1vz-^w+zw2wz@7$^&h_3)f3_r_swyenqb+pfum_

# SECURITY WARNING: don‘t run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = []


# Application definition

INSTALLED_APPS = [
    django.contrib.admin,
    django.contrib.auth,
    django.contrib.contenttypes,
    django.contrib.sessions,
    django.contrib.messages,
    django.contrib.staticfiles,
    # ‘course.apps.App01Config‘,
    course,
    djcelery,
]
from .celeryconfig import *
#定义中间件:使用的对象
BROKER_BACKEND=redis
#定义中间件:数据库地址
BROKER_URL=redis://39.99.213.203:6379/1
#定义中间件:结果存储地址
CELERY_RESULT_BACKEND=redis://39.99.213.203:6379/2
MIDDLEWARE = [
    django.middleware.security.SecurityMiddleware,
    django.contrib.sessions.middleware.SessionMiddleware,
    django.middleware.common.CommonMiddleware,
    django.middleware.csrf.CsrfViewMiddleware,
    django.contrib.auth.middleware.AuthenticationMiddleware,
    django.contrib.messages.middleware.MessageMiddleware,
    django.middleware.clickjacking.XFrameOptionsMiddleware,
]

ROOT_URLCONF = celeryDemo.urls

TEMPLATES = [
    {
        BACKEND: django.template.backends.django.DjangoTemplates,
        DIRS: [os.path.join(BASE_DIR, templates)]
        ,
        APP_DIRS: True,
        OPTIONS: {
            context_processors: [
                django.template.context_processors.debug,
                django.template.context_processors.request,
                django.contrib.auth.context_processors.auth,
                django.contrib.messages.context_processors.messages,
            ],
        },
    },
]

WSGI_APPLICATION = celeryDemo.wsgi.application


# Database
# https://docs.djangoproject.com/en/3.0/ref/settings/#databases

DATABASES = {
    default: {
        ENGINE: django.db.backends.sqlite3,
        NAME: os.path.join(BASE_DIR, db.sqlite3),
    }
}


# Password validation
# https://docs.djangoproject.com/en/3.0/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
    {
        NAME: django.contrib.auth.password_validation.UserAttributeSimilarityValidator,
    },
    {
        NAME: django.contrib.auth.password_validation.MinimumLengthValidator,
    },
    {
        NAME: django.contrib.auth.password_validation.CommonPasswordValidator,
    },
    {
        NAME: django.contrib.auth.password_validation.NumericPasswordValidator,
    },
]


# Internationalization
# https://docs.djangoproject.com/en/3.0/topics/i18n/

LANGUAGE_CODE = en-us

TIME_ZONE = UTC

USE_I18N = True

USE_L10N = True

USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.0/howto/static-files/

STATIC_URL = /static/
settings.py
技术分享图片
import djcelery
from datetime import timedelta
djcelery.setup_loader()
CELERY_QUEUES = {
    beat_tasks: {
        exchange: beat_tasks,
        exchange_type: direct,
        binding_key: beat_tasks
    },
    work_queue: {
        exchange: work_queue,
        exchange_type: direct,
        binding_key: work_queue
    }
}
CELERY_DEFAULT_QUEUE = work_queue


CELERY_IMPORTS=(
    course.tasks
)


#  有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True

#  设置并发的worker数量
CELERYD_CONCURRENCY = 4

# 允许重试
CELERY_ACKS_LATE = True

#  每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 100

#  单个任务的最大运行时间,超过就杀死
CELERYD_TASK_TIME_LEMIT = 12 * 30

#  定时任务
CELERYBEAT_SCHEDULE = {
    task1: {
        task: course-task,
        schedule: timedelta(seconds=5),  # 每5秒执行一次
        options: {
            queue: beat_tasks  # 当前定时任务是跑在beat_tasks队列上的
        }
    }
}
celeryconfig.py

3:测试  

python manage.py runserver

python manage.py celery worker -l info (工人)启动worker节点来处理任务

python manage.py celery beat -l info (领导)启动定时任务,当到时间时,把任务放入broker中,broker检测到了,让worker去工作。

python manage.py celery flower 错误日志监控,启动时必须先启动worker
技术分享图片

三、Hello,Celery 2

 

1:文件读写的demo

  url中添加:path(‘add_test/‘, add_test, name=‘add_test‘),

技术分享图片
def add_test(request):
    from course.tasks import add
    from django.http import HttpResponse

    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)#如果定了CELERY_RESULT_BACKEND 此时会把结果插入到redis数据库中
    return HttpResponse(ok)
原来的view页面定义一个add_test方法
技术分享图片
@task
def add(a, b):
    with open(a.text, a, encoding=utf-8) as f:
        f.write(a)
    print(a + b)
原來的tasks中添加自定義函數

  以下是完整内容  

技术分享图片
"""celeryDemo URL Configuration

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/3.0/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  path(‘‘, views.home, name=‘home‘)
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  path(‘‘, Home.as_view(), name=‘home‘)
Including another URLconf
    1. Import the include() function: from django.urls import include, path
    2. Add a URL to urlpatterns:  path(‘blog/‘, include(‘blog.urls‘))
"""
from django.contrib import admin
from django.urls import path
from course.views import planA,planB,add_test

urlpatterns = [
    path(admin/, admin.site.urls),
    path(planA/,planA,name=planA),
    path(planB/, planB, name=planB),
    path(add_test/, add_test, name=add_test),

]
urls.py
技术分享图片
# 创建任务
import time
from celery import task
from celery.task import Task
from course importclass CourseTask(Task):
    name = course-task
    def run(self, *args, **kwargs):
        pc = 茶.泡茶()
        actions = list(args)
        print(actions)
        for action in actions:
            if action == "洗茶壶":
                pc.洗茶壶()
                continue
            if action == "洗茶杯":
                pc.洗茶杯()
                continue
            if action == "拿茶叶":
                pc.拿茶叶()
                continue

@task
def add(a, b):
    with open(a.text, a, encoding=utf-8) as f:
        f.write(a)
    print(a + b)
tasks.py
技术分享图片
from django.shortcuts import render
from django.http import JsonResponse
from course.tasks import CourseTask
from datetime import datetime
import time
from course import# Create your views here.
def planA(request):
    start_time = time.time()
    start_ = time.strftime("%H:%M:%S", time.localtime())
    print(A计划开始, start_time)
    pc = 茶.泡茶()
    pc.洗水壶()  # 1
    CourseTask.apply_async((洗茶壶, 洗茶杯, 拿茶叶), queue=work_queue)
    pc.烧水()  # 15
    end_time = time.time()
    end_ = time.strftime("%H:%M:%S", time.localtime())
    spendtime = str((end_time - start_time))
    print(B计划结束, end_time)
    dict_content = {}
    dict_content.setdefault("beginTime", start_)
    dict_content.setdefault("endTime", end_)
    dict_content.setdefault("spendTime", spendtime)
    return JsonResponse(dict_content)


def planB(request):
    start_time = time.time()
    start_ = time.strftime("%H:%M:%S", time.localtime())
    print(B计划开始, start_time)
    pc = 茶.泡茶()
    pc.洗水壶()  # 1
    pc.烧水()  # 15
    pc.洗茶壶()  # 1
    pc.洗茶杯()  # 1
    pc.拿茶叶()  # 2
    end_time = time.time()
    end_ = time.strftime("%H:%M:%S", time.localtime())
    spendtime = str((end_time - start_time))
    print(B计划结束, end_time)
    dict_content = {}
    dict_content.setdefault("beginTime", start_)
    dict_content.setdefault("endTime", end_)
    dict_content.setdefault("spendTime", spendtime)
    return JsonResponse(dict_content)


def add_test(request):
    from course.tasks import add
    from django.http import HttpResponse

    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)#如果定了CELERY_RESULT_BACKEND 此时会把结果插入到redis数据库中
    return HttpResponse(ok)
views.py

2:测试

  技术分享图片

 

 

 

五、Web监控管理服务-Flower

1:简介

Flower是Celery的一个实时监控和管理Web界面工具,目前仍在活跃的开发之中,但已经是一个很重要的可用工具了 

2:使用

  把大象装冰箱分三步,第一步:打开冰箱门,第二步:把大象放进去,第三步:关上冰箱门。

  使用Flower也分三步,第一步:安装,第二步:启动,第三步:访问。

  2.1 安装:使用pip安装Flower:

  pip3 install flower -i https://pypi.tuna.tsinghua.edu.cn/simple/ --trusted-host pypi.tuna.tsinghua.edu.cn

  2.2 运行 flower命令启动web-server:   

  python manage.py celery flower

  2.3 web页面访问:

  127.0.0.1:5555/dashboard

技术分享图片

 

六、日志打印

1:日志设置

from celery.utils.log import get_task_logger
logger=get_task_logger(__name__)

logger.debug(actions)
技术分享图片
# 创建任务
import time
from celery import task
from celery.task import Task
from course importfrom celery.utils.log import get_task_logger
logger=get_task_logger(__name__)

class CourseTask(Task):
    name = course-task
    def run(self, *args, **kwargs):
        pc = 茶.泡茶()
        actions = list(args)
        logger.debug(actions)
        for action in actions:
            if action == "洗茶壶":
                pc.洗茶壶()
                continue
            if action == "洗茶杯":
                pc.洗茶杯()
                continue
            if action == "拿茶叶":
                pc.拿茶叶()
                continue

@task
def add(a, b):
    with open(a.text, a, encoding=utf-8) as f:
        f.write(a)
    print(a + b)
tasks

2:启动

python manage.py celery worker -l debug -f  E:\Study\code\Python\celeryDemo\aaa.log

3:查看日志

技术分享图片

 

 

  

 

 

 

斑马斑马-09-白云之上-python任务调度之celery

原文:https://www.cnblogs.com/YK2012/p/12786284.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!