先在utils.py文件中定义一个工具类
# utils.py

import atexit
import codecs
import os
import threading


class WriteLog(threading.Thread):
    """Background thread that appends queued text to a log file.

    The object exposes the file-like ``write()``/``flush()`` pair, so it can
    be assigned to ``sys.stdout`` / ``sys.stderr``.  Text handed to
    ``write()`` is queued in ``self.contexts`` and appended to ``logName``
    by the worker thread (or by an explicit ``flush()``).

    The thread is a daemon and ``close()`` is registered with ``atexit``:
    the interpreter can therefore exit normally, and whatever is still
    queued at that point is written out first.
    """

    # Seconds the worker sleeps between two checks of the queue.
    POLL_INTERVAL = 0.05

    def __init__(self, logName):
        """Create the log file if needed and prepare the worker thread.

        :param logName: path of the log file to append to.
        """
        super(WriteLog, self).__init__()
        self.logName = logName
        self.lock = threading.Lock()  # guards self.contexts and file appends
        self.contexts = []            # text chunks not yet written to disk
        self._closing = threading.Event()
        # Daemon thread: it must not keep the process alive once the main
        # program is done; close() at exit takes care of the leftovers.
        self.daemon = True
        self.mkfile()
        atexit.register(self.close)

    def mkfile(self):
        """Create the log file with a one-line header if it does not exist."""
        if not os.path.exists(self.logName):
            with codecs.open(self.logName, "w", encoding="utf-8") as f:
                f.write("This file is log for {0}\n".format(self.logName))

    def write(self, context):
        """Queue ``context`` (a text chunk) for appending to the log file."""
        with self.lock:
            self.contexts.append(context)

    def flush(self):
        """Append every queued chunk to the log file and empty the queue."""
        with self.lock:
            if not self.contexts:
                return
            with codecs.open(self.logName, "a", encoding="utf-8") as f:
                for context in self.contexts:
                    f.write(context)
            # Cleared only after a successful write so nothing is dropped
            # when opening or writing the file fails.
            del self.contexts[:]

    def close(self):
        """Stop the worker thread and write out anything still queued."""
        self._closing.set()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
        self.flush()

    def run(self):
        """Worker loop: flush the queue periodically until ``close()``."""
        while not self._closing.is_set():
            self.flush()
            # Sleep instead of spinning; returns early once close() is called.
            self._closing.wait(self.POLL_INTERVAL)
再编写主程序main.py，将日志记录到main.log文件中
# main.py
"""Demo: send everything the program prints to main.log."""

import sys
import time

from utils import WriteLog

# Python 2 scripts used to call reload(sys) and
# sys.setdefaultencoding("utf-8") at this point; that call no longer exists
# in Python 3 and is not needed here.


def Xh():
    """Print the integers 0..99, pausing 0.1 s after each one."""
    counter = 0
    while counter < 100:
        # print() hands its text to sys.stdout.write()
        print(counter)
        time.sleep(0.1)
        counter += 1


def main():
    """Start the log writer, point stdout/stderr at it, then run Xh()."""
    log_writer = WriteLog("main.log")
    log_writer.start()
    # From here on, print output and error output both go to the writer.
    sys.stdout = sys.stderr = log_writer
    Xh()


if __name__ == "__main__":
    main()
最终生成的日志文件main.log如下:
1 This file is log for main.log 2 0 3 1 4 2 5 3 6 4 7 5 8 6 9 7 10 8 11 9 12 10 13 11 14 12 15 13 16 14 17 15 18 16 19 17 20 18 21 19 22 20 23 21 24 22 25 23 26 24 27 25 28 26 29 27 30 28 31 29 32 30 33 31 34 32 35 33 36 34 37 35 38 36 39 37 40 38 41 39 42 40 43 41 44 42 45 43 46 44 47 45 48 46 49 47 50 48 51 49 52 50 53 51 54 52 55 53 56 54 57 55 58 56 59 57 60 58 61 59 62 60 63 61 64 62 65 63 66 64 67 65 68 66 69 67 70 68 71 69 72 70 73 71 74 72 75 73 76 74 77 75 78 76 79 77 80 78 81 79 82 80 83 81 84 82 85 83 86 84 87 85 88 86 89 87 90 88 91 89 92 90 93 91 94 92 95 93 96 94 97 95 98 96 99 97 100 98 101 99
先写一个utils2.py
import atexit
import codecs
import os
from threading import Event, Lock, Thread, current_thread


class TraceLog(Thread):
    """Background thread that appends queued text to a log file.

    The object exposes the file-like ``write()``/``flush()`` pair, so it can
    be assigned to ``sys.stdout`` / ``sys.stderr``.  Text handed to
    ``write()`` is queued in ``self.contexts`` and appended to ``logName``
    by the worker thread (or by an explicit ``flush()``).

    The thread is a daemon and ``close()`` is registered with ``atexit``:
    the interpreter can therefore exit normally, and whatever is still
    queued at that point is written out first.
    """

    # Seconds the worker sleeps between two checks of the queue.
    POLL_INTERVAL = 0.05

    def __init__(self, logName):
        """Create the log file if needed and prepare the worker thread.

        :param logName: path of the log file to append to.
        """
        super(TraceLog, self).__init__()
        self.logName = logName
        self.lock = Lock()   # guards self.contexts and file appends
        self.contexts = []   # text chunks not yet written to disk
        self._closing = Event()
        # Daemon thread: it must not keep the process alive once the main
        # program is done; close() at exit takes care of the leftovers.
        self.daemon = True
        self.isFile()
        atexit.register(self.close)

    def isFile(self):
        """Create the log file with its two header lines if it is missing."""
        if not os.path.exists(self.logName):
            with codecs.open(self.logName, "w", encoding="utf-8") as f:
                f.write("this log name is: {0}\n".format(self.logName))
                f.write("start log\n")

    def write(self, context):
        """Queue ``context`` (a text chunk) for appending to the log file."""
        with self.lock:
            self.contexts.append(context)

    def flush(self):
        """Append every queued chunk to the log file and empty the queue."""
        with self.lock:
            if not self.contexts:
                return
            with codecs.open(self.logName, "a", encoding="utf-8") as f:
                for context in self.contexts:
                    f.write(context)
            # The queue must be emptied, otherwise the same text would be
            # written again on the next pass; done only after a successful
            # write so nothing is dropped on an I/O error.
            del self.contexts[:]

    def close(self):
        """Stop the worker thread and write out anything still queued."""
        self._closing.set()
        if self.is_alive() and current_thread() is not self:
            self.join()
        self.flush()

    def run(self):
        """Worker loop: flush the queue periodically until ``close()``."""
        while not self._closing.is_set():
            self.flush()
            # Sleep instead of spinning; returns early once close() is called.
            self._closing.wait(self.POLL_INTERVAL)
再在main2.py中编写主函数
"""
Continuously record the server's log output in a file,
emulating the shell redirections ">>" and ">".
"""

from utils2 import TraceLog
import time
import sys


class Server(object):
    """Toy server whose only job is to produce output."""

    def printLog(self):
        """Print a start marker, the integers 0..99 (0.1 s apart), then an end marker."""
        print("start server\n")
        for i in range(100):
            print(i)
            time.sleep(0.1)
        print("end server\n")


if __name__ == "__main__":
    traceLog = TraceLog("main2.log")
    traceLog.start()
    # print() hands its text to sys.stdout.write(), so once sys.stdout is
    # replaced every print() ends up in traceLog.write().
    sys.stdout = traceLog
    sys.stderr = traceLog
    server = Server()
    server.printLog()
生成日志文件main2.log如下:
1 this log name is: main2.log 2 start log 3 start server 4 5 0 6 1 7 2 8 3 9 4 10 5 11 6 12 7 13 8 14 9 15 10 16 11 17 12 18 13 19 14 20 15 21 16 22 17 23 18 24 19 25 20 26 21 27 22 28 23 29 24 30 25 31 26 32 27 33 28 34 29 35 30 36 31 37 32 38 33 39 34 40 35 41 36 42 37 43 38 44 39 45 40 46 41 47 42 48 43 49 44 50 45 51 46 52 47 53 48 54 49 55 50 56 51 57 52 58 53 59 54 60 55 61 56 62 57 63 58 64 59 65 60 66 61 67 62 68 63 69 64 70 65 71 66 72 67 73 68 74 69 75 70 76 71 77 72 78 73 79 74 80 75 81 76 82 77 83 78 84 79 85 80 86 81 87 82 88 83 89 84 90 85 91 86 92 87 93 88 94 89 95 90 96 91 97 92 98 93 99 94 100 95 101 96 102 97 103 98 104 99 105 end server
写一个celery配置文件celeryconfig.py
"""Celery settings: Redis broker/backend, task queues, routes and beat schedule."""

from kombu import Queue, Exchange

BROKER_URL = "redis://192.168.48.136:6379/1"
CELERY_RESULT_BACKEND = "redis://192.168.48.136:6379/2"

# A tuple keeps the queue declarations in a stable order (the original used
# an unordered set literal).
CELERY_QUEUES = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
    Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B"),
)

# Tasks without an explicit route (here: add) must land in a queue that is
# declared above; otherwise they go to Celery's built-in "celery" queue,
# which is not listed in CELERY_QUEUES.
CELERY_DEFAULT_QUEUE = "default"

# Task names are "<module>.<function>"; the tasks are defined in demon.py.
CELERY_ROUTES = {
    "demon.taskA": {"queue": "for_task_A", "routing_key": "for_task_A"},
    "demon.taskB": {"queue": "for_task_B", "routing_key": "for_task_B"},
}

CELERY_TIMEZONE = "UTC"

# Periodic tasks for celery beat; a numeric "schedule" is an interval in seconds.
CELERYBEAT_SCHEDULE = {
    "taskA_schedule": {
        "task": "demon.taskA",
        "schedule": 2,
        "args": (5, 6),
    },
    "taskB_scheduler": {
        "task": "demon.taskB",
        "schedule": 10,
        "args": (10, 20, 30),
    },
    "add_schedule": {
        "task": "demon.add",
        "schedule": 5,
        "args": (1, 2),
    },
}
编写tasks.py
"""Celery application exposing three small arithmetic tasks."""

from celery import Celery

# Settings are loaded from the celeryconfig module.
app = Celery()
app.config_from_object("celeryconfig")


@app.task
def taskA(x, y):
    """Return the product of x and y."""
    product = x * y
    return product


@app.task
def taskB(x, y, z):
    """Return the sum of x, y and z."""
    total = x + y + z
    return total


@app.task
def add(x, y):
    """Return the sum of x and y."""
    total = x + y
    return total
编写demon.py
"""Celery app for the demo: multiply, three-way add and two-way add tasks."""

from celery import Celery

# Configuration comes from the celeryconfig module.
app = Celery()
app.config_from_object("celeryconfig")


@app.task
def taskA(x, y):
    """Multiply x by y and return the result."""
    result = x * y
    return result


@app.task
def taskB(x, y, z):
    """Add x, y and z and return the result."""
    result = x + y + z
    return result


@app.task
def add(x, y):
    """Add x and y and return the result."""
    result = x + y
    return result
编写test.py
"""Submit each demo task once and print what came back."""

import time

# Explicit names instead of "from demon import *": only these three tasks
# are used, and a star import hides where they come from.
from demon import add, taskA, taskB

re1 = taskA.delay(10, 20)

re2 = taskB.delay(5, 15, 25)

re3 = add.delay(8, 18)

# Give the workers a moment to pick up and finish the tasks before the
# results are read.
time.sleep(1)
print(re1.result)
print(re2.result)
print(re3.status)
print(re3.result)
原文:https://www.cnblogs.com/karl-python/p/9097926.html