首页 > 其他 > 详细

Barrier

时间:2020-10-25 23:25:22      阅读:55      评论:0      收藏:0      [点我收藏+]

  

name     meaning
Barrier(parties,action=None,timeout=None) 构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值
n_waiting 当前在屏障中等待的线程数
parties 各方数,需要多少个等待
wait(timeout=None) 等待通过屏障,返回 0 到 线程数 -1 的 integer,每个线程返回不同,如果wait方法设置了超时,并触发超时发送,屏障处于broken状态

 

import threading,logging
logging.basicConfig(level=logging.DEBUG,format=%(asctime)s %(threadName)s %(message)s)

def worker(barrier:threading.Barrier):
    logging.info(n_waiting= {}.format(barrier.n_waiting))
    try:
        bid=barrier.wait(timeout=2)
        logging.info(after barrier.format())
    except threading.BrokenBarrierError:
        logging.info(Broken Barrier in {}.format(threading.current_thread().name))

barrier=threading.Barrier(3,)
# barrier=threading.Barrier(3,timeout=3)

for b in range(5): # try 3 4 5
    threading.Thread(target=worker,name=worker-{}.format(b),args=(barrier,)).start()

 

name       implication
broken             如果屏障处于broken状态,返回True
abort() 将屏障至于broken状态,等待中的线程或者调用等待方法的线程都会抛出BrokenBarrierError异常,知道reset方法恢复屏障
reset() 恢复屏障,重新开始拦截

 

import threading,logging
logging.basicConfig(level=logging.DEBUG,format=%(asctime)s %(threadName)s %(message)s)

def worker(barrier:threading.Barrier):
    logging.info(n_waiting= {}.format(barrier.n_waiting))
    try:
        # bid=barrier.wait(timeout=2)
        bid=barrier.wait()
        logging.info(after barrier {}.format(bid))
    except threading.BrokenBarrierError:
        logging.info(Broken Barrier in {}.format(threading.current_thread().name))

barrier=threading.Barrier(3,)
# barrier=threading.Barrier(3,timeout=3)

for b in range(5): # try 3 4 5
    threading.Thread(target=worker,name=worker-{}.format(b),args=(barrier,)).start()

print(barrier.broken,111111111111111)
threading.Event().wait(2)
barrier.abort()
print(barrier.broken,222222222222222222)
barrier.reset()
print(barrier.broken,3333333333333333333)

技术分享图片

 

 

import threading,logging
FORMAT=%(asctime)-15s\t [ %(threadName)s %(thread)8d ] %(message)s
logging.basicConfig(format=FORMAT,level=logging.WARNING)

def worker(barrier:threading.Barrier):
    logging.error(waiting threads: {}.format(barrier.n_waiting))
    try:
        barrier_id=barrier.wait()
        logging.critical(my barrier id: {}.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.error(Broken Barrier)

barrier=threading.Barrier(3)

for b in range(0,9):
    if b==2:
        barrier.abort()
    elif b==6:
        barrier.reset()
    threading.Event().wait(1)
    threading.Thread(target=worker,name=work-{}.format(b),args=(barrier,)).start()

 

wait方法超时实例

如果wait方法超时发生,屏障将处于broken状态知道reset

import threading,logging
FORMAT=%(asctime)-15s\t[ %(threadName)s %(thread)8d ] %(message)s
logging.basicConfig(level=logging.INFO,format=FORMAT)

def w(barrier:threading.Barrier,i:int):
    logging.info(waiting threads: {}.format(barrier.n_waiting))
    try:
        logging.info(barrier.broken)
        if i<3:
            barrier_id=barrier.wait(1)
        else:
            if i==6:
                barrier.reset()
            barrier_id=barrier.wait()
        logging.info(barrier id: {}.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info(Broken Barrier)

barrier=threading.Barrier(3)

for b in range(0,9):
    threading.Event().wait(2)
    threading.Thread(target=w,name=w-{}.format(b),args=(barrier,b)).start()

Barrier应用:

并发初始化

所有线程都必须初始化完成后才能继续工作,例如运行前加载数据,检查,如果这些工作没完成,就开始运行,将不能正常工作

10个线程做10种准备工作,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程

例如,启动一个程序,要先加载磁盘文件,缓存预热,初始化连接池等,这些工作可以并发,但是只有全部准备工作完成后,才能继续,假设,数据库连接失败,则初始化工作失败,需abort(),屏障broken,所有线程收到异常退出

 

Barrier

原文:https://www.cnblogs.com/dissipate/p/13875884.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!