Celery is a distributed task queue written in Python. With a small amount of setup, Celery lets you run tasks (both long-running and scheduled/periodic tasks) asynchronously.
Note that starting with version 4.0, Celery no longer officially supports Windows.
pip install -U "Celery[redis]"
Note:
After installing on Windows, you may see the following error:
ValueError: '__name__' in __slots__ conflicts with class variable
If so, uninstall celery first, then try reinstalling with the following commands:
pip install -U https://github.com/celery/py-amqp/zipball/master
pip install -U https://github.com/celery/billiard/zipball/master
pip install -U https://github.com/celery/kombu/zipball/master
pip install -U https://github.com/celery/celery/zipball/master
pip install -U "Celery[redis]"
If the celery executable was installed into a virtualenv and is not on your PATH, you can symlink it:
ln -s ~/.venv/project_dj/bin/celery /usr/bin/celery
[root@localhost ~]$ celery --help
Options:
  -A, --app APPLICATION
  -b, --broker TEXT
  --result-backend TEXT
  --loader TEXT
  --config TEXT
  --workdir PATH
  -C, --no-color
  -q, --quiet
  --version
  --help                Show this message and exit.

Commands:
  amqp     AMQP Administration Shell.
  beat     Start the beat periodic task scheduler.
  call     Call a task by name.
  control  Workers remote control.
  events   Event-stream utilities.
  graph    The ``celery graph`` command.
  inspect  Inspect the worker at runtime.
  list     Get info from broker.
  logtool  The ``celery logtool`` command.
  migrate  Migrate tasks from one broker to another.
  multi    Start multiple worker instances.
  purge    Erase all messages from all known task queues.
  report   Shows information useful to include in bug-reports.
  result   Print the return value for a given task id.
  shell    Start shell session with convenient access to celery symbols.
  status   Show list of workers that are online.
  upgrade  Perform upgrade between versions.
  worker   Start worker instance.
# -*- coding: utf-8 -*-
# @Time   : 2021/5/24 11:20
# @Author : chinablue
# @File   : task.py

from celery import Celery

# Create an app (Celery instance) as the entry point for all celery operations
broker_url = "redis://:123456@127.0.0.1:6379/5"
backend_url = "redis://:123456@127.0.0.1:6379/6"
app = Celery("tasks", broker=broker_url, backend=backend_url)

# Define a task
@app.task
def add(x, y):
    return x + y
Notes:
1) When creating the Celery instance, you must specify a message broker to send and receive task messages. This article uses Redis (set up via Docker).
2) Format of the broker and backend parameters: redis://:password@hostname:port/db_number
celery -A tasks worker --loglevel=INFO
Notes:
1) In production, a tool such as supervisor is typically used to run the celery worker as a daemon in the background.
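As a sketch of that supervisor setup, a minimal program section might look like the one below. All paths, the project directory, and the log file locations are hypothetical placeholders, not taken from this article; adjust them to your own environment.

```ini
; /etc/supervisor/conf.d/celery.conf  (hypothetical path)
[program:celery]
; Run the worker via the virtualenv's celery binary (placeholder paths)
command=/root/.venv/project_dj/bin/celery -A tasks worker --loglevel=INFO
directory=/opt/project_dj
autostart=true
autorestart=true
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker-error.log
```

After writing the file, reload supervisor (e.g. `supervisorctl reread && supervisorctl update`) so it picks up the new program.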
Open a terminal and start the Python interactive shell:
>>> result = add.delay(4, 4)
>>> result = add.apply_async((4, 4), countdown=5)
Notes:
1) add.apply_async((4, 4)) can be shortened to add.delay(4, 4)
2) add.apply_async((4, 4), countdown=5) means the task will be executed 5 seconds after it is sent
To retrieve the execution info of each task, you must specify a result backend when creating the Celery instance. This article uses Redis (set up via Docker).
result = add.delay(4, 4)

result.ready()               # Whether the task has finished (True/False)
result.failed()              # Finished and failed
result.successful()          # Finished and succeeded
result.state                 # Task state: PENDING, STARTED, SUCCESS, ...
result.get()                 # Get the task's return value
result.get(timeout=10)       # Wait at most 10 seconds for the result
result.get(propagate=False)  # If the task raised an exception, return it instead of re-raising (the default re-raises)
result.id                    # Task id
Note:
1) There are three ways to configure the broker and backend parameters in celery:
# -*- coding: utf-8 -*-
# @Time   : 2021/5/24 11:20
# @Author : chinablue
# @File   : task.py

from celery import Celery

# Way 1: pass broker and backend directly to the Celery constructor
broker_url = "redis://:123456@127.0.0.1:6379/5"
backend_url = "redis://:123456@127.0.0.1:6379/6"
app = Celery("tasks", broker=broker_url, backend=backend_url)

# Define a task
@app.task
def add(x, y):
    return x + y
# -*- coding: utf-8 -*-
# @Time   : 2021/5/24 11:20
# @Author : chinablue
# @File   : task.py

from celery import Celery

broker_url = "redis://:123456@127.0.0.1:6379/5"
backend_url = "redis://:123456@127.0.0.1:6379/6"

# Way 2: update the app's configuration in bulk
app = Celery("tasks")
app.conf.update({
    "broker_url": broker_url,
    "result_backend": backend_url,
})

# Define a task
@app.task
def add(x, y):
    return x + y
# -*- coding: utf-8 -*-
# @Time   : 2021/5/24 11:20
# @Author : chinablue
# @File   : task.py

from celery import Celery

broker_url = "redis://:123456@127.0.0.1:6379/5"
backend_url = "redis://:123456@127.0.0.1:6379/6"

# Way 3: set configuration attributes individually
app = Celery("tasks")
app.conf.broker_url = broker_url
app.conf.result_backend = backend_url

# Define a task
@app.task
def add(x, y):
    return x + y
Python Celery distributed task queue: installation and introduction (Redis-based)
Original (in Chinese): https://www.cnblogs.com/reconova-56/p/14806920.html