首页 > 编程语言 > 详细

基于queue的python多进程日志

时间:2021-01-28 17:52:28      阅读:42      评论:0      收藏:0      [点我收藏+]

在我们的异常检测应用中,需要对每组IoT设备分别训练一个模型,每个模型对一组设备的指标数据进行实时异常检测。方案采用master-worker的方式实现模型对外服务,但是每个worker的日志需要集中收集到同一个日志文件,而不是每个worker产生一个日志文件。此时我们采用logging模块的QueueHandler+QueueListener的方案,master在创建worker之前,首先创建一个日志队列,创建worker时,将日志队列传入,同时worker端使用QueueHandler作为日志处理例程,将日志写入队列,master端启动Queuelistener对日志队列中的消息进行监听,对接收到的消息调用日志处理例程进行处理(file handler、console handler)。相关代码如下:

master侧的日志相关代码:

  def _init_logger(self):
        self.logger = logging.getLogger("job.manager")
        self.log_queue = Queue()
        self.log_queue_listener = QueueListener(self.log_queue, *self.logger.handlers)

    def _register_signals(self):
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

    def _start_single_worker(self, worker_id, worker_group, worker_config):
        worker = Worker(
            worker_id=worker_id,
            worker_group=worker_group,
            group_members=worker_config.get(group_members),
            consume_topic=mq_config.get(sample_topic),
            consume_subscription=worker_group + _subscription,
            produce_topic=mq_config.get(result_topic),
            model_path=worker_config.get(model).get(binary_path),
            model_type=worker_config.get(model).get(type),
            log_queue=self.log_queue)
        try:
            worker.start()
        except Exception as e:
            raise Exception("worker start error")

worker侧日志相关代码:

    def __init__(self, worker_id, worker_group, group_members, consume_topic, consume_subscription,
                 produce_topic, model_path, model_type, log_queue):
        self.id = worker_id
        self.group = worker_group,
        self.group_members = group_members
        self.running = True
        self.status = idle
        self.logger = None
        self.mq_client = None
        self.model = None

        self._init_logger(log_queue)
        self._init_mq_client(consume_topic, consume_subscription, produce_topic)
        self._load_model(model_path, model_type)
        self._register_signals()

        self.logger.info(Worker {} started..format(self.id))

    def __del__(self):
        if self.mq_client:
            self.mq_client.close()

    @property
    def status(self):
        with self._mutex:
            return self._status

    @status.setter
    def status(self, status):
        with self._mutex:
            self._status = status

    def _init_logger(self, log_queue):
        queue_handler = QueueHandler(log_queue)
        self.logger = logging.getLogger(job.worker)
        self.logger.addHandler(queue_handler)

    def _register_signals(self):
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

 

基于queue的python多进程日志

原文:https://www.cnblogs.com/zcsh/p/14338526.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!