首页 > 其他 > 详细

celery实现任务进度查询

时间:2020-06-16 00:06:04      阅读:499      评论:0      收藏:0      [点我收藏+]

 

如果想在任务成功或者失败额外做点事,可以重写Task类。

tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.task import Task

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print(task done==================>: {0}.format(retval))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(task fail, reason=================>: {0}.format(exc))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=MyTask)
def add(x, y):
    return x + y

 

@app.task(bind=True)

意思是绑定任务为实例方法,执行中的任务能获取到了自己执行任务的各种信息,可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等。

示例:

# tasks.py
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
    logger.info(self.request.__dict__)
    return x + y

 

案例模拟进度条,查看任务进度

------------------目录结构-------------------

--start.py

--proj

   --celery.py

   --tasks.py

--------------------------------------------------

技术分享图片
from __future__ import absolute_import, unicode_literals
#导入安装的celery(所以上面要写绝对导入),而不是自己导自己(from .celery)
from celery import Celery

app = Celery(proj,
             broker=redis://,
             backend=redis://,
             include=[proj.tasks,])  # 这个celery管理了哪些task文件可以有多个,其中有个定时任务periodic_task

# Optional configuration, see the application user guide.
# update方法更新配置,也可以直接写在上面初始化Celery里面
app.conf.update(
    result_expires=3600,  # 任务结果一小时内没人取就丢弃
)

if __name__ == __main__:
    app.start()
celery.py

 

技术分享图片
from __future__ import absolute_import, unicode_literals
from .celery import app
#from celery import Celery
import time

@app.task(bind=True)
def test_mes(self):
    for i in range(1, 11):
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={p: i*10})
    return finish
tasks.py

 

技术分享图片
from proj.tasks import test_mes
import sys

def pm(body):
    res = body.get(result)
    if body.get(status) == PROGRESS:
        sys.stdout.write(\r任务进度: {0}%.format(res.get(p)))
        sys.stdout.flush()
    else:
        print(\r)
        print(res)
r = test_mes.delay()
r.get(on_message=pm, propagate=False)  #‘FINISH‘
run.py

 

 

模拟excel入库前端显示任务进度

from __future__ import absolute_import, unicode_literals
from .celery import app
#from celery import Celery
import time

@app.task(bind=True)
def excel_into_db(self):
    # 读取excel到内存==>校验excel==>入库
    self.update_state(state="PROGRESS", meta={step:读取excel到内存..., prog:10%})
    time.sleep(5)
    self.update_state(state="PROGRESS", meta={step:读取excel到内存..., prog:100%})
    self.update_state(state="PROGRESS", meta={step:校验excel格式..., prog:10%})
    time.sleep(10)
    self.update_state(state="PROGRESS", meta={step:校验excel格式..., prog:10%})
    self.update_state(state="PROGRESS", meta={step:入库..., prog:10%})
    time.sleep(5)
    self.update_state(state="PROGRESS", meta={step:入库..., prog:100%})
    return 上传成功!

 

celery实现任务进度查询

原文:https://www.cnblogs.com/staff/p/13138581.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!