在做基于B/S应用中,经常有需要后台运行任务的需求,最简单比如发送邮件。在一些如防火墙,WAF等项目中,前台只是为了展示内容与各种参数配置,后台守护进程才是重头戏。所以在防火墙配置页面中可能会经常看到调用cgi,但真正做事的一般并不是cgi,比如说执行关机命令,他们的逻辑如下:
(ps:上图所说的前台界面包含通常web开发中的后端,不然也没有socket一说)
class MgrService(win32serviceutil.ServiceFramework): """ Usage: ‘python topmgr.py install|remove|start|stop|restart‘ """ #服务名 _svc_name_ = "Mgr" #服务显示名称 _svc_display_name_ = "Daemon Mgr" #服务描述 _svc_description_ = "Daemon Mgr" def __init__(self, args): win32serviceutil.ServiceFramework.__init__(self, args) self.hWaitStop = win32event.CreateEvent(None, 0, 0, None) def SvcDoRun(self): self.ReportServiceStatus(win32service.SERVICE_START_PENDING) INFO("mgr startting...") self.ReportServiceStatus(win32service.SERVICE_RUNNING) self.start() # 等待服务被停止 INFO("mgr waitting...") win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE) INFO("mgr end") def SvcStop(self): self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING) INFO("mgr stopping...") self.stop() INFO("mgr stopped") # 设置事件 win32event.SetEvent(self.hWaitStop) self.ReportServiceStatus(win32service.SERVICE_STOPPED) def start(self): pass def stop(self): pass很简单,这样就实现了windows中的服务,也就是说脱离终端,运行于后台。INFO等函数只是简单的记录作用,可直接忽略。
class Engine(MgrService): rbufsize = -1 wbufsize = 0 def start(self): INFO(‘wait connection‘) self.server = StreamServer((HOST, PORT), self.msg_handle) self.server.serve_forever() def msg_handle(self,socket,address): try: rfile = socket.makefile(‘rb‘, self.rbufsize) wfile = socket.makefile(‘wb‘, self.wbufsize) headers = Message(rfile).dict INFO(‘get a connection from:%s,headers:%s‘ % (str(address), headers)) if ‘module‘ in headers and headers[‘module‘] in MODULES: MODULES[headers[‘module‘]].handle(wfile, headers) except Exception: ERROR(‘msg_handle exception,please check‘) def stop(self): if hasattr(self, server): self.server.stop()当有新连接到来,由msg_handle处理,首先读取发送来的消息,消息格式使用了最简单的http的格式,即(键名:键值)的格式,你要问我为什么采用这个格式,哈哈,格式简单,python有现成的库解析。
MODULES = { # module: handle module class } def module_register(module_name, handle_class): if module_name in MODULES: WARN(‘duplicate module_name:‘ + module_name) else: MODULES[module_name] = handle_class
class Module(object): SECRE_KEY = "YI-LUO-KEHAN" MODULE_NAME = "BASE_MODULE" PREFIX = "do_" # method prefix def __init__(self, wfile, headers): self.wfile = wfile self.headers = headers def __getattr__(self, name): try: return self.headers[name] except Exception: ERROR("%s has no attr:%s,please check" %(self.MODULE_NAME, name)) @classmethod def handle(cls, wfile, headers): module_obj = cls(wfile, headers) module_obj.schedule_default() def verify(self): if hmac.new(self.SECRE_KEY, self.MODULE_NAME).hexdigest() == self.signature: return True else: WARN("client verify failed,signature:%s" % str(self.signature)) def schedule_default(self): err_code = 0 if self.verify() and self.action: func_name = self.PREFIX + self.action try: getattr(self, func_name)() except AttributeError: err_code = 1 ERROR("%s has no method:%s" %(self.MODULE_NAME, func_name)) except Exception: err_code = 2 ERROR("module:%s,method:%s,exception" % (self.MODULE_NAME, func_name)) else: err_code = 3 if err_code: self.send_error({‘err_code‘:err_code}) def send_success(self, msg=‘‘): data = {‘success‘:True,‘msg‘:msg} self.wfile.write(json.dumps(data)) def send_error(self, msg=‘‘): data = {‘success‘:False,‘msg‘:msg} self.wfile.write(json.dumps(data))
TASK = {} # task_id: pid class ScanModule(Module): MODULE_NAME = "SCAN_MODULE" def do_start(self): self.send_success(‘start ok‘) DEBUG(‘------------task start------------‘) task_ids = [int(task_id) for task_id in self.task_ids.split(‘,‘) if int(task_id) not in TASK] for task_id in task_ids: try: cmd = ‘python scan.py -t %s‘ % task_id DEBUG(cmd) self.sub = Popen(cmd, shell=True, cwd=CWD) pid = int(self.sub.pid) TASK[task_id] = pid INFO(‘%s start a new task,task_id:%s,pid:%s‘ %(self.MODULE_NAME, task_id, pid)) except Exception: ERROR(‘%s start a new task,task_id:%s failed‘ % (self.MODULE_NAME, task_id)) def do_stop(self): self.send_success(‘stop ok‘) DEBUG(‘------------task stop------------‘) task_ids = [int(task_id) for task_id in self.task_ids.split(‘,‘) if int(task_id) in TASK] for task_id in task_ids: pid = TASK.pop(task_id) try: INFO(‘%s stop a new task,task_id:%s,pid:%s‘ %(self.MODULE_NAME, task_id, pid)) call([‘taskkill‘, ‘/F‘, ‘/T‘, ‘/PID‘, str(pid)]) except Exception: ERROR(‘%s taskkill a task failed,task_id:%s,pid:%s‘ %(self.MODULE_NAME, task_id, pid)) module_register(ScanModule.MODULE_NAME, ScanModule)
#!/usr/bin/env python #-*-encoding:UTF-8-*- import hmac import gevent from gevent import monkey monkey.patch_socket() addr = (‘localhost‘, 6667) def send_request(module_name,request_headers): SECRE_KEY = "YI-LUO-KEHAN" socket = gevent.socket.socket() socket.connect(addr) request_headers[‘module‘] = module_name request_headers[‘signature‘] = hmac.new(SECRE_KEY, module_name).hexdigest() h = ["%s:%s" %(k, v) for k,v in request_headers.iteritems()] h.append(‘\n‘) request = ‘\n‘.join(h) socket.send(request) print socket.recv(8192) socket.close() if __name__ =="__main__": import sys if sys.argv[1] == ‘start‘: send_request(‘SCAN_MODULE‘,{‘action‘:‘start‘,‘task_ids‘:‘1‘}) else: send_request(‘SCAN_MODULE‘,{‘action‘:‘stop‘,‘task_ids‘:‘1‘})
[Python网络编程]浅析守护进程后台任务的设计与实现,布布扣,bubuko.com
原文:http://blog.csdn.net/yueguanghaidao/article/details/35572811