首页 > 其他 > 详细

时间窗口函数

时间:2020-10-02 00:18:27      阅读:48      评论:0      收藏:0      [点我收藏+]

  

import datetime,random,time
def source():
    """Yield an endless stream of timestamped random samples.

    Each item is a dict with two keys:
    ``'value'`` - a random integer in the closed range [1, 100];
    ``'datetime'`` - the current time as an aware datetime at UTC+08:00.

    The generator never terminates. It sleeps for one second before
    producing each item after the first.
    """
    tz=datetime.timezone(datetime.timedelta(seconds=28800))  # 28800 s = UTC+08:00
    while True:
        yield {'value':random.randint(1,100),'datetime':datetime.datetime.now(tz)}
        time.sleep(1)

def window(src,handler,width:int,interval:int):
    '''
    Sliding time-window aggregation over a stream of timestamped items.

    Items are buffered as they arrive. Whenever the timestamp of the newest
    item is at least ``interval`` seconds past the previous trigger point,
    ``handler`` is called with the buffered items and its result is printed.
    Afterwards only items newer than ``width - interval`` seconds before the
    newest item stay in the buffer, so they are seen again by the next call.

    :param src: iterable of dicts, each holding an aware datetime under 'datetime'
    :param handler: callable taking the list of buffered items; its result is
        printed with ``{:.2f}``, so it must return a number
    :param width: window width in seconds
    :param interval: seconds between two handler invocations
    :return: None, once ``src`` is exhausted
    '''
    start=datetime.datetime.strptime('20170101 00:00:00 +0800','%Y%m%d %H:%M:%S %z')
    current=datetime.datetime.strptime('20170101 01:00:00 +0800','%Y%m%d %H:%M:%S %z')
    delta=datetime.timedelta(seconds=width-interval)
    buffer=[]
    # A for-loop (rather than while True + next()) lets a finite source end
    # the function normally instead of leaking StopIteration to the caller.
    for data in src:
        if data:
            buffer.append(data)
            current=data['datetime']

        if (current-start).total_seconds() >= interval:
            ret=handler(buffer)
            print("ret: {:.2f}\tlen(buffer): {}".format(ret,len(buffer)))
            start=current
            # Keep the overlap that the next window still has to include.
            buffer=[b for b in buffer if b['datetime'] > current-delta]

def handler(iterable):
    """Return the arithmetic mean of the 'value' field of the given items.

    :param iterable: iterable of mappings, each with a numeric 'value' entry
    :return: the mean of all 'value' entries
    :raises ZeroDivisionError: if ``iterable`` yields no items
    """
    vals=[b['value'] for b in iterable]
    return sum(vals)/len(vals)

# Demo: average the random source over a 10-second window, evaluated every
# 5 seconds. source() never stops yielding, so this call does not return.
window(source(),handler,10,5)

 

import re,datetime
# Sample web-server access-log line used to exercise the pattern below.
log=('10.1.1.95 - e800 [18/Mar/2005:12:21:42 +0800] '
     '"GET /stats/awstats.pl?config=e800 HTTP/1.1" 200 899 "http://10.1.1.1/pv/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; Maxthon)"')

# One named group per log field. Written as adjacent raw strings so the regex
# escapes (\d, \w, \[) are not interpreted as string escapes.
pattern=(r'(?P<remote>[\d.]{7,}) (?P<logname>[\w-]+) (?P<username>[\w-]+) \[(?P<datetime>[^][]+)\] "(?P<request>[^"]+)" (?P<status>\d{3}) '
         r'(?P<size>\d+) "(?P<referer>[^"]+)" "(?P<useragent>[^"]+)"')
regex=re.compile(pattern,flags=re.M)

# Per-field converters for the raw text captured from a log line, keyed by
# field name.
funcs={
    # e.g. '18/Mar/2005:12:21:42 +0800' -> aware datetime
    'datetime':lambda m:datetime.datetime.strptime(m,r'%d/%b/%Y:%H:%M:%S %z'),
    'status':int,
    'size':int,
    # split the request line on whitespace into its three parts
    'request':lambda m:dict(zip(('request','uri','protocol'),(m.split())))
}

def extract(line):
    """Parse one log line into a dict of converted fields.

    The line is matched against the module-level ``regex``. Each named group
    is passed through its converter from ``funcs``; groups without a
    converter are returned as captured.

    :param line: a single log line
    :return: dict mapping field name to converted value, or None when the
        line does not match
    """
    def _unchanged(raw):
        return raw

    found=regex.match(line)
    if not found:
        return None
    fields={}
    for name,raw in found.groupdict().items():
        fields[name]=funcs.get(name,_unchanged)(raw)
    return fields

def load(path:str):
    """Lazily yield the parsed record of every matching line in a log file.

    Lines for which ``extract`` returns a falsy result are skipped.

    :param path: path of the log file to read
    :return: generator of the dicts produced by ``extract``
    """
    with open(path) as handle:
        for raw_line in handle:
            record=extract(raw_line)
            if not record:
                continue
            yield record
def window(src,handler,width:int,interval:int):
    """Sliding time-window aggregation over an iterable of timestamped records.

    Records are buffered as they arrive. Whenever the timestamp of the newest
    record is at least ``interval`` seconds past the previous trigger point,
    ``handler`` is called with the buffered records and its result is printed.
    Afterwards only records newer than ``width - interval`` seconds before
    the newest record stay in the buffer.

    :param src: iterable of dicts, each holding an aware datetime under 'datetime'
    :param handler: callable taking the list of buffered records
    :param width: window width in seconds
    :param interval: seconds between two handler invocations
    :return: None, once ``src`` is exhausted
    """
    start=datetime.datetime.strptime('1970/01/01 01:01:01 +0800','%Y/%m/%d %H:%M:%S %z')
    current=datetime.datetime.strptime('1970-01-01 01:01:01 +0800','%Y-%m-%d %H:%M:%S %z')
    delta=datetime.timedelta(seconds=width-interval)
    buffer=[]
    for x in src:
        if x:
            buffer.append(x)
            current=x['datetime']
        if (current-start).total_seconds() >= interval:
            ret=handler(buffer)
            print('{:}'.format(ret))
            start=current
            # Keep the overlap that the next window still has to include.
            buffer=[p for p in buffer if p['datetime']>current-delta]
def no_handler(iterable:list):
    """Pass-through handler: return the buffered items unchanged."""
    return iterable
def handler(iterable:list):
    """Return the arithmetic mean of the numbers in ``iterable``.

    Raises ZeroDivisionError when ``iterable`` is empty.
    """
    # todo
    total=sum(iterable)
    count=len(iterable)
    return total/count
def size_handler(iterable:list):
    """Placeholder for a size-based aggregation; currently returns None."""
    # todo
    return None

# Replay the parsed access log through a 7-second window evaluated every
# 5 seconds; no_handler returns the buffer as-is, so each window's records
# are printed.
window(load('c:/vbnm.log'),no_handler,7,5)

 

import queue,datetime,time,random,threading

def source():
    """Generate an endless sequence of random, timestamped samples.

    Every item is a dict with the keys ``'value'`` (random integer from 1 to
    100 inclusive) and ``'datetime'`` (current time as an aware datetime at
    UTC+08:00).

    The generator never finishes. It sleeps for one second before producing
    each item after the first.
    """
    tz=datetime.timezone(datetime.timedelta(seconds=28800))  # 28800 s = UTC+08:00
    while True:
        yield {'value':random.randint(1,100),'datetime':datetime.datetime.now(tz)}
        time.sleep(1)

def window(src:queue.Queue,handler,width:int,interval:int):
    '''
    Sliding time-window aggregation fed from a queue.

    Blocks on ``src.get()`` for each item and buffers it. Whenever the
    timestamp of the newest item is at least ``interval`` seconds past the
    previous trigger point, ``handler`` is called with the buffered items and
    its result is printed. Afterwards only items no older than
    ``width - interval`` seconds before the newest item stay in the buffer.

    The loop has no exit condition; it runs until ``src.get()`` or
    ``handler`` raises.

    :param src: queue whose items are dicts holding an aware datetime under 'datetime'
    :param handler: callable taking the list of buffered items; its result is
        printed with ``{:.2f}``, so it must return a number
    :param width: window width in seconds
    :param interval: seconds between two handler invocations
    :return: does not return normally
    '''

    start=datetime.datetime.strptime('20170101 010101 +0800',r'%Y%m%d %H%M%S %z')
    current=datetime.datetime.strptime('20170101 010101 +0800',r'%Y%m%d %H%M%S %z')
    delta=datetime.timedelta(seconds=width-interval)
    buffer=[]
    while True:
        data=src.get()
        if data:
            buffer.append(data)
            current=data['datetime']
        if (current-start).total_seconds() >= interval:
            ret=handler(buffer)
            print('{:.2f}'.format(ret))
            start=current
            # Keep the overlap that the next window still has to include.
            buffer=[p for p in buffer if p['datetime'] >= current-delta]

def handler(iterable):
    """Return the arithmetic mean of the 'value' field of the given items.

    :param iterable: iterable of mappings, each with a numeric 'value' entry
    :return: the mean of all 'value' entries
    :raises ZeroDivisionError: if ``iterable`` yields no items
    """
    vals=[p['value'] for p in iterable]
    return sum(vals)/len(vals)

def dispatcher(src):
    """Create a fan-out dispatcher that feeds one source to several windows.

    :param src: iterable of items to distribute
    :return: tuple ``(reg, run)``. ``reg(handler, width, interval)`` registers
        a window with its own queue and thread. ``run()`` starts all
        registered threads and then copies every item of ``src`` into every
        queue.
    """
    inboxes=[]
    workers=[]

    def reg(handler,width,interval):
        # One private queue and one (not yet started) thread per window.
        inbox=queue.Queue()
        inboxes.append(inbox)
        worker=threading.Thread(target=window,args=(inbox,handler,width,interval))
        workers.append(worker)

    def run():
        for worker in workers:
            worker.start()
        for item in src:
            for inbox in inboxes:
                inbox.put(item)

    return reg,run
# Build one dispatcher over the random source.
reg,run=dispatcher(source())

# Register two windows with identical settings (width 5, interval 3);
# each registration gets its own queue and thread.
reg(handler,5,3)
reg(handler,5,3)
# Start the window threads, then feed every source item to all queues.
run()

 

时间窗口函数

原文:https://www.cnblogs.com/dissipate/p/13758738.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!