tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app from celery.task import Task class MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print(‘task done==================>: {0}‘.format(retval)) return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print(‘task fail, reason=================>: {0}‘.format(exc)) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=MyTask) def add(x, y): return x + y
意思是绑定任务为实例方法,执行中的任务能获取到了自己执行任务的各种信息,可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等。
示例:
# tasks.py from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def add(self, x, y): logger.info(self.request.__dict__) return x + y
------------------目录结构-------------------
--start.py
--proj
--celery.py
--tasks.py
--------------------------------------------------
from __future__ import absolute_import, unicode_literals #导入安装的celery(所以上面要写绝对导入),而不是自己导自己(from .celery) from celery import Celery app = Celery(‘proj‘, broker=‘redis://‘, backend=‘redis://‘, include=[‘proj.tasks‘,]) # 这个celery管理了哪些task文件可以有多个,其中有个定时任务periodic_task # Optional configuration, see the application user guide. # update方法更新配置,也可以直接写在上面初始化Celery里面 app.conf.update( result_expires=3600, # 任务结果一小时内没人取就丢弃 ) if __name__ == ‘__main__‘: app.start()
from __future__ import absolute_import, unicode_literals from .celery import app #from celery import Celery import time @app.task(bind=True) def test_mes(self): for i in range(1, 11): time.sleep(1) self.update_state(state="PROGRESS", meta={‘p‘: i*10}) return ‘finish‘
from proj.tasks import test_mes import sys def pm(body): res = body.get(‘result‘) if body.get(‘status‘) == ‘PROGRESS‘: sys.stdout.write(‘\r任务进度: {0}%‘.format(res.get(‘p‘))) sys.stdout.flush() else: print(‘\r‘) print(res) r = test_mes.delay() r.get(on_message=pm, propagate=False) #‘FINISH‘
模拟excel入库前端显示任务进度
from __future__ import absolute_import, unicode_literals from .celery import app #from celery import Celery import time @app.task(bind=True) def excel_into_db(self): # 读取excel到内存==>校验excel==>入库 self.update_state(state="PROGRESS", meta={‘step‘:‘读取excel到内存...‘, ‘prog‘:‘10%‘}) time.sleep(5) self.update_state(state="PROGRESS", meta={‘step‘:‘读取excel到内存...‘, ‘prog‘:‘100%‘}) self.update_state(state="PROGRESS", meta={‘step‘:‘校验excel格式...‘, ‘prog‘:‘10%‘}) time.sleep(10) self.update_state(state="PROGRESS", meta={‘step‘:‘校验excel格式...‘, ‘prog‘:‘10%‘}) self.update_state(state="PROGRESS", meta={‘step‘:‘入库...‘, ‘prog‘:‘10%‘}) time.sleep(5) self.update_state(state="PROGRESS", meta={‘step‘:‘入库...‘, ‘prog‘:‘100%‘}) return ‘上传成功!‘
原文:https://www.cnblogs.com/staff/p/13138581.html