处理大量消息的分布式系统
专注于实时处理的异步任务队列
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
user提交任务,提交到broker(消息中简介),workers一个个的工人,去broker里取任务一个个执行,执行完之后存储到store (broker,workers,store都是一个个程序,所以都是放在单独的一个个py文件里)
应用场景:
1.用户提交任务将视频转成MP4格式,传上去要转,转很耗费时间,所以就可以提一个转视频的任务,然后把请求返回给用户告诉用户任务以提交,你可以去干别的事。然后任务执行完成之后就把结果放在store里(成功,失败,正在执行中),用户就要store里查看结果
2.注册发邮件:一个用户一旦注册了之后,把发短信和发邮件的任务提交上去,让它异步给我去发邮件,一旦注册成功,用户看的是注册成功了,其实后台有封邮件朝他发呢
3.抢购东西,正常逻辑:(秒杀的时候,会显示您正在排队中,点提交会立马到数据库查询是否有数据,有的话就减一,存订单,返回)如果一瞬间来了好多好多人,你程序是扛不住的,所以是在秒杀之前把数据放入到redis当中,不去操作数据库,去操作redis,如我这商品有10个就去拿十个数字就好了,每次减一个数字的时候,告诉他购买成功了,他这次请求减一代表他秒杀成功了,这速度比你操作数据库会快一些,但是如果很多人也扛不住,所以也要用到salary进行异步提交任务,在redis管道中排队,然后立马给用户返回“您正在排队中”,这种就是流量的削峰
多线程在处理数据库的时候,可能会存在秒超的情况,所以就需要加锁乐观锁(在修改数据的时候,不加锁,然后修改完数据之后再检查,别人是否动过我的数据,如果动过就要进行回滚)悲观锁(修改数据的时候,要加锁,改完数据之后再释放锁)
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等 (redis不仅可以当数据库,也可以当消息中间件)
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储也不是celery提供的,Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
消息中间件中的信息和任务结果存储的数据都是放在redis的不同db当中
Celery version 4.0 runs on Python ?2.7, 3.4, 3.5? PyPy ?5.4, 5.5? This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required. If you’re running an older version of Python, you need to be running an older version of Celery: Python 2.6: Celery series 3.1 or earlier. Python 2.5: Celery series 3.0 or earlier. Python 2.4 was Celery series 2.2 or earlier. Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
celery不支持windows,windows上是可以用的,只是多加些东西而已
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理统计商品的消耗量,生成一个折线图,页面静态化(首页经常被访问,比如博客的首页系统,每次访问都去查数据库很耗时,那我可以放入到缓存中也很耗时,所以给用户一个静态页面)等等
pip install celery
消息中间件:RabbitMQ/Redis
app=Celery(‘任务名‘,backend=‘xxx‘,broker=‘xxx‘)
创建py文件:celery_app_task.py
import celery import time # broker=‘redis://127.0.0.1:6379/2‘ 不加密码 #这是装有broker任务存放的redis中的db,backed是返回消息存放的redis中的db backend=‘redis://:123456@127.0.0.1:6379/1‘ broker=‘redis://:123456@127.0.0.1:6379/2‘ #可能一个项目中有多个Celery所以要指定一个名字‘test‘而且不能重名 #任务其实就是个函数 #需要用一个装饰器装饰,表示该任务是被celery管理的,并且可以用celery执行的 cel=celery.Celery(‘test‘,backend=backend,broker=broker) @cel.task def add(x,y): import time time.sleep(3) return x+y
创建py文件:add_task.py,添加任务
#提交任务到消息队列中,返回任务的id from celery_app_task import add result = add.delay(4,5) print(result.id)
注:windows下:celery worker -A celery_app_task -l info -P eventlet
celery_app_task(这是自己一开始创建py文件的名字)-l不是数字1
from celery_app_task import cel if __name__ == ‘__main__‘: cel.worker_main() # cel.worker_main(argv=[‘--loglevel=info‘)
创建py文件:result.py,查看任务执行结果
from celery.result import AsyncResult from celery_app_task import cel #id是在add_task.py中add.delay(4,5)提交任务时返回得到的任务id,cel是celery_app_task.py中Celery实例化得到的对象 async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print(‘执行失败‘) elif async.status == ‘PENDING‘: print(‘任务等待中被执行‘) elif async.status == ‘RETRY‘: print(‘任务异常后正在重试‘) elif async.status == ‘STARTED‘: print(‘任务已经开始被执行‘)
执行 run.py ,或者执行命令:celery worker -A celery_app_task -l info
执行 result.py,检查任务状态并获取结果
pro_cel ├── celery_task# celery相关文件夹 │ ├── celery.py # celery连接和配置相关文件,必须叫这个名字 │ └── tasks1.py # 所有任务函数 │ └── tasks2.py # 所有任务函数 ├── check_result.py # 检查结果 └── send_task.py # 触发任务
celery.py
from celery import Celery cel = Celery(‘celery_demo‘, broker=‘redis://127.0.0.1:6379/1‘, backend=‘redis://127.0.0.1:6379/2‘, # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类 include=[‘celery_task.tasks1‘, ‘celery_task.tasks2‘ ]) # 时区 cel.conf.timezone = ‘Asia/Shanghai‘ # 是否使用UTC cel.conf.enable_utc = False
tasks1.py
import time from celery_task.celery import cel @cel.task def test_celery(res): time.sleep(5) return "test_celery任务结果:%s"%res
tasks2.py
import time from celery_task.celery import cel @cel.task def test_celery2(res): time.sleep(5) return "test_celery2任务结果:%s"%res
check_result.py
from celery.result import AsyncResult from celery_task.celery import cel async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除,执行完成,结果不会自动删除 # async.revoke(terminate=True) # 无论现在是什么时候,都要终止 # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。 elif async.failed(): print(‘执行失败‘) elif async.status == ‘PENDING‘: print(‘任务等待中被执行‘) elif async.status == ‘RETRY‘: print(‘任务异常后正在重试‘) elif async.status == ‘STARTED‘: print(‘任务已经开始被执行‘)
send_task.py
from celery_task.tasks1 import test_celery from celery_task.tasks2 import test_celery2 # 立即告知celery去执行test_celery任务,并传入一个参数 result = test_celery.delay(‘第一个的执行‘) print(result.id) result = test_celery2.delay(‘第二个的执行‘) print(result.id)
添加任务(执行send_task.py),开启work:celery worker -A celery_task -l info -P eventlet,检查任务执行结果(执行check_result.py)
add_task.py
from celery_app_task import add from datetime import datetime # 方式一 2019-02-13 10:19:56时间格式 # v1 = datetime(2019, 2, 13, 18, 19, 56) # print(v1) # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) #取出要执行任务的时间对象,调用apply_async方法,args是参数,eta是执行的时间 # result = add.apply_async(args=[1, 3], eta=v2) # print(result.id) # 方式二 ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta #取十秒之后的事件对象 time_delay = timedelta(seconds=10) #时间对象相加 task_time = utc_ctime + time_delay # 使用apply_async并设定时间 result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)
多任务结构中celery.py修改如下
from datetime import timedelta from celery import Celery from celery.schedules import crontab cel = Celery(‘tasks‘, broker=‘redis://127.0.0.1:6379/1‘, backend=‘redis://127.0.0.1:6379/2‘, include=[ ‘celery_task.tasks1‘, ‘celery_task.tasks2‘, ]) cel.conf.timezone = ‘Asia/Shanghai‘ cel.conf.enable_utc = False cel.conf.beat_schedule = { # 名字随意命名 ‘add-every-10-seconds‘: { # 执行tasks1下的test_celery函数 ‘task‘: ‘celery_task.tasks1.test_celery‘, # 每隔2秒执行一次 # ‘schedule‘: 1.0, # ‘schedule‘: crontab(minute="*/1"), ‘schedule‘: timedelta(seconds=2), # 传递参数 ‘args‘: (‘test‘,) }, # ‘add-every-12-seconds‘: { # ‘task‘: ‘celery_task.tasks1.test_celery‘, # 每年4月11号,8点42分执行 # ‘schedule‘: crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # ‘schedule‘: crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # ‘args‘: (16, 16) # }, }
启动work执行:celery worker -A celery_task -l info -P eventlet
安装包
celery==3.1.25
django-celery==3.1.20
在项目目录下创建celeryconfig.py
import djcelery djcelery.setup_loader() CELERY_IMPORTS=( ‘app01.tasks‘, ) #有些情况可以防止死锁 CELERYD_FORCE_EXECV=True # 设置并发worker数量 CELERYD_CONCURRENCY=4 #允许重试 CELERY_ACKS_LATE=True # 每个worker最多执行100个任务被销毁,可以防止内存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超时时间 CELERYD_TASK_TIME_LIMIT=12*30
在app01目录下创建tasks.py
from celery import task @task def add(a,b): with open(‘a.text‘, ‘a‘, encoding=‘utf-8‘) as f: f.write(‘a‘) print(a+b)
视图函数views.py
from django.shortcuts import render,HttpResponse from app01.tasks import add from datetime import datetime def test(request): # result=add.delay(2,3) ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id) return HttpResponse(‘ok‘)
settings.py
INSTALLED_APPS = [ ... ‘djcelery‘, ‘app01‘ ] ... from djagocele import celeryconfig BROKER_BACKEND=‘redis‘ BOOKER_URL=‘redis://127.0.0.1:6379/1‘ CELERY_RESULT_BACKEND=‘redis://127.0.0.1:6379/2‘
原文:https://www.cnblogs.com/huangxuanya/p/11178529.html