首页 > 其他 > 详细

celery异步消息队列的使用

时间:2020-05-25 00:17:10      阅读:59      评论:0      收藏:0      [点我收藏+]

1、准备工作

1.1 流程图

技术分享图片

 

 

 2、环境安装

  2.1、在Ubuntu中需要安装redis

技术分享图片
安装redis
$sudo apt-get update
$sudo apt-get install redis-server
启动redis
$redis-server
连接redis
$redis-cli 
$redis-cli -h ip -a 6379 
安装Python操作redis的包
pip install redis 
重启redis
sudo service reids restart 
安装redis

redis默认绑定的ip为127.0.0.1其他电脑无法访问Ubuntu的redis

技术分享图片

 

 

 重启redis服务 service redis restart

查看绑定端口

技术分享图片

 

 

 在wind上telnet ip  6379 成功说明成功

2.2、安装celery

pip install celery

2、开始使用celery

2、1基本应用

在/home/zbwu103/celery 文件中创建一个tasks.py的任务文件

技术分享图片
#task.py 

from celery import Celery
 
app = Celery(tasks,
             broker=redis://192.168.1.111,
             backend=redis://192.168.1.111
             #redis://密码@ip
)
 
@app.task
def add(x,y):
    print("running...",x,y)
    return x+y        
View Code

在home/zbwu103/celery的目录启动监听任务

#打印日志的模式运行
celery -A tasks worker --loglevel=info

在开一个终端,到/home/zbwu103/celery用Python进入命令行运行

from tasks import add 
t = add.delay(4,5)

#t.result.ready() 查看任务是否完成,完成返回True,未完成返回False
#t.get()  返回完成之后的结果
#t.task_id  返回任务的唯一ID号,可以通过ID查询到任务

上面任务都是在终端上运行,如果终端关闭tasks也会终止。

所以需要任务在后台运行

技术分享图片

 

 

 

celery multi stop w1 停止 w1

2.2 、在项目中如何使用celery

技术分享图片

 

 

 技术分享图片

技术分享图片
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(my_proj,
             broker=redis://192.168.1.111,
             backend=redis://192.168.1.111,
             include=[myp_roj.tasks])

app.conf.update(
    result_expires=3600,
)
if __name__ == __main__:
    app.start()
celery
技术分享图片
from __future__ import absolute_import, unicode_literals
import subprocess
from .celery import app

@app.task
def add(x,y):
    return x+y

@app.task
def run_cmd(cmd):
    obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    return obj.stdout.read().decode(utf-8)
tasks.py

在my_proj同级目录启动

技术分享图片

 

 

 查看任务启动情况

ps -ef |grep celery

2.3 、celery 定时任务

celery使用beat来执行celert beat 来实现定时任务

worker定时任务

技术分享图片
from celery import Celery
from celery.schedules import crontab

app = Celery(task,
             broker=redis://192.168.1.111,
             backend=redis://192.168.1.111)


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test(‘hello‘) every 10 seconds.
    sender.add_periodic_task(10.0, test.s(hello), name=add every 10)

    # Calls test(‘world‘) every 30 seconds
    sender.add_periodic_task(30.0, test.s(world), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=21, minute=26, day_of_week=Sum),
        test.s(Happy Mondays!),
    )


@app.task
def test(arg):
    print(runing test.....)
    print(arg)
periodic_task.py

启动定时任务

celery -A periodic_task worker

另外开一个任务调度区不断的检测你的任务计划

celery -A periodic_task beat

2.4、celery和django配置一起使用

在setting同级的目录中新建一个celery.py的文件配置celery基本的配置

技术分享图片

技术分享图片
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the ‘celery‘ program.
os.environ.setdefault(DJANGO_SETTINGS_MODULE, CeleryTest.settings)

app = Celery(CeleryTest)

# Using a string here means the worker don‘t have to serialize
# the configuration object to child processes.
# - namespace=‘CELERY‘ means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(django.conf:settings, namespace=CELERY)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(Request: {0!r}.format(self.request))
celery.py

在setting.py同级的目录配置__init__.py

技术分享图片
from __future__ import absolute_import, unicode_literals
 
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
 
__all__ = [celery_app]
__init__.py

在APP的目录里面新建一个tasks.py的任务来填写任务

技术分享图片
#app01/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

@shared_task
def add(x, y):
    print("running task add,我是windows ")
    time.sleep(1)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)
tasks.py

从views中调用任务

技术分享图片
/app01/view.py
from django.shortcuts import render,HttpResponse
from  app01 import tasks
from celery.result import AsyncResult

def index(request):

    res = tasks.add.delay(5,999)

    print("res:",res)
    print(res.status)
    # import pdb
    # pdb.set_trace()
    return HttpResponse(res.task_id)


def task_res(request):
    #通过ID获取结果
    result = AsyncResult(id="be4933c0-ed9b-4a04-ade8-79f4c57cfc74")

    #return HttpResponse(result.get())
    return HttpResponse(result.status)
views.py

 

celery异步消息队列的使用

原文:https://www.cnblogs.com/wuzhibinsuib/p/12953645.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!