import re,datetime,threading,queue
from pathlib import Path
from user_agents import parse
from collections import defaultdict
# Sample access-log line, kept for manual experiments with ``regex``.
log = (
    '100.120.194.182 - - [28/Sep/2020:03:46:37 +0800] '
    '"GET /c3/manufacturerlogo/002/036/871.jpg HTTP/1.1" '
    '200 42348 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:36.0) Gecko/20100101 Firefox/36.0"'
)

# One access-log record per line.  The two fields after the remote address
# must both be "-"; lines carrying a user name there are not matched.
# The trailing newline is optional so that a final line without a line
# terminator is still accepted by ``fullmatch``.
pattern = (
    r'(?P<remote>[\d.]{7,}) - - \[(?P<time>[^][]+)\] '
    r'"(?P<request>[^"]+)" (?P<code>\d+) (?P<size>\d+) '
    r'"(?P<referer>[^"]+)" "(?P<useragent>[^"]+)"\n?'
)
regex = re.compile(pattern, flags=re.M | re.I)


def _browser(ua):
    """Return ``(family, version_string)`` of the browser described by *ua*."""
    # Parse once and read both attributes from the same result.
    browser = parse(ua).browser
    return browser.family, browser.version_string


# Per-field converters applied to the raw text captured by ``regex``.
# Fields without an entry here are kept as plain strings by the caller.
funcs = {
    'time': lambda b: datetime.datetime.strptime(b, '%d/%b/%Y:%H:%M:%S %z'),
    'code': int,
    'size': int,
    # "GET /uri HTTP/1.1" -> {'method': ..., 'uri': ..., 'protocol': ...}
    'request': lambda b: dict(zip(('method', 'uri', 'protocol'), b.split())),
    'useragent': _browser,
}
def extract(line: str) -> dict:
    """Parse one access-log line into a dict of converted fields.

    The line is matched in full against the module-level ``regex``; each
    named group is then passed through its converter from ``funcs`` (groups
    without a converter are kept as the captured string).

    Returns ``None`` when the line does not match the pattern, or when a
    converter rejects a captured value with ``ValueError`` (for example a
    timestamp that ``strptime`` cannot parse), so one malformed record does
    not abort the caller's iteration.
    """
    matcher = regex.fullmatch(line)
    if matcher is None:
        return None

    def keep(raw):
        # Default converter: leave the captured text untouched.
        return raw

    try:
        return {
            name: funcs.get(name, keep)(value)
            for name, value in matcher.groupdict().items()
        }
    except ValueError:
        return None
def openfile(path: str):
    """Yield the parsed record for every recognisable line of a log file.

    The file at *path* is read as UTF-8 text, line by line.  Each line is
    handed to ``extract``; lines for which it returns a falsy result are
    skipped.

    The file is opened read-only: nothing is ever written, and asking for
    write access would make read-only log files fail to open.
    """
    with open(path, mode='rt', encoding='utf8') as f:
        for line in f:
            d = extract(line)
            if d:
                yield d
            # TODO: decide what to do with lines that could not be parsed.
def load(*path):
    """Yield parsed log records from every given file or directory.

    Each argument may name a regular file or a directory.  For a directory,
    only the regular files directly inside it are read (no recursion), in
    sorted path order so that the record order is reproducible between
    runs.  Arguments that do not exist, or are neither a file nor a
    directory, are skipped silently.
    """
    for item in path:
        p = Path(item)
        # is_dir()/is_file() are both False for a missing path, so no
        # separate existence check is needed.
        if p.is_dir():
            for file in sorted(p.iterdir()):
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))
def size_handler(iterable: list):
    """Return the mean of the ``'size'`` values of the given records.

    *iterable* is a collection of dicts that each carry a numeric
    ``'size'`` entry.  An empty collection yields ``0`` instead of raising
    ``ZeroDivisionError``.
    """
    sizes = [record['size'] for record in iterable]
    if not sizes:
        return 0
    return sum(sizes) / len(sizes)
def status_handler(iterable: list):
    """Return the percentage share of each status code among the records.

    *iterable* is a collection of dicts that each carry a ``'code'`` entry.
    The result maps every distinct code to ``count / total * 100``.  An
    empty collection yields an empty dict.
    """
    status = {}
    for record in iterable:
        code = record['code']
        status[code] = status.get(code, 0) + 1
    total = sum(status.values())
    return {code: count / total * 100 for code, count in status.items()}
# Running tally of how often each ``'useragent'`` value has been seen.
# It lives at module level and is never reset, so it keeps growing across
# calls to ``browser_handler``.
ua_dict = defaultdict(int)


def browser_handler(iterable: list):
    """Count the ``'useragent'`` value of every record and return the totals.

    Each record's ``'useragent'`` entry is used as a key (so it must be
    hashable) and its counter in the module-level ``ua_dict`` is incremented.
    The return value is a plain ``dict`` snapshot of *all* counts gathered
    so far, not only those from this call.

    NOTE(review): because the tally is cumulative, a record passed in by
    two successive calls is counted twice - confirm that is intended.
    """
    for item in iterable:
        ua_dict[item['useragent']] += 1
    return dict(ua_dict)
# ua_dict=defaultdict(lambda :0)
# def browser_handler(iterable:list):
# for item in iterable:
# ua=item[‘useragent‘]
# key=(ua.browser.family,ua.browser.version_string)
# ua_dict[key]+=1
# return ua_dict
def window(src: queue.Queue, handler, width: int, interval: int):
    """Consume records from *src* and report *handler* results per window.

    Records are dicts with a timezone-aware ``'time'`` entry; the clock is
    driven by those timestamps, not by wall time.  Whenever the newest
    record is at least *interval* seconds past the previous report,
    ``handler(buffer)`` is called and its result printed.  Afterwards only
    the records from the last ``width - interval`` seconds are retained, so
    consecutive windows overlap when *width* exceeds *interval*.

    Because the initial reference time is in 1970, the first record
    received always triggers a report.

    A ``None`` item marks the end of the stream and makes the function
    return; without it the loop would block on the queue forever.  Other
    falsy items are ignored.
    """
    start = datetime.datetime.strptime('19700101 010101 +0800', r'%Y%m%d %H%M%S %z')
    current = start
    delta = datetime.timedelta(seconds=width - interval)
    buffer = []
    while True:
        data = src.get(block=True, timeout=None)
        if data is None:
            break
        if data:
            buffer.append(data)
            current = data['time']
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{:}'.format(ret))
            start = current
            # Keep the overlap that the next window shares with this one.
            cutoff = current - delta
            buffer = [p for p in buffer if p['time'] >= cutoff]
def dispatcher(src):
    """Build a fan-out pipeline over *src* and return ``(reg, run)``.

    ``reg(handler, width, interval)`` creates a dedicated queue and a
    (not yet started) thread that runs ``window`` on it with the given
    arguments.

    ``run()`` starts every registered thread, copies each item of *src*
    into every queue, and finally puts one ``None`` into each queue as an
    end-of-stream marker so that a consumer which honours it can stop
    instead of waiting for more data indefinitely.
    """
    queues = []
    threads = []

    def reg(handler, width, interval):
        q = queue.Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        for t in threads:
            t.start()
        for v in src:
            for q in queues:
                q.put(v)
        # Source exhausted: signal end-of-stream to every consumer.
        for q in queues:
            q.put(None)

    return reg, run
if __name__ == '__main__':
    import sys as _sys

    # Log file or directory: first command-line argument if given,
    # otherwise the original hard-coded sample location.
    path = _sys.argv[1] if len(_sys.argv) > 1 else 'c:/vbnm.log'
    reg, run = dispatcher(load(path))
    # Status-code shares over a 5 s window, reported every 3 s.
    reg(status_handler, 5, 3)
    # Browser counts over a 5 s window, reported every 5 s.
    reg(browser_handler, 5, 5)
    run()
# user-agents: short demo of the user_agents library
import user_agents
# Two sample User-Agent headers for the demo below.
# Note: the first one contains literal double quotes at both ends.
ua_string1 = '''"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"'''
ua_string2 = 'Mozilla/5.0 (iPhone; CPU iPhone OS 5_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9B179 Safari/7534.48.3'
def convert(ua):
    """Return the result of ``user_agents.parse`` for the header string *ua*."""
    return user_agents.parse(ua)
# Parse both sample headers, then print a few of the attributes exposed
# by the parsed objects.
ua1, ua2 = convert(ua_string1), convert(ua_string2)

# Operating-system details of the first sample.
print(ua1.os.family, ua1.os.version, ua1.os.version_string)
# Device details of the second sample.
print(ua2.device, ua2.device.brand, ua2.device.model)
# Boolean classification flags, one per output line.
print(
    ua2.is_mobile,
    ua1.is_mobile,
    ua1.is_touch_capable,
    ua2.is_touch_capable,
    sep='\n',
)
print(ua1.is_pc, ua1.is_bot)
# Reference: https://blog.csdn.net/qq_326324545/article/details/88934225
# Original article: https://www.cnblogs.com/dissipate/p/13758771.html