Celery

2019-08-30

Installing and configuring Celery

  • pip install celery
  • Message broker: RabbitMQ/Redis
  • app = Celery('task_name', broker='xxx', backend='xxx')

Running asynchronous tasks with Celery

Basic usage

Create a file named celery_app_task.py:

import celery
import time

broker = 'redis://127.0.0.1:6379/0'   # message broker
backend = 'redis://127.0.0.1:6379/1'  # result backend
app = celery.Celery('test', backend=backend, broker=broker)

@app.task
def add(x, y):
    time.sleep(1)
    return x + y

Create a file add_task.py to enqueue a task:

from celery_app_task import add
result = add.delay(4, 5)
print(result.id)

Create a file run.py to start a worker, or start one from the command line: celery worker -A celery_app_task -l info

Note: on Windows the default worker pool does not work, so install eventlet (pip install eventlet) and run: celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import app

if __name__ == '__main__':
    app.worker_main()
    # app.worker_main(argv=['--loglevel=info'])

Create a file result.py to check the task's status and result:

from celery.result import AsyncResult
from celery_app_task import app

# note: "async" is a reserved keyword since Python 3.7, so use another name
async_result = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result from the backend
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task is being retried after an error')
elif async_result.status == 'STARTED':
    print('Task has started executing')

Run add_task.py to enqueue a task and get its task ID.

Run run.py, or run the command: celery worker -A celery_app_task -l info

Run result.py to check the task's status and fetch its result.
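The delay()/AsyncResult round trip behaves much like the standard library's futures. As a broker-free analogy (pure stdlib, not Celery itself — the names here are illustrative, not Celery's API):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
    time.sleep(0.1)
    return x + y

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(add, 4, 5)  # roughly what add.delay(4, 5) does
    # future.done() may still be False here, like a PENDING task
    print(future.result())           # roughly async_result.get(); prints 9
```

The key difference is that Celery hands the call to a separate worker process through the broker, so the producer and consumer can live on different machines.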

Multi-task project layout

pro_cel
    ├── celery_task         # Celery package
    │   ├── celery.py       # Celery connection and configuration; must use this filename
    │   ├── tasks1.py       # task functions
    │   └── tasks2.py       # task functions
    ├── check_result.py     # check task results
    └── send_task.py        # enqueue tasks

celery.py

from celery import Celery

app = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # include the two task modules below so Celery can find the
             # tasks; this also keeps tasks grouped by module
             include=['celery_task.tasks1',
                      'celery_task.tasks2'])

# timezone
app.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
app.conf.enable_utc = False

tasks1.py

import time
from celery_task.celery import app

@app.task
def test_celery1(res):
    time.sleep(5)
    return "test_celery1 result: %s" % res

tasks2.py

import time
from celery_task.celery import app

@app.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2 result: %s" % res

send_task.py

from celery_task.tasks1 import test_celery1
from celery_task.tasks2 import test_celery2

# tell Celery to run each task asynchronously, passing one argument
result = test_celery1.delay('first run')
print(result.id)
result = test_celery2.delay('second run')
print(result.id)

check_result.py

from celery.result import AsyncResult
from celery_task.celery import app

async_result = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=app)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result; it is not removed automatically
    # async_result.revoke(terminate=True)   # terminate the task even if it has already started
    # async_result.revoke(terminate=False)  # cancel the task only if it has not started yet
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task is being retried after an error')
elif async_result.status == 'STARTED':
    print('Task has started executing')

Enqueue the tasks (run send_task.py), start a worker (celery worker -A celery_task -l info -P eventlet), then check the results (run check_result.py).

Running scheduled tasks with Celery

Run a task at a specific time

add_task.py

from celery_app_task import add
from datetime import datetime, timedelta

ctime = datetime.now()
# Celery uses UTC by default, so convert the local time to UTC
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# use apply_async and set the execution time (eta)
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
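The eta computation above is plain datetime arithmetic. A minimal sketch of just that part (stdlib only, no broker needed; `eta_in` is a hypothetical helper, not a Celery API):

```python
from datetime import datetime, timedelta

# Hypothetical helper mirroring the eta computation above: a UTC
# timestamp `seconds` seconds in the future.
def eta_in(seconds):
    return datetime.utcnow() + timedelta(seconds=seconds)

task_time = eta_in(10)
remaining = (task_time - datetime.utcnow()).total_seconds()
print(round(remaining))  # ≈ 10
```

For a fixed delay like this, Celery also accepts `countdown=10` in apply_async as a shorthand, which avoids the UTC conversion entirely.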

Crontab-style periodic tasks

In the multi-task layout, modify celery.py as follows:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=['celery_task.tasks1',
                      'celery_task.tasks2'])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # the entry name is arbitrary
    'add-every-2-seconds': {
        # run the test_celery function in tasks1
        'task': 'celery_task.tasks1.test_celery',
        # run every 2 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # arguments to pass
        'args': ('test',)
    },
    # 'add-every-april-11': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     # run every year on April 11 at 08:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}
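The schedule values are ordinary Python objects. As a quick sanity check of the timedelta entry above, reduced to plain data (stdlib only, no Celery needed):

```python
from datetime import timedelta

# The beat_schedule entry above, as a plain dict for illustration.
beat_schedule = {
    'add-every-2-seconds': {
        'task': 'celery_task.tasks1.test_celery',
        'schedule': timedelta(seconds=2),
        'args': ('test',),
    },
}

interval = beat_schedule['add-every-2-seconds']['schedule']
print(interval.total_seconds())  # → 2.0
```

A float schedule (e.g. 1.0) is interpreted as seconds, while crontab() expresses calendar rules, so the three commented alternatives above are interchangeable ways to say "how often".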

Start a beat process: celery beat -A celery_task -l info

Start a worker: celery worker -A celery_task -l info -P eventlet

Using Celery in Django

Install the packages:

celery==3.1.25         # if this fails, try version 2.10.6
django-celery==3.1.20  # if this fails, install the latest version, e.g. 4.x

Create celeryconfig.py in the project directory:

import djcelery
djcelery.setup_loader()

CELERY_IMPORTS = (
    'app01.tasks',
)
# can prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True
# number of concurrent worker processes
CELERYD_CONCURRENCY = 4
# acknowledge tasks only after they finish, allowing retries
CELERY_ACKS_LATE = True
# recycle each worker after 100 tasks to guard against memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100
# task time limit in seconds
CELERYD_TASK_TIME_LIMIT = 12 * 30

Create tasks.py in the app01 directory:

from celery import task

@task
def add(a, b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write(a)
    print(a + b)

The view function, views.py:

from django.shortcuts import render, HttpResponse
from app01.tasks import add
from datetime import datetime, timedelta

def test(request):
    # result = add.delay(2, 3)
    ctime = datetime.now()
    # Celery uses UTC by default, so convert the local time to UTC
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig  # import the config from the project package
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

 

 

Source: https://www.cnblogs.com/zhouze/p/11436613.html