# __init__.py文件,不然会报错
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

import os
import sys

import django
from celery import Celery

# All import paths in the celery project are resolved relative to
# CELERY_BASE_DIR, and the celery command must be run from that directory.
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# The tasks use Django code (models, settings, mail), so the Django
# environment must be configured before any task touches it.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")
django.setup()

# Celery instance: app name, broker (where tasks are published),
# result backend (where results are stored) and the task modules to load.
app = Celery(
    'celery',
    broker='redis://127.0.0.1:6379/2',   # where tasks are queued
    backend='redis://127.0.0.1:6379/3',  # where results are stored
    include=['tasks', 'tasks_beat'],
)
# 由于上面配置了路径,所以导入时需要注意
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行
# 相当于装饰器,将下面函数打包给app
from main import app
@app.task(bind=True)
def send_email(self, user_list):
    """Send the approval-notification email to *user_list* asynchronously.

    Args:
        user_list: list of user primary keys to notify.

    The whole task is retried (5 s delay, at most 3 retries) when
    ``EmailInform`` raises or returns the failure marker ``'-1'``.
    NOTE(review): a retry re-sends to every user in the list, including
    those already mailed before the failure.
    """
    # Imported lazily so the Django-dependent module is only loaded when
    # the task actually runs.
    from utils.MySendEmail import EmailInform

    try:
        res = EmailInform(user_list)
    except Exception as exc:
        # Attach the real failure so it is not lost once retries run out.
        raise self.retry(countdown=5, max_retries=3, exc=exc)
    if res == '-1':
        raise self.retry(countdown=5, max_retries=3, exc=Exception('邮箱发送失败'))
from django.conf import settings
from django.core.mail import send_mail
from user.models import User
def EmailInform(user_list):
    """Email each user in *user_list* that a work order awaits approval.

    Args:
        user_list: iterable of ``User`` primary keys.

    Returns:
        None. Errors raised by ``send_mail`` propagate to the caller.

    Ids that match no user are logged and skipped instead of raising
    ``AttributeError`` on ``None``.
    """
    import logging

    for user_id in user_list:
        # One query per user (the original fetched the same row twice).
        user = User.objects.filter(pk=user_id).first()
        if user is None:
            logging.getLogger(__name__).warning(
                'EmailInform: no user with pk=%s, skipped', user_id)
            continue
        html_message = 'dear{},您有新的工单审批信息了!可以点击以下链接进行工单审批:<a href="http://127.0.0.1:8080/login/">审批工单</a>'.format(user.username)
        send_mail(subject='工单系统',
                  message='',
                  from_email=settings.EMAIL_FROM,
                  recipient_list=[user.email],
                  html_message=html_message)
def addWorkerOrder(request):
    """Create a work order, its sub-orders, and notify the first approver(s).

    Reads ``name`` (flow configuration name) and ``form`` from
    ``request.data`` and the creator id from the decoded token. One
    sub-order is created per ``NewFlowUserRoleActionConf`` row of the flow;
    every sub-order after the first is created with ``suborder_status='3'``.
    The approver(s) of the sub-order with ``sequence_number=1`` are emailed
    through the ``send_email`` celery task.

    Returns:
        dict with ``msg``, ``code``, ``id`` and ``flowconf``.

    NOTE(review): the ``.first()`` results below are dereferenced without a
    None check, exactly as in the original; a missing flow, role, user or
    first sub-order raises ``AttributeError``.
    """
    user_info = decodeToken(request)
    flowconf = FlowConf.objects.filter(name=request.data.get('name')).first().pk
    work_obj = WorkOrderModel.objects.create(
        flowconf_id=flowconf,
        create_user_id=user_info.get('user_id'),
        order_status='1',
        parameter=request.data.get('form'),
    )

    steps = NewFlowUserRoleActionConf.objects.filter(
        flowconf_id=work_obj.flowconf_id).values()
    created = 0
    for item in steps:
        approve_type = item['approvetype']
        if approve_type == '1':
            # Approval by role: no concrete user, store the role's name.
            approver_id = ''
            role_name = Role.objects.filter(pk=item['approve_type_id']).first().zh_name
        elif approve_type == '2':
            # Approval by a specific user.
            approver_id = User.objects.filter(pk=item['approve_type_id']).first().id
            role_name = ''
        else:
            continue
        child_data = {
            'mainorder_id': work_obj.id,
            'approve_user_id': approver_id,
            'approbe_user_role': role_name,
            'approve_userrole_id': item['approve_type_id'],
            'sequence_number': item['sequence'],
            'approve_type_id': approve_type,
        }
        if created:
            # Only the first sub-order keeps the model's default status.
            child_data['suborder_status'] = '3'
        SubOrderModel.objects.create(**child_data)
        created += 1

    first_step = SubOrderModel.objects.filter(
        mainorder_id=work_obj.id, sequence_number=1).first()
    if first_step.approve_user_id:
        user_list = [first_step.approve_user_id]
    else:
        user_list = list(
            UserRole.objects.filter(role_id=first_step.approve_userrole_id)
            .values_list('user_id', flat=True))

    # Register the task directory once, not on every request, so sys.path
    # does not grow without bound.
    task_dir = os.path.join('/home/worker/opwf_project', 'celery_task')
    if task_dir not in sys.path:
        sys.path.insert(0, task_dir)
    from celery_task.tasks import send_email
    send_email.delay(user_list)
    return {'msg': '添加成功', 'code': 200, 'id': work_obj.pk, 'flowconf': work_obj.flowconf_id}
def chooseSuborder(request):
    """Record an approver's decision on a sub-order and advance the flow.

    Reads ``suborder_id``, ``action_status`` and ``decision`` from
    ``request.data``. The sub-order is marked processed
    (``suborder_status=2``) with the given action and text, then:

    * ``action_status == '4'``: main order ``order_status`` is set to 2.
    * ``action_status == '3'``: main order ``order_status`` is set to 3.
    * ``action_status == '2'``: if this was the last step the main order
      is set to 3, otherwise the next step is opened
      (``suborder_status=1``) and its approver(s) are emailed.

    NOTE(review): the original comments label '4' as "returned -> main
    order rejected" and '3' as "refused -> main order completed"; the
    meaning of the numeric codes is not defined here — confirm.

    Returns:
        ``{'msg': 'ok', 'code': 200}``.
    """
    suborder_id = request.data.get('suborder_id')
    action_status = request.data.get('action_status')
    decision = request.data.get('decision')

    current = SubOrderModel.objects.filter(pk=suborder_id).first()
    mainorder = current.mainorder_id
    sequence_number = current.sequence_number
    # Single UPDATE instead of three consecutive ones.
    SubOrderModel.objects.filter(pk=suborder_id).update(
        action_status=action_status, suborder_status=2, approve_text=decision)

    # Sorted explicitly: the next step is taken from the real sequence
    # numbers rather than from a position in an unordered result set.
    sequence_list = list(
        SubOrderModel.objects.filter(mainorder_id=mainorder)
        .order_by('sequence_number')
        .values_list('sequence_number', flat=True))
    position = sequence_list.index(sequence_number)

    if action_status == '4':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=2)
    elif action_status == '3':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
    elif action_status == '2':
        # suborder_status: 1 pending, 2 processed, 3 waiting for the
        # previous node (per the original comments).
        if sequence_number == sequence_list[-1]:
            WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
        else:
            next_steps = SubOrderModel.objects.filter(
                mainorder_id=mainorder,
                sequence_number=sequence_list[position + 1])
            next_steps.update(suborder_status=1)
            next_step = next_steps.first()
            if next_step.approve_user_id:
                user_list = [next_step.approve_user_id]
            else:
                user_list = list(
                    UserRole.objects.filter(role_id=next_step.approve_userrole_id)
                    .values_list('user_id', flat=True))
            # Register the task directory once, not on every request.
            task_dir = os.path.join('/home/worker/opwf_project', 'celery_task')
            if task_dir not in sys.path:
                sys.path.insert(0, task_dir)
            from celery_task.tasks import send_email
            send_email.delay(user_list)
    return {'msg': 'ok', 'code': 200}
class WorkOrderView(APIView):
    """List/retrieve work orders (GET) and create a work order (POST)."""

    serializer_class = WorkOrderSerializer
    page_size = 4  # page size passed to basePaginator for list responses

    def get(self, request):
        """Return one work order when ``?id=`` is given, else a paginated list.

        Without ``id`` the list is whatever ``getWorkerorder`` returns for
        the user decoded from the request token.
        """
        order_id = request.query_params.get('id')
        if order_id is None:
            user_info = decodeToken(request)
            queryset = getWorkerorder(user_info)
            ret = basePaginator(queryset, request, self.page_size, self.serializer_class)
        else:
            queryset = WorkOrderModel.objects.filter(id=order_id)
            ret = self.serializer_class(queryset, many=True).data
        return Response(ret)

    def post(self, request):
        """Create a work order via ``addWorkerOrder`` and return its result."""
        return Response(addWorkerOrder(request))
class SubOrderView(APIView):
    """List sub-orders (GET) and submit an approval decision (POST)."""

    serializer_class = SubOrderSerializer
    page_size = 4  # page size passed to basePaginator for list responses

    def get(self, request):
        """Return the sub-orders of ``?mainorder=``, else a paginated list.

        Without ``mainorder`` the list is whatever ``getSuborder`` returns
        for the user decoded from the request token.
        """
        mainorder = request.query_params.get('mainorder')
        if mainorder is None:
            user_info = decodeToken(request)
            queryset = getSuborder(user_info)
            ret = basePaginator(queryset, request, self.page_size, self.serializer_class)
        else:
            queryset = SubOrderModel.objects.filter(mainorder_id=mainorder)
            ret = self.serializer_class(queryset, many=True).data
        return Response(ret)

    def post(self, request):
        """Apply the decision via ``chooseSuborder`` and return its result."""
        return Response(chooseSuborder(request))
celery -A celery_task worker -l INFO
# -*- coding: utf-8 -*-
# from __future__ import absolute_import, unicode_literals
#
import os
import sys

import django
from celery import Celery
from celery.schedules import crontab

# All import paths in the celery project are resolved relative to
# CELERY_BASE_DIR, and the celery command must be run from that directory.
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# Task modules import Django models at module level, so the app registry
# must be ready before Celery loads the modules listed in ``include``.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")
django.setup()

# Celery instance: app name, broker, result backend, task modules.
app = Celery(
    'celery',
    broker='redis://127.0.0.1:6379/2',   # where tasks are queued
    backend='redis://127.0.0.1:6379/3',  # where results are stored
    include=['celery_task.tasks', 'celery_task.tasks_beat', 'celery_task.tasks_timeout'],
)

app.conf.update(
    result_expires=3600,  # results left in redis are dropped after one hour
)

# Periodic tasks (run by ``celery beat``).
app.conf.beat_schedule = {
    # NOTE(review): the entry name says "monday" but day_of_week=2 is
    # Tuesday in Celery's crontab numbering (0 = Sunday) — confirm intent.
    'add-every-monday-seconds': {
        'task': 'celery_task.tasks_beat.say_hello',
        'schedule': crontab(hour=8, minute=30, day_of_week=2),
    },
    # Dispatched every 180 seconds (despite the entry name): checks for
    # timed-out work orders so their creators can be notified.
    'check-every-24-hours': {
        'task': 'celery_task.tasks_timeout.timeout_workorder',
        'schedule': 180.0,
    },
}
app.conf.timezone = 'Asia/Shanghai'

if __name__ == '__main__':
    app.start()
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行
# 相当于装饰器,将下面函数打包给app
from celery_task.celery import app
@app.task(bind=True)
def send_email(self, user_list):
    """Send the approval-notification email to *user_list* asynchronously.

    Args:
        user_list: list of user primary keys to notify.

    The whole task is retried (5 s delay, at most 3 retries) when
    ``EmailInform`` raises or returns the failure marker ``'-1'``.
    NOTE(review): a retry re-sends to every user in the list, including
    those already mailed before the failure.
    """
    # Imported lazily so the Django-dependent module is only loaded when
    # the task actually runs.
    from utils.MySendEmail import EmailInform

    try:
        res = EmailInform(user_list)
    except Exception as exc:
        # Attach the real failure so it is not lost once retries run out.
        raise self.retry(countdown=5, max_retries=3, exc=exc)
    if res == '-1':
        raise self.retry(countdown=5, max_retries=3, exc=Exception('邮箱发送失败'))
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行
# 相当于装饰器,将下面函数打包给app
from celery_task.celery import app
@app.task(bind=True)
def say_hello(self):
    """Periodic task: run ``sayHello`` to remind approvers by email.

    The task is retried (5 s delay, at most 3 retries) when ``sayHello``
    raises or returns the failure marker ``'-1'``.
    """
    # Imported lazily so the Django-dependent module is only loaded when
    # the task actually runs.
    from utils.MySendEmail import sayHello

    try:
        res = sayHello()
    except Exception as exc:
        # Attach the real failure so it is not lost once retries run out.
        raise self.retry(countdown=5, max_retries=3, exc=exc)
    if res == '-1':
        raise self.retry(countdown=5, max_retries=3, exc=Exception('邮箱发送失败'))
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
# @app.task 指定将这个函数的执行交给celery异步执行
from django.db.models import Q
from django.utils import timezone
# 相当于装饰器,将下面函数打包给app
from celery_task.celery import app
from user.models import User
from workerorder.models import WorkOrderModel
@app.task(bind=True)
def timeout_workorder(self):
    """Email the creator of, and close, every work order that has timed out.

    A work order is selected when its ``order_status`` is ``'1'`` and its
    ``create_time`` is outside the last 24 hours. For each one the creator
    is notified through ``timeOutEmail`` and ``order_status`` is set to 2.

    The 24-hour window is computed on every run. The original computed it
    once at import time, so the window was frozen at worker start-up.
    """
    from datetime import timedelta

    # Imported lazily so the mail helper is only loaded when the task runs.
    from utils.MySendEmail import timeOutEmail

    end_time = timezone.now()
    start_time = end_time - timedelta(hours=24)
    overdue_orders = WorkOrderModel.objects.exclude(
        Q(create_time__range=(start_time, end_time))).filter(Q(order_status='1'))
    for order in overdue_orders:
        creator = User.objects.filter(pk=order.create_user_id).first()
        if creator is not None:
            # A missing creator must not block closing the remaining orders.
            timeOutEmail(creator.username, creator.email)
        WorkOrderModel.objects.filter(id=order.id).update(order_status=2)
# -*- coding: utf-8 -*-
from django.conf import settings
from django.core.mail import send_mail
from user.models import User
from workerorder.models import SubOrderModel
def EmailInform(user_list):
    """Email each user in *user_list* that a work order awaits approval.

    Args:
        user_list: iterable of ``User`` primary keys.

    Returns:
        None. Errors raised by ``send_mail`` propagate to the caller.

    Ids that match no user are logged and skipped instead of raising
    ``AttributeError`` on ``None``.
    """
    import logging

    for user_id in user_list:
        # One query per user (the original fetched the same row twice).
        user = User.objects.filter(pk=user_id).first()
        if user is None:
            logging.getLogger(__name__).warning(
                'EmailInform: no user with pk=%s, skipped', user_id)
            continue
        html_message = 'dear{},您有新的工单审批信息了!可以点击以下链接进行工单审批:<a href="http://127.0.0.1:8080/login/">审批工单</a>'.format(user.username)
        send_mail(subject='工单系统',
                  message='',
                  from_email=settings.EMAIL_FROM,
                  recipient_list=[user.email],
                  html_message=html_message)
def sayHello():
    """Remind directly-assigned approvers of their pending sub-orders.

    Selects sub-orders with ``suborder_status='1'`` and
    ``action_status='1'`` and, for those whose ``approve_type_id`` is
    ``'2'``, emails the user referenced by ``approve_user_id``.

    Returns:
        None. Errors raised by ``send_mail`` propagate to the caller.
    """
    pending = SubOrderModel.objects.filter(suborder_status='1', action_status='1')
    for suborder in pending:
        if suborder.approve_type_id != '2':
            continue
        # One query per approver; the original queried twice and read the
        # misspelled attribute ``eamil``, which raised AttributeError.
        approver = User.objects.filter(pk=suborder.approve_user_id).first()
        html_message = 'Dear{}, 您有新的指定工单需要审批,十万火急!'.format(approver.username)
        send_mail(subject='工单系统',
                  message='',
                  from_email=settings.EMAIL_FROM,
                  recipient_list=[approver.email],
                  html_message=html_message)
def addWorkerOrder(request):
    """Create a work order, its sub-orders, and notify the first approver(s).

    Reads ``name`` (flow configuration name) and ``form`` from
    ``request.data`` and the creator id from the decoded token. One
    sub-order is created per ``NewFlowUserRoleActionConf`` row of the flow;
    every sub-order after the first is created with ``suborder_status='3'``.
    The approver(s) of the sub-order with ``sequence_number=1`` are emailed
    through the ``send_email`` celery task.

    Returns:
        dict with ``msg``, ``code``, ``id`` and ``flowconf``.

    NOTE(review): the ``.first()`` results below are dereferenced without a
    None check, exactly as in the original; a missing flow, role, user or
    first sub-order raises ``AttributeError``.
    """
    user_info = decodeToken(request)
    flowconf = FlowConf.objects.filter(name=request.data.get('name')).first().pk
    work_obj = WorkOrderModel.objects.create(
        flowconf_id=flowconf,
        create_user_id=user_info.get('user_id'),
        order_status='1',
        parameter=request.data.get('form'),
    )

    steps = NewFlowUserRoleActionConf.objects.filter(
        flowconf_id=work_obj.flowconf_id).values()
    created = 0
    for item in steps:
        approve_type = item['approvetype']
        if approve_type == '1':
            # Approval by role: no concrete user, store the role's name.
            approver_id = ''
            role_name = Role.objects.filter(pk=item['approve_type_id']).first().zh_name
        elif approve_type == '2':
            # Approval by a specific user.
            approver_id = User.objects.filter(pk=item['approve_type_id']).first().id
            role_name = ''
        else:
            continue
        child_data = {
            'mainorder_id': work_obj.id,
            'approve_user_id': approver_id,
            'approbe_user_role': role_name,
            'approve_userrole_id': item['approve_type_id'],
            'sequence_number': item['sequence'],
            'approve_type_id': approve_type,
        }
        if created:
            # Only the first sub-order keeps the model's default status.
            child_data['suborder_status'] = '3'
        SubOrderModel.objects.create(**child_data)
        created += 1

    first_step = SubOrderModel.objects.filter(
        mainorder_id=work_obj.id, sequence_number=1).first()
    if first_step.approve_user_id:
        user_list = [first_step.approve_user_id]
    else:
        user_list = list(
            UserRole.objects.filter(role_id=first_step.approve_userrole_id)
            .values_list('user_id', flat=True))

    # Imported lazily, as in the original, at the point of dispatch.
    from celery_task.tasks import send_email
    send_email.delay(user_list)
    return {'msg': '添加成功', 'code': 200, 'id': work_obj.pk, 'flowconf': work_obj.flowconf_id}
def chooseSuborder(request):
    """Record an approver's decision on a sub-order and advance the flow.

    Reads ``suborder_id``, ``action_status`` and ``decision`` from
    ``request.data``. The sub-order is marked processed
    (``suborder_status=2``) with the given action and text, then:

    * ``action_status == '4'``: main order ``order_status`` is set to 2.
    * ``action_status == '3'``: main order ``order_status`` is set to 3.
    * ``action_status == '2'``: if this was the last step the main order
      is set to 3, otherwise the next step is opened
      (``suborder_status=1``) and its approver(s) are emailed.

    NOTE(review): the original comments label '4' as "returned -> main
    order rejected" and '3' as "refused -> main order completed"; the
    meaning of the numeric codes is not defined here — confirm.

    Returns:
        ``{'msg': 'ok', 'code': 200}``.
    """
    suborder_id = request.data.get('suborder_id')
    action_status = request.data.get('action_status')
    decision = request.data.get('decision')

    current = SubOrderModel.objects.filter(pk=suborder_id).first()
    mainorder = current.mainorder_id
    sequence_number = current.sequence_number
    # Single UPDATE instead of three consecutive ones.
    SubOrderModel.objects.filter(pk=suborder_id).update(
        action_status=action_status, suborder_status=2, approve_text=decision)

    # Sorted explicitly: the next step is taken from the real sequence
    # numbers rather than from a position in an unordered result set.
    sequence_list = list(
        SubOrderModel.objects.filter(mainorder_id=mainorder)
        .order_by('sequence_number')
        .values_list('sequence_number', flat=True))
    position = sequence_list.index(sequence_number)

    if action_status == '4':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=2)
    elif action_status == '3':
        WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
    elif action_status == '2':
        # suborder_status: 1 pending, 2 processed, 3 waiting for the
        # previous node (per the original comments).
        if sequence_number == sequence_list[-1]:
            WorkOrderModel.objects.filter(pk=mainorder).update(order_status=3)
        else:
            next_steps = SubOrderModel.objects.filter(
                mainorder_id=mainorder,
                sequence_number=sequence_list[position + 1])
            next_steps.update(suborder_status=1)
            next_step = next_steps.first()
            if next_step.approve_user_id:
                user_list = [next_step.approve_user_id]
            else:
                user_list = list(
                    UserRole.objects.filter(role_id=next_step.approve_userrole_id)
                    .values_list('user_id', flat=True))
            # Imported lazily, as in the original, at the point of dispatch.
            from celery_task.tasks import send_email
            send_email.delay(user_list)
    return {'msg': 'ok', 'code': 200}
class WorkOrderView(APIView):
    """List/retrieve work orders (GET) and create a work order (POST)."""

    serializer_class = WorkOrderSerializer
    page_size = 4  # page size passed to basePaginator for list responses

    def get(self, request):
        """Return one work order when ``?id=`` is given, else a paginated list.

        Without ``id`` the list is whatever ``getWorkerorder`` returns for
        the user decoded from the request token.
        """
        order_id = request.query_params.get('id')
        if order_id is None:
            user_info = decodeToken(request)
            queryset = getWorkerorder(user_info)
            ret = basePaginator(queryset, request, self.page_size, self.serializer_class)
        else:
            queryset = WorkOrderModel.objects.filter(id=order_id)
            ret = self.serializer_class(queryset, many=True).data
        return Response(ret)

    def post(self, request):
        """Create a work order via ``addWorkerOrder`` and return its result."""
        return Response(addWorkerOrder(request))
class SubOrderView(APIView):
    """List sub-orders (GET) and submit an approval decision (POST)."""

    serializer_class = SubOrderSerializer
    page_size = 4  # page size passed to basePaginator for list responses

    def get(self, request):
        """Return the sub-orders of ``?mainorder=``, else a paginated list.

        Without ``mainorder`` the list is whatever ``getSuborder`` returns
        for the user decoded from the request token.
        """
        mainorder = request.query_params.get('mainorder')
        if mainorder is None:
            user_info = decodeToken(request)
            queryset = getSuborder(user_info)
            ret = basePaginator(queryset, request, self.page_size, self.serializer_class)
        else:
            queryset = SubOrderModel.objects.filter(mainorder_id=mainorder)
            ret = self.serializer_class(queryset, many=True).data
        return Response(ret)

    def post(self, request):
        """Apply the decision via ``chooseSuborder`` and return its result."""
        return Response(chooseSuborder(request))
# 启动worker
celery -A celery_task worker -l INFO
# 启动beat
celery -A celery_task beat -l INFO
# -*- coding: utf-8 -*-
# from __future__ import absolute_import, unicode_literals
#
import os
import sys

import django
from celery import Celery
from celery.schedules import crontab

# All import paths in the celery project are resolved relative to
# CELERY_BASE_DIR, and the celery command must be run from that directory.
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# The tasks use Django code (models, settings, mail), so the Django
# environment must be configured before any task touches it.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")
django.setup()

# Celery instance: app name, broker, result backend, task modules.
app = Celery(
    'celery',
    broker='redis://127.0.0.1:6379/2',   # where tasks are queued
    backend='redis://127.0.0.1:6379/3',  # where results are stored
    include=['celery_task.tasks', 'celery_task.tasks_beat'],
)

app.conf.update(
    result_expires=3600,  # results left in redis are dropped after one hour
)

# Periodic tasks (run by ``celery beat``).
app.conf.beat_schedule = {
    # NOTE(review): the entry is still named after the earlier 180-second
    # interval, but it now runs on a crontab schedule (00:22, day_of_week=1).
    'add-every-180-seconds': {
        'task': 'celery_task.tasks_beat.say_hello',
        'schedule': crontab(hour=0, minute=22, day_of_week=1),
    },
}
app.conf.timezone = 'Asia/Shanghai'

if __name__ == '__main__':
    app.start()
pip install Django==2.2
pip install celery==4.4.7
pip install redis==3.5.3
# celery.py
# -*- coding: utf-8 -*-
import os
import sys

import django
from celery import Celery

# 1. Put the Django project root on the import path.
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# 2. Configure the Django environment.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")
django.setup()  # load the settings and the app registry

# 3. Basic Celery configuration.
app = Celery(
    'proj',
    broker='redis://localhost:6379/14',
    backend='redis://localhost:6379/15',
    include=[
        'celery_task.tasks',
        'celery_task.tasks2',
    ],
)

# 4. Extra settings applied after instantiation.
app.conf.update(
    result_expires=3600,  # results left in redis are dropped after one hour
)

# 5. Periodic task: every 5 seconds dispatch the task below with (16, 16).
# NOTE(review): no task named test_task_crontab is visible in the
# accompanying tasks module — confirm the task path.
app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'celery_task.tasks.test_task_crontab',
        'schedule': 5.0,
        'args': (16, 16)
    },
}

# 6. Time zone used by the scheduler.
app.conf.timezone = 'UTC'

if __name__ == '__main__':
    app.start()
celery.py
# -*- coding: utf-8 -*-
import os
import sys

import django
from celery import Celery

# 1. Put the Django project root on the import path.
CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../opwf'))

# 2. Configure the Django environment.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "opwf.settings")
django.setup()  # load the settings and the app registry

# 3. Basic Celery configuration.
app = Celery(
    'proj',
    broker='redis://localhost:6379/14',
    backend='redis://localhost:6379/15',
    include=[
        'celery_task.tasks',
        'celery_task.tasks2',
    ],
)

# 4. Extra settings applied after instantiation.
app.conf.update(
    result_expires=3600,  # results left in redis are dropped after one hour
)

# 5. Periodic task: every 5 seconds dispatch the task below with (16, 16).
# NOTE(review): no task named test_task_crontab is visible in the
# accompanying tasks module — confirm the task path.
app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'celery_task.tasks.test_task_crontab',
        'schedule': 5.0,
        'args': (16, 16)
    },
}

# 6. Time zone used by the scheduler.
app.conf.timezone = 'UTC'

if __name__ == '__main__':
    app.start()
tasks.py
# -*- coding:utf8 -*-
from .celery import app
import time,random
@app.task
def randnum(start, end):
    """Return a random integer N with ``start <= N <= end``.

    Sleeps for 3 seconds before answering (presumably to simulate a slow
    job in this demo).
    """
    time.sleep(3)
    return random.randint(start, end)
tasks2.py
# -*- coding: utf-8 -*-
# utils/rl_sms.py
from ronglian_sms_sdk import SmsSDK
from user.models import User
# Credentials passed to SmsSDK(accId, accToken, appId).
# NOTE(review): these are secrets committed to source control; they should
# be rotated and loaded from settings or the environment instead.
accId = '8a216da8747ac98201749c0de38723b7'
accToken = '86072b540b4648229b27400414150ef2'
appId = '8a216da8747ac98201749c0de45123be'
def send_message(phone, datas):
    """Demo SMS sender: builds the SDK client but does not send anything.

    Args:
        phone: destination phone number (unused while sending is disabled).
        datas: template parameters, passed by position (unused for the
            same reason).

    Returns:
        An empty string.

    The function also prints the first user's name, which shows that the
    Django ORM is reachable from where it is called. It raises
    ``IndexError`` when the user table is empty.
    """
    user = User.objects.all()[0]
    print(user.username, '%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
    sdk = SmsSDK(accId, accToken, appId)
    # Test template id 1: "your verification code is {1}, please enter it
    # within {2} minutes".
    tid = '1'
    # The real send is disabled in this demo:
    #   resp = sdk.sendMessage(tid, phone, datas)
    print("##########################################")
    print('执行了这个方法 send_message')
    return ''
def test_crontab(x, y):
    """Demo periodic job: print two marker lines; *x* and *y* are unused."""
    print('############### 执行test_crontab测试任务 #############')
    print('############### 邮件审批超时提醒 #############')
rl_sms.py
# 1. Import the tasks module
from celery_task import tasks
# 2. Dispatch the asynchronous task
# NOTE(review): send_sms_code is not defined in the tasks module shown
# alongside this snippet — confirm the task name.
tasks.send_sms_code.delay(18538752511,())
# 单线程
celery -A celery_task worker -l INFO
# 一次性启动w1,w2两个worker
celery multi start w1 w2 -A celery_task -l INFO
# 查看当前有哪些worker在运行
celery -A celery_task status
# 停止w1,w2两个worker
celery multi stop w1 w2 -A celery_task
# 项目中启动 celery worker
celery multi start celery_task -A celery_task -l debug --autoscale=50,5 # celery并发数:最多50,最少5
# 在项目中关闭 celery worker
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9
# 1.普通测试启动celery beat
celery -A celery_task beat -l INFO
# 2.在项目中后台启动celery beat(将日志放入指定位置)
celery -A celery_task beat -l debug >> /aaa/Scheduler.log 2>&1 &
# 3.停止celery beat
ps -ef | grep -E "celery -A celery_task beat" | grep -v grep|awk '{print $2}' | xargs kill -TERM &> /dev/null
#!/usr/bin/env bash
# Start, stop or restart the Django development server on port 8000.
# Usage: service.sh {start|stop|restart}   (run from the project directory)
source ../env/bin/activate
export DJANGO_SETTINGS_MODULE=celery_test.settings
base_dir=$(pwd)

# Print the PID(s) of the running "manage.py ... :8000" process(es) on one line.
mup_pid() {
    echo $(ps -ef | grep -E "(manage.py)(.*):8000" | grep -v grep | awk '{print $2}')
}

# Launch runserver in the background; stdout and stderr go to django.log.
start() {
    python "$base_dir/manage.py" runserver 0.0.0.0:8000 >> "$base_dir/django.log" 2>&1 &
    pid=$(mup_pid)
    echo -e "\e[00;31mmup is running (pid: $pid)\e[00m"
}

# Report the current PID(s), then force-kill every matching process.
stop() {
    pid=$(mup_pid)
    echo -e "\e[00;31mmup is stop (pid: $pid)\e[00m"
    ps -ef | grep -E "(manage.py)(.*):8000" | grep -v grep | awk '{print $2}' | xargs kill -9 &> /dev/null
}

restart() {
    stop
    start
}

# See how we were called.
case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    restart)
        restart
        ;;
    *)
        echo "Usage: $0 {start|stop|restart}"
        exit 2
esac
service.sh
#!/bin/bash
# Manage the celery_test worker node through `celery multi`.
# Usage: start-celery.sh {start|restart|stop}   (run from the project directory)
source ../env/bin/activate
export C_FORCE_ROOT="true"
base_dir=$(pwd)

# Print the PID(s) of running "celery -A celery_test worker" processes on one line.
celery_pid() {
    echo $(ps -ef | grep -E "celery -A celery_test worker" | grep -v grep | awk '{print $2}')
}

# Start the node with autoscaling between 5 and 50 processes.
start() {
    celery multi start celery_test -A celery_test -l debug --autoscale=50,5 --logfile="$base_dir/var/celery-%I.log" --pidfile=celery_test.pid
}

restart() {
    celery multi restart celery_test -A celery_test -l debug
}

stop() {
    celery multi stop celery_test -A celery_test -l debug
}

# See how we were called.
case "$1" in
    start)
        start
        ;;
    restart)
        restart
        ;;
    stop)
        stop
        ;;
    *)
        echo "Usage: $0 {start|stop|restart}"
        exit 2
esac
#nohup celery -A celery_test worker -l debug --concurrency=10 --autoreload & >>celery.log
start-celery.sh
#!/bin/bash
# Run the celery beat scheduler (periodic tasks) for celery_test.
# Usage: celery-crond.sh {start|restart|stop}   (run from the project directory)
source ../env/bin/activate
export C_FORCE_ROOT="true"
base_dir=$(pwd)

# Print the PID(s) of running "celery -A celery_test beat" processes on one line.
celery_pid() {
    echo $(ps -ef | grep -E "celery -A celery_test beat" | grep -v grep | awk '{print $2}')
}

# Start beat in the background, logging to var/Scheduler.log.
start() {
    # Alternative using the Django scheduler:
    #celery -A celery_test beat -l info -S django >> $base_dir/var/celery-cron.log 2>&1 &
    celery -A celery_test beat -l debug >> "$base_dir/var/Scheduler.log" 2>&1 &
    sleep 3  # give beat time to appear in the process list
    pid=$(celery_pid)
    echo -e "\e[00;31mcelery is start (pid: $pid)\e[00m"
}

# Send SIGHUP to the running beat process(es).
restart() {
    pid=$(celery_pid)
    echo -e "\e[00;31mcelery is restart (pid: $pid)\e[00m"
    ps auxf | grep -E "celery -A celery_test beat" | grep -v grep | awk '{print $2}' | xargs kill -HUP &> /dev/null
}

# Send SIGTERM to the running beat process(es).
stop() {
    pid=$(celery_pid)
    echo -e "\e[00;31mcelery is stop (pid: $pid)\e[00m"
    ps -ef | grep -E "celery -A celery_test beat" | grep -v grep | awk '{print $2}' | xargs kill -TERM &> /dev/null
}

case "$1" in
    start)
        start
        ;;
    restart)
        restart
        ;;
    stop)
        stop
        ;;
    *)
        echo "Usage: $0 {start|stop|restart}"
        exit 2
esac
celery-crond.sh
原文:https://www.cnblogs.com/mapel1594184/p/14106696.html