#-*-coding=utf-8-*-from __future__ import absolute_importfrom celery import Celeryfrom kombu import Queueapp = Celery("proj",broker = "redis://10.121.76.204:17016/1",include = [‘proj.hotplay_task‘])app.conf.update(CELERY_DEFAULT_QUEUE = ‘hotplay_sh_default_queue‘,#CELERY_QUEUES = (Queue(‘hotplay_jy_queue‘),), #该队列是给server2用的,并不需要在这里申明)
from __future__ import absolute_importimport sysimport osimport hashlibimport timeimport subprocessfrom proj.celery import appreload(sys)sys.setdefaultencoding(‘utf-8‘)sys.path.append(os.path.join(os.path.dirname(__file__), "./"))HOTPLAY_CATCHUP_DIR = ‘/home/uaa/prog/hotplay_v2/online_task/catch_up‘@app.task(bind=True)def do_init_catchup(self, user_name, album_id, album_name, channel_name):print ‘start to init catch up of user %s album %s:%s in channel %s‘%(user_name, album_id, album_name, channel_name)job_args = ‘source %s/init_catch_up.sh %s %s %s %s > ./logs/%s_%s.log‘%(HOTPLAY_CATCHUP_DIR, user_name, album_id, album_name, channel_name, album_id, user_name)print ‘job_args:‘, job_argsP = subprocess.Popen(job_args,shell=True)rt_code = P.wait()if rt_code == 0:print ‘job success...‘else:print ‘job error:%d‘%(rt_code)# print ‘job error:%d, will retry in 5 min‘%(rt_code)# raise self.retry(countdown=300)@app.task(bind=True)def do_catchup(self, hotplay_id, start_dt, end_dt):print ‘start to catch up of %s:%s-%s‘%(hotplay_id, start_dt, end_dt)job_args = ‘source %s/catch_up_all_run.sh %s %s %s > ./logs/%s.log 2>&1‘%(HOTPLAY_CATCHUP_DIR, hotplay_id, start_dt, end_dt, hotplay_id)print ‘job_args:‘, job_argsP = subprocess.Popen(job_args,shell=True)rt_code = P.wait()if rt_code == 0:print ‘job success...‘else:print ‘job error:%d‘%(rt_code)# print ‘job error:%d, will retry in 5 min‘%(rt_code)# raise self.retry(countdown=300)
nohup celery -A proj worker -n hotplay_default_worker -c 3 -Q hotplay_sh_default_queue -l info &
#-*-coding=utf-8-*-from __future__ import absolute_importimport sysimport osimport tornado.webimport tornado.ioloopimport tornado.httpserverfrom celery.execute import send_taskfrom proj.hotplay_task import do_init_catchup, do_catchupreload(sys)sys.setdefaultencoding(‘utf-8‘)TORNADO_SERVER_PORT=10501class InitCatchupHandler(tornado.web.RequestHandler):def get(self, path):user_name = self.get_argument("user_name", None)album_id = self.get_argument("album_id",None)album_name = self.get_argument("album_name",None)channel_name = self.get_argument("channel_name", None)print "request user_name+album_id+album_name+channel_name:%s+%s_%s+%s"%(user_name, album_id, album_name, channel_name)if album_id == ‘0‘:self.write(‘test tornado server init catch up handler. sucess. just return\n‘)returntry:self.write("0")do_init_catchup.delay(user_name, album_id, album_name, channel_name)except:self.write("-1")class DoCatchupHandler(tornado.web.RequestHandler):def get(self, path):hotplay_id = self.get_argument("hotplay_id",None)start_dt = self.get_argument("start_dt",None)end_dt = self.get_argument("end_dt",None)print "request hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)if hotplay_id == ‘0‘:self.write(‘test tornado server catch up handler. sucess. just return\n‘)returntry:self.write("0")do_catchup.delay(hotplay_id, start_dt, end_dt)except:self.write("-1")class DoCatchupJYHandler(tornado.web.RequestHandler):def get(self, path):hotplay_id = self.get_argument("hotplay_id",None)start_dt = self.get_argument("start_dt",None)end_dt = self.get_argument("end_dt",None)print "request jy hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)#if hotplay_id == ‘0‘:# self.write(‘test tornado server catch up handler. sucess. just return\n‘)# returnsend_task(‘tasks.test1‘, args=[hotplay_id, start_dt, end_dt], queue=‘hotplay_jy_queue‘) #tasks.test1是server2上celery任务函数的file_name.func_name#file_name是任务函数所在文件相对于celery worker的路径#try:# self.write("0")# do_catchup.delay(hotplay_id, start_dt, end_dt)#except:# self.write("-1")application = tornado.web.Application([(r"/init_catchup/(.*)", InitCatchupHandler),(r"/do_catchup/(.*)", DoCatchupHandler),(r"/do_catchup_jy/(.*)", DoCatchupJYHandler),],template_path = "template", static_path="static")if __name__ == ‘__main__‘:http_server = tornado.httpserver.HTTPServer(application)http_server.listen(TORNADO_SERVER_PORT)tornado.ioloop.IOLoop.instance().start()
#-*-coding=utf-8-*-from __future__ import absolute_importfrom celery import Celeryfrom kombu import Queueapp = Celery("test",broker = "redis://10.121.76.204:17016/1"# include = [‘test.tasks‘])app.conf.update(CELERY_DEFAULT_QUEUE = ‘hotplay_sh_default_queue‘, #可省略,但不能和server1的配置不一样CELERY_QUEUES = (Queue(‘hotplay_jy_queue‘),),)@app.task()def test1(hotplay_id, start_dt, end_dt): #注意,名字要和tornado_server中send_task()函数用的func_name名字一样print ‘hotplay_id is %s, stat from %s to %s‘%(hotplay_id, start_dt, end_dt)
celery -A tasks worker -n hotplay_jy_worker -c 2 -Q hotplay_jy_queue -l info
from celery.executeimport
send_taskfrom proj.hotplay_taskimport
do_init_catchup, do_catchup版权声明:本文为博主原创文章,未经博主允许不得转载。
原文:http://blog.csdn.net/vintage_1/article/details/47664187