在我们的异常检测应用中,需要对每组IoT设备分别训练一个模型,每个模型对一组设备的指标数据进行实时异常检测。方案采用master-worker的方式实现模型对外服务,但是每个worker的日志需要集中收集到同一个日志文件,而不是每个worker产生一个日志文件。此时我们采用logging模块的QueueHandler+QueueListener的方案,master在创建worker之前,首先创建一个日志队列,创建worker时,将日志队列传入,同时worker端使用QueueHandler作为日志处理例程,将日志写入队列,master端启动Queuelistener对日志队列中的消息进行监听,对接收到的消息调用日志处理例程进行处理(file handler、console handler)。相关代码如下:
master侧的日志相关代码:
def _init_logger(self): self.logger = logging.getLogger("job.manager") self.log_queue = Queue() self.log_queue_listener = QueueListener(self.log_queue, *self.logger.handlers) def _register_signals(self): signal.signal(signal.SIGINT, self.stop) signal.signal(signal.SIGTERM, self.stop) def _start_single_worker(self, worker_id, worker_group, worker_config): worker = Worker( worker_id=worker_id, worker_group=worker_group, group_members=worker_config.get(‘group_members‘), consume_topic=mq_config.get(‘sample_topic‘), consume_subscription=worker_group + ‘_subscription‘, produce_topic=mq_config.get(‘result_topic‘), model_path=worker_config.get(‘model‘).get(‘binary_path‘), model_type=worker_config.get(‘model‘).get(‘type‘), log_queue=self.log_queue) try: worker.start() except Exception as e: raise Exception("worker start error")
worker侧日志相关代码:
def __init__(self, worker_id, worker_group, group_members, consume_topic, consume_subscription, produce_topic, model_path, model_type, log_queue): self.id = worker_id self.group = worker_group, self.group_members = group_members self.running = True self.status = ‘idle‘ self.logger = None self.mq_client = None self.model = None self._init_logger(log_queue) self._init_mq_client(consume_topic, consume_subscription, produce_topic) self._load_model(model_path, model_type) self._register_signals() self.logger.info(‘Worker {} started.‘.format(self.id)) def __del__(self): if self.mq_client: self.mq_client.close() @property def status(self): with self._mutex: return self._status @status.setter def status(self, status): with self._mutex: self._status = status def _init_logger(self, log_queue): queue_handler = QueueHandler(log_queue) self.logger = logging.getLogger(‘job.worker‘) self.logger.addHandler(queue_handler) def _register_signals(self): signal.signal(signal.SIGINT, self.stop) signal.signal(signal.SIGTERM, self.stop)
原文:https://www.cnblogs.com/zcsh/p/14338526.html