1.定义:
Celery是一个异步的任务队列(也叫做分布式任务队列)
2.工作结构
Celery分为3个部分
(1)worker部分负责任务的处理,即工作进程(我的理解工作进程就是你写的python代码,当然还包括python调用系统工具功能)
(2)broker部分负责任务消息的分发以及任务结果的存储,这部分任务主要由中间数据存储系统完成,比如消息队列服务器RabbitMQ、redis、Amazon SQS、MongoDB、IronMQ等或者关系型数据库,使用关系型数据库依赖sqlalchemy或者django的ORM
(3)Celery主类,进行任务最开始的指派与执行控制,他可以是单独的python脚本,也可以和其他程序结合,应用到django或者flask等web框架里面以及你能想到的任何应用
3.话不多说,用起来
(1)安装Celery(要安装celery3版本,4版本改动较大没测试)
pip install celery==3.1.17
(2)broker部分此处使用安装好的redis服务6380端口的db0作为消息队列,普通redis服务的安装此处不做介绍
(3)Celery的使用一(单独脚本调用,简单方便)
注:不考虑任务的结果存储情况
<1>/tmp/tasks.py(实际脚本中不要写中文注释)
#!/usr/bin/env python # -*- coding=utf-8 -*- from celery import Celery from celery import platforms #用于开启root也可以启动celery服务,默认是不允许root启动celery的 platforms.C_FORCE_ROOT = True #创建一个celery实例,传递进去的第一个参数tasks必须是本文件的文件名tasks,指定broker为本机redis6380服务 celery = Celery(‘tasks‘, broker=‘redis://localhost:6380/0‘) #使用celery实例的task装饰器装饰add函数,此处的add函数可以当作后期的耗时任务对待 @celery.task def add(x,y): return x + y
<2>启动celery服务
#cd /tmp #celery -A tasks worker --loglevel=info
<3>验证执行任务
导入模块,执行add函数,此处使用add.delay(3,4)而不是add(3,4),因为被celery封装了,要异步执行需要额外使用add.delay(3,4)
需要注意,如果把返回值赋值给一个变量,那么原来的应用程序也会被阻塞,需要等待异步任务返回的结果。因此,实际使用中,不需要把结果赋值。
#cd /tmp #python >>>from tasks import add >>>add.delay(3,4)
celery服务的窗口会刷出任务的信息,以及是否处理成功,以及结果
将来只要在别的程序中引入tasks中的add函数,就是异步的了,是不是有点屌。。。。。
<4>扩展知识,指定队列名
传入redis中的指定队列testq怎么玩?(其他broker引擎也支持)
启动celery服务的时候添加额外参数-Q ‘队列名‘
#cd /tmp #celery -A tasks.tasks worker --loglevel=info -Q ‘testq‘
跑任务的时候指定testq队列名
#cd /tmp #python >>>from tasks import add >>>add.delay(3,4,queue=‘testq‘)
<5>扩展知识,指定开启的worker进程数(底层是调用的Python的multiprocessing模块中的Pool进程池思想来做)
-c 5 开启5个worker进程来同时抢任务,跑任务
#cd /tmp #celery -A tasks.tasks worker --loglevel=info -c 5
<6>扩展知识,管理broker里面的数据,查看任务状态,以及任务的详细信息
安装一个叫flower的webui,提供任务查询,worker生命管理,以及路由管理等(底层是通过tornado框架封装的)
#pip install flower #任意目录下执行都可以 #celery flower --port=5555 --broker=redis://localhost:6380/0
里面可以看到任务参数,结果,接受任务时间,任务开始时间,任务状态,Started是任务进行中,Success是任务跑完执行成功
(4)Celery的使用二(项目方式,也叫做Python包方式,结构清晰,低耦合;相比纯脚本方式略复杂,用不用由你)
创建一个叫做proj的Python包(创建Python包的操作此处不做详细说明,tree /tmp/proj)
<1>proj/celery.py
from __future__ import absolute_import 据说添加此行可以低降低出错的概率哦(阿门保佑;其实就是兼容Python版本的一个东东)
创建一个celery的实例,名字叫做app,传递进去的第一个参数是Python包的名字,include加载任务文件,config_from_object
指定celery的配置文件(好吧,看起来比单纯使用脚本方式麻烦点,请继续往下看)
#!/usr/bin/env python # -*- coding=utf-8 -*- from __future__ import absolute_import from celery import Celery app = Celery(‘proj‘, include=[‘proj.tasks‘]) app.config_from_object(‘proj.config‘) if __name__ == ‘__main__‘: app.start()
<2>proj/config.py
配置文件里指定broker
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import BROKER_URL = ‘redis://127.0.0.1:6380/0‘
<3>proj/tasks.py
导入celery实例,实例绑定任务
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import from proj.celery import app @app.task def add(x, y): return x + y
<4>开启celery服务(特定目录指定包名字启动)
#cd /tmp/ #celery -A proj worker -l info
<5>扩展功能,指定队列名,调整worker进程数,页面管理celery同上,不再做说明
(5)Celery的使用三(django-celery模式;#反正我喜欢用这种)
django调用celery跑异步任务,常见场景有注册成功,发送邮件可以异步来防止网络IO阻塞,以及耗时间的任务,例如需要去跑9000台IP的某些配置参数任务,或者下发任务执行,可能需要10几分钟才能跑完,就可以WEB应用中使用这种异步方式
<1>安装django-celery软件包
一定要注意celery的版本和django-celery的小版本要保持一致,否则会有各种杂七杂八的小问题(都是泪.......)
#pip install celery==3.1.17 #pip install django-celery==3.1.17
<2>创建celery必须的数据库表结构
#cd Python_20161203 #python manage.py migrate
<3>django项目的settings.py文件中追加如下内容;app呢是django项目里面的应用名字
settings.py
import djcelery djcelery.setup_loader() BROKER_URL = ‘redis://127.0.0.1:6380/0‘ CELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6380/1‘ CELERY_TASK_SERIALIZER = ‘json‘ CELERY_RESULT_SERIALIZER = ‘json‘ CELERY_ACCEPT_CONTENT = [‘json‘] CELERY_IMPORTS = (‘app.tasks‘, ) CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘ CELERYD_CONCURRENCY = 20
参数说明(可以根据自己的需求添加自己的参数):
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6380/1‘" #结果存储
CELERY_TASK_RESULT_EXPIRES = 1200 # celery任务执行结果的超时时间,我的任务都不需要返回结果,只需要正确执行就行
CELERYD_CONCURRENCY = 20 # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以
CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去redis取任务的数量,我这里预取了4个慢慢执行,因为任务有长有短没有预取太多
CELERYD_MAX_TASKS_PER_CHILD = 200 # 每个worker执行了多少任务就会死掉,我建议数量可以大一些,比如200
CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘ # 这是使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中
<4>app/tasks.py(在django的app应用目录下创建tasks.py任务文件,里面调用复杂的任务函数)
#!/usr/bin/env python # -*- coding=utf-8 -*- ############################### from __future__ import absolute_import from celery import task import time #task装饰器封装了celery函数,为耗时的操作 @task def add(x,y): for i in range(30): print i time.sleep(1) return x + y
<5>添加验证功能,查看实际效果
app/urls.py
urlpatterns = [ url(r‘^celery_test/,views.celery_test), ]
app/views.py
def celery_test(request): from tasks import add add.delay(4,8) return HttpResponse(‘Celery testing666‘)
<6>开启djanog服务和celery服务(虽然耦合了,但是还是需要额外开启)
#python manage.py runserver 0.0.0.0:8000 #另一个窗口开启celery服务 #python manage.py celery worker --loglevel=info
<7>发送http的GET请求,调用celery去执行异步任务(大功告成)
celery那端的屏幕输出如下:
[2016-12-01 16:16:00,940: INFO/MainProcess] Received task: app.tasks.add[06a8d603-a7d3-4732-b8f3-ad010d531200] [2016-12-01 16:16:00,941: WARNING/Worker-1] 0 [2016-12-01 16:16:01,943: WARNING/Worker-1] 1 [2016-12-01 16:16:02,945: WARNING/Worker-1] 2 [2016-12-01 16:16:03,947: WARNING/Worker-1] 3 [2016-12-01 16:16:04,948: WARNING/Worker-1] 4 [2016-12-01 16:16:05,950: WARNING/Worker-1] 5 [2016-12-01 16:16:06,952: WARNING/Worker-1] 6 [2016-12-01 16:16:07,954: WARNING/Worker-1] 7 [2016-12-01 16:16:08,955: WARNING/Worker-1] 8 [2016-12-01 16:16:09,957: WARNING/Worker-1] 9 [2016-12-01 16:16:10,958: WARNING/Worker-1] 10 [2016-12-01 16:16:11,959: WARNING/Worker-1] 11 [2016-12-01 16:16:12,961: WARNING/Worker-1] 12 [2016-12-01 16:16:13,962: WARNING/Worker-1] 13 [2016-12-01 16:16:14,964: WARNING/Worker-1] 14 [2016-12-01 16:16:15,964: WARNING/Worker-1] 15 [2016-12-01 16:16:16,966: WARNING/Worker-1] 16 [2016-12-01 16:16:17,968: WARNING/Worker-1] 17 [2016-12-01 16:16:18,969: WARNING/Worker-1] 18 [2016-12-01 16:16:19,971: WARNING/Worker-1] 19 [2016-12-01 16:16:20,973: WARNING/Worker-1] 20 [2016-12-01 16:16:21,974: WARNING/Worker-1] 21 [2016-12-01 16:16:22,976: WARNING/Worker-1] 22 [2016-12-01 16:16:23,978: WARNING/Worker-1] 23 [2016-12-01 16:16:24,979: WARNING/Worker-1] 24 [2016-12-01 16:16:25,981: WARNING/Worker-1] 25 [2016-12-01 16:16:26,982: WARNING/Worker-1] 26 [2016-12-01 16:16:27,984: WARNING/Worker-1] 27 [2016-12-01 16:16:28,986: WARNING/Worker-1] 28 [2016-12-01 16:16:29,987: WARNING/Worker-1] 29 [2016-12-01 16:16:30,990: INFO/MainProcess] Task app.tasks.add[06a8d603-a7d3-4732-b8f3-ad010d531200] succeeded in 30.049149203s: 12