首页 > 编程语言 > 详细

python标准库--并发编程

时间:2021-05-05 17:47:43      阅读:24      评论:0      收藏:0      [点我收藏+]

进程和线程概念

  1. 什么是进程?or 线程?

    进程:一个任务就是一个进程,一个进程可以有多个线程
    ‘线程:把进程内的子任务称为线程,线程是进程的最小单元。
    
  2. 什么是同步?or 异步?

    同步:就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。
    ‘异步:是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。
    
  3. 什么是阻塞?or 非阻塞?

    阻塞/非阻塞:阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的
    
  4. 什么是并发?or 并行?

    #并行:是指多个任务同时执行,比如两个男人同时在给自己女朋友发微信。
    ‘并发:并发是多个任务交替轮流使用资源,比如一个男人在给他7个女朋友发微信,只要他发的够快,宏观上来说他在同时聊7个人。
    
  5. 死锁?可重用锁?

    1. 死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程;解决:使用可重用锁
      
    2. #可重用锁:在Python中为了支持在同一线程中多次请求同一资源,提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
      
  6. 全局解释器锁(GIL):

    CPython 解释器所采用的一种机制,它确保同一时刻只有一个线程在执行 Python bytecode。此机制通过设置对象模型(包括 dict 等重要内置类型)针对并发访问的隐式安全简化了 CPython 实现。给整个解释器加锁使得解释器多线程运行更方便,其代价则是牺牲了在多处理器上的并行性。
    
    不过,某些标准库或第三方库的扩展模块被设计为在执行计算密集型任务如压缩或哈希时释放 GIL。此外,在执行 I/O 操作时也总是会释放 GIL。
    
    创建一个(以更精细粒度来锁定共享数据的)“自由线程”解释器的努力从未获得成功,因为这会牺牲在普通单处理器情况下的性能。据信克服这种性能问题的措施将导致实现变得更复杂,从而更难以维护。
    
  7. 信号量(BoundedSemaphore类) :互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,

multiprocess模块简介

  • multiprocessing模块就是跨平台版本的多进程模块。

Process类

  • 模块:multiprocess模块

  • 使用Process类: from multiprocess import Process

  • multiprocessing 中,通过创建一个 Process 对象(进程对象表示在单独进程中运行的活动)然后调用它的 start() 方法来生成进程。

  • class Process():创建进程对象

  • 语法:

    class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    
    注意:
    1.需要使用关键字的方式来指定参数
    2.args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号。
    
  • 参数说明:应始终使用关键字参数调用构造函数。

    • group:应该始终是 None
    • target:是由 run() 方法调用的可调用对象。它默认为 None ,意味着什么都没有被调用。
    • name:是进程名称
    • args:是目标调用的参数元组
    • kwargs:是目标调用的关键字参数字典。如果提供,则键参数 daemon 将进程 daemon 标志设置为 TrueFalse 。如果是 None (默认值),则该标志将从创建的进程继承。
  • 注意:如果子类重写构造函数,它必须确保它在对进程执行任何其他操作之前调用基类构造函数( Process.__init__() ),这句话的意思是如果创建类继承Process类,就必须调用Process.__init__()

Process类方法

  • p.start():启动进程,并调用该子进程中的p.run()

  • p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法

  • p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

    注意:如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用。类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。

  • p.is_alive():如果p仍然运行,返回True

  • p.join([timeout]):主线程等待p进程终止(强调:是主线程处于等待的状态,而p是处于运行的状态)。timeout是可选的超时时间,单位是秒,需要强调的是,p.join只能joinstart()开启的进程,而不能joinrun()开启的进程

Process类变量

  • p.daemon:进程的守护标志,一个布尔值。这必须在 p.start() 被调用之前设置。
    • 初始值继承自创建进程。当进程退出时,它会尝试终止其所有守护进程子进程。不允许在守护进程中创建子进程。这是因为当守护进程由于父进程退出而中断时,其子进程会变成孤儿进程。
    • 默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置。
  • p.name:进程的名称
  • p.pid:进程的pid
  • p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
  • p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if name ==‘main’ 判断保护起来,import 的时候 ,就不会递归运行了。

使用Process类创建进程

from multiprocessing import Process
import time


def task(name):
    print(‘{} is running!‘.format(name))
    time.sleep(3)
    print(‘{} is done!‘.format(name))


if __name__ == ‘__main__‘:
    for i in range(10):
        p = Process(target=task, args=(i, ))
        p.start()
    print(‘--- 主进程 ----‘)

守护进程

  • 守护进程:父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束。

  • 主进程创建守护进程

    • 其一:守护进程会在主进程代码执行结束后就终止
    • 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的(数据不互通),主进程代码运行结束,守护进程随即终止

import os
from multiprocessing import Process
import time

def run1():
    print(f"{os.getpid()}进程开始")
    print(f"{os.getpid()}进程结束")

# 主进程
if __name__ == "__main__":
    p = Process(target=run1)
    p.daemon =True # 开启p进程为守护进程
    p.start() # p进程开始执行
    # time.sleep(2) 当主进程阻塞2秒后,run1执行完成。
    print("____主进程_____") # 由于主进程和守护进程同时执行完成,所以如果主进程完成时,守护进程没有完成,那么就不会完成守护进程的任务。

Pool类

  • 模块:multiprocess.pool模块

  • 使用Pool类:from multiprocess import Pool

  • multiprocessing 中,通过创建一个 Pool 对象(进程池对象创建多个子进程)然后调用它的 apply_async() 方法来生成子进程。

  • class Pool():一个进程池对象,它控制可以提交作业的工作进程池。

  • 语法:

    class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    
  • 参数:

    • processes :要使用的工作进程数目。
    • initializer :如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)
    • maxtasksperchild :是一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。默认的 maxtasksperchildNone,意味着工作进程寿与池齐。
    • context :可被用于指定启动的工作进程的上下文。通常一个进程池是使用函数 multiprocessing.Pool() 或者一个上下文对象的 Pool() 方法创建的。在这两种情况下, context 都是适当设置的。
  • 注意:

    • 进程池对象的方法只有创建它的进程能够调用。
    • multiprocessing.pool 对象具有需要正确管理的内部资源 (像任何其他资源一样),具体方式是将进程池用作上下文管理器,或者手动调用 close()terminate()。 未做此类操作将导致进程在终结阶段挂起。
    • 请注意依赖垃圾回收器来销毁进程池是 不正确 的做法,因为 CPython 并不保证会调用进程池终结程序

Pool类方法

apply_async()apply() 方法的一个变种,返回一个 对象所属的类。

  • 语法:

    apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    
  • 参数:

    • func :需要调用的工作进程名称,就是函数名称
    • args:参数元组
    • kwds:参数字典
    • callback :如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback
    • error_callback :如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时, 会将抛出的异常对象作为参数传递给 error_callback 执行。

close():阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。

terminate():不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时, 会立即调用 terminate()

join():等待工作进程结束。调用 join() 前必须先调用 close() 或者 terminate()

使用Pool类创建多个子进程

注意:当python版本为3.7.2时,使用pycharm进行并发编程时,由pycharm的evn与python3.7.2不兼容原因导致进程池在运行过程中会出现死锁,也就是永远不会再跳出来。

import os
import time
from multiprocessing import Pool

def run_task(name):
    print("该 %s 进程开始运行" %(name,))
    start = time.time()
    time.sleep(2)
    end = time.time()
    print("该 %s 进程运行时间是 %d" %(name,end-start))
# 主进程(父进程)
if __name__ == "__main__":
    print("父进程 %d" %(os.getpid()),)
    p = Pool(processes = 4) # 创建4个子进程
    for i in range(5): # i 的值 为 [0,5)
        p.apply_async(func=run_task,args=(i,)) # 调用运行进程,并且传入参数
    print("等待所有子进程完成")
    p.close() # 由于使用了close我们还可以使用with为我们上下文管理
    p.join()  # 主进程等待所有子进程结束
    print("主进程,所有子进程完成")

subprocess模块(了解)

  • 很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。
  • subprocess模块允许你生成新的进程,连接它们的输入、输出、错误管道,并且获取它们的返回码。
  • 推荐的调用子进程的方式是在任何它支持的用例中使用 run() 函数。对于更进阶的用例,也可以使用底层的 Popen接口。
  • 使用:import subprocess

run方法

  • run():运行被 arg 描述的指令。等待指令完成,然后返回一个 [CompletedProcess]实例,代表一个进程已经结束.。

  • 语法:

    subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None, **other_popen_kwargs)?
    
  • 参数描述:

    • args:将要被run()运行的指令
    • capture_output:如果 capture_output 设为 true,stdout 和 stderr 将会被捕获。在使用时,内置的 Popen 对象将自动用 stdout=PIPEstderr=PIPE 创建。stdoutstderr 参数不应当与 capture_output 同时提供。如果你希望捕获并将两个流合并在一起,使用 stdout=PIPEstderr=STDOUT 来代替 capture_output
    • timeouttimeout 参数将被传递给 Popen.communicate()。如果发生超时,子进程将被杀死并等待。 TimeoutExpired 异常将在子进程中断后被抛出。
    • inputinput 参数将被传递给 Popen.communicate() 以及子进程的标准输入。 如果使用此参数,它必须是一个字节序列。 如果指定了 encodingerrors 或者将 text 设置为 True,那么也可以是一个字符串。 当使用此参数时,在创建内部 Popen 对象时将自动带上 stdin=PIPE,并且不能再手动指定 stdin 参数。
    • check:如果 check 设为 True, 并且进程以非零状态码退出, 一个 CalledProcessError 异常将被抛出. 这个异常的属性将设置为参数, 退出码, 以及标准输出和标准错误, 如果被捕获到.
    • encoding:如果 encoding 或者 error 被指定, 或者 text 被设为 True, 标准输入, 标准输出和标准错误的文件对象将通过指定的 encodingerrors 以文本模式打开, 否则以默认的 io.TextIOWrapper 打开. universal_newline 参数等同于 text 并且提供了向后兼容性. 默认情况下, 文件对象是以二进制模式打开的.
    • env:如果 env 不是 None, 它必须是一个字典, 为新的进程设置环境变量; 它用于替换继承的当前进程的环境的默认行为. 它将直接被传递给 Popen.
    import subprocess
    
    print(‘$ nslookup www.python.org‘)
    r = subprocess.run([‘nslookup‘, ‘www.python.org‘])
    print(‘Exit code:‘, r.returncode)
    

CompleteProcess类

  • class CompleteProcess()run() 的返回值, 代表一个进程已经结束.

  • 返回值:

    • args:被用作启动进程的参数. 可能是一个列表或字符串.

    • returncode:子进程的退出状态码. 通常来说, 一个为 0 的退出码表示进程运行正常.一个负值 -N 表示子进程被信号 N 中断 (仅 POSIX).

    • stdout:从子进程捕获到的标准输出. 一个字节序列, 或一个字符串, 如果 run() 是设置了 encoding, errors 或者 text=True 来运行的. 如果未有捕获, 则为 None.如果你通过 stderr=subprocess.STDOUT 运行进程,标准输入和标准错误将被组合在这个属性中,并且 stderr 将为 None

    • stderr:捕获到的子进程的标准错误. 一个字节序列, 或者一个字符串, 如果 run() 是设置了参数 encoding, errors或者 text=True 运行的. 如果未有捕获, 则为 None.

    • check_returncode():如果 returncode 非零, 抛出 CalledProcessError.

Popen构造方法

  • class Popen():在一个新的进程中执行子程序。

  • 语法:

    class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=None, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, group=None, extra_groups=None, user=None, umask=-1, encoding=None, errors=None, text=None)?
    
  • 参数:

    • argsargs 应当是一个程序参数的序列或者是一个单独的字符串或 path-like object。 默认情况下,如果 args 是序列则要运行的程序为 args 中的第一项。 如果 args 是字符串,则其解读依赖于具体平台,如下所述。 请查看 shellexecutable 参数了解其与默认行为的其他差异。 除非另有说明,否则推荐以序列形式传入 args
    import subprocess
    
    print(‘$ nslookup www.python.org‘)
    r = subprocess.Popen([‘nslookup‘, ‘www.python.org‘])
    print(‘Exit code:‘, r.returncode)
    
    import subprocess
    
    print(‘$ nslookup‘)
    p = subprocess.Popen([‘nslookup‘], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, err = p.communicate(b‘set q=mx\\npython.org\\nexit\\n‘)
    print(output)
    print(‘Exit code:‘, p.returncode)
    

Queue类

  • queue 模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程间交换的线程编程。模块中的 Queue 类实现了所有所需的锁定语义。模块实现了三种类型的队列,它们的区别仅仅是条目取回的顺序。在 FIFO 队列中,先添加的任务先取回。在 LIFO 队列中,最近被添加的条目先取回(操作类似一个堆栈)。优先级队列中,条目将保持排序( 使用 heapq 模块 ) 并且最小值的条目第一个返回。在内部,这三个类型的队列使用锁来临时阻塞竞争线程;然而,它们并未被设计用于线程的重入性处理。

  • 模块:multiprocess模块

  • 使用Queue类:from multiprocess import Queue

  • multiprocessing 中,通过创建一个 Queue 对象(FIFO队列,先进先出)然后调用它的put(item)方法存储数据到队列,get()从队列取出数据。

  • class Queue():FIFO队列的构造函数,我们可以创建一个共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

  • 语法:

    class queue.Queue(maxsize=0)
    
  • 参数描述:

    • maxsize:Maxsize是一个整数,用于设置可以放置在队列中的项目数量的上限。一旦达到这个大小,插入将被阻塞,直到队列项被消耗。如果maxsize小于或等于0,则队列大小为无穷大。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。

队列公共方法

  • q.get(block=True, timeout=None):返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。

    • 参数:
      • block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。
      • timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
  • q.get_nowait():同q.get(block=False)

  • Queue.put(item, block=True, timeout=None):将 item 放入队列。如果队列已满,此方法将阻塞至有空间可用为止。

    • 参数描述:
      • block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。
      • timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
  • q.qsize()返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

  • q.empty()如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

  • q.full()如果q已满,返回为True。 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。

使用队列进行线程间通信

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print(‘Process to write: %s‘ % os.getpid())
    for value in [‘A‘, ‘B‘, ‘C‘]:
        print(‘Put %s to queue...‘ % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print(‘Process to read: %s‘ % os.getpid())
    for x in range(3):
        value = q.get(timeout=2) #此处可以设置timeout抛出Empty异常
        print(‘Get %s from queue.‘ % value)

if __name__==‘__main__‘:
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    # 或者选用for循环,设置循环次数小于或者等于队列数据个数,
    # 因为循环次数大于队列数据时,
    # 当队列没有数据时,
    # get方法会一直阻塞直到有数据存入队列为止
    pr.join()

threading模块简介

  • 多任务可以由多进程完成,也可以由一个进程内的多线程完成。

  • 我们前面所使用的进程是由若干线程组成的,一个进程至少有一个线程

  • 由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix Thread,而不是模拟出来的线程。

  • Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

模块方法

  • threading.active_count():返回当前存活的 Thread 对象的数量。 返回值与 enumerate() 所返回的列表长度一致。

  • threading.current_thread():返回当前对应调用者的控制线程的 Thread 对象。如果调用者的控制线程不是利用 threading 创建,会返回一个功能受限的虚拟线程对象。

  • threading.excepthook(args, /):处理由 Thread.run() 引发的未捕获异常。

    • args 参数具有以下属性:
      • exc_type: 异常类型
      • exc_value: 异常值,可以是 None.
      • exc_traceback: 异常回溯,可以是 None.
      • thread: 引发异常的线程,可以为 None
    • 如果 exc_typeSystemExit,则异常会被静默地忽略。 在其他情况下,异常将被打印到 sys.stderr。如果此函数引发了异常,则会调用 sys.excepthook() 来处理它
    • threading.excepthook() 可以被重载以控制由 Thread.run() 引发的未捕获异常的处理方式。
    • 使用定制钩子存放 exc_value 可能会创建引用循环。 它应当在不再需要异常时被显式地清空以打破引用循环。使用定制钩子存放 thread 可能会在它设为被终结对象时将其重生。 请避免在定制钩子完成后存放 thread 以避免对象的重生。

Tread类

  • 模块:threading模块

  • 使用Tread类:from threading import Tread

  • threading 中,通过创建一个 Tread 对象(线程对象)然后调用它的 start() 方法来生成线程。

  • class Tread():该构造方法,可以创建线程对象,调用这个构造函数时,必需带有关键字参数。

  • 语法:

    class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    
  • 参数:

    • groupgroup 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
    • targettarget 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。
    • namename 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数。
    • argsargs 是用于调用目标函数的参数元组。默认是 ()
    • kwargskwargs 是用于调用目标函数的关键字参数字典。默认是 {}
    • daemon :如果不是 Nonedaemon 参数将显式地设置该线程是否为守护模式。 如果是 None (默认值),线程将继承当前线程的守护模式属性。
  • 注意:如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__())。

Tread类方法

  • t.start():启动线程,并调用该子线程中的t.run()
  • t.run():线程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • t.is_alive():如果t仍然运行,返回True
  • t.join([timeout]):这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。
    • timeout 参数存在而且不是 None 时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为 join() 总是返回 None ,所以你一定要在 join() 后调用 is_alive() 才能判断是否发生超时 -- 如果线程仍然存活,则 join() 超时。
    • timeout 参数不存在或者是 None ,这个操作会阻塞直到线程终结。
    • 一个线程可以被 join() 很多次。
    • 如果尝试加入当前线程会导致死锁, join() 会引起 RuntimeError 异常。如果尝试 join() 一个尚未开始的线程,也会抛出相同的异常。
  • t.setName(Name)t.getName():设置或者返回线程名称。主线程默认名称:MainThread。

Tread类变量

  • t.daemon:线程的守护标志,一个布尔值。这必须在 t.start() 被调用之前设置。
    • 一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False
    • 当没有存活的非守护线程时,整个Python程序才会退出。
  • t.name:线程的名称
  • t.ident:这个线程的 ‘线程标识符‘,如果线程尚未开始则为 None 。这是个非零整数。当一个线程退出而另外一个线程被创建,线程标识符会被复用。即使线程退出后,仍可得到标识符。

使用Tread类创建线程

import time, threading

# 新线程执行的代码:
def loop():
    print(‘thread %s is running...‘ % threading.current_thread().name) # 获取当前对象的name属性
    n = 0
    while n < 5:
        n = n + 1
        print(‘thread %s >>> %s‘ % (threading.current_thread().name, n))
        time.sleep(1)
    print(‘thread %s ended.‘ % threading.current_thread().name)

print(‘thread %s is running...‘ % threading.current_thread().name)
t = threading.Thread(target=loop, name=‘LoopThread‘)
t.start() # 启动线程
t.join() # 等待线程完成
print(‘thread %s ended.‘ % threading.current_thread().name)

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……

守护线程

无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。 需要强调的是:运行完毕并非终止运行

  • 守护线程:父线程中将一个子线程设置为守护进程,那么这个子线程会随着父线程的结束而结束。

  • #1.对主进程来说,运行完毕指的是主进程代码运行完毕
    #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
    
  • #1.主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
    

原始锁

  • 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了

  • 原始锁是一个在锁定时不属于特定线程的同步基元组件。在Python中,它是能用的最低级的同步基元组件,由 _thread 扩展模块直接实现。

    原始锁处于 "锁定" 或者 "非锁定" 两种状态之一。它被创建时为非锁定状态。它有两个基本方法, acquire()release() 。当状态为非锁定时, acquire() 将状态改为 锁定 并立即返回。当状态是锁定时, acquire() 将阻塞至其他线程调用 release() 将其改为非锁定状态,然后 acquire() 调用重置其为锁定状态并返回。 release() 只在锁定状态下调用; 它将状态改为非锁定并立即返回。如果尝试释放一个非锁定的锁,则会引发 RuntimeError 异常。

    当多个线程在 acquire() 等待状态转变为未锁定被阻塞,然后 release() 重置状态为未锁定时,只有一个线程能继续执行;至于哪个等待线程继续执行没有定义,并且会根据实现而不同。

    所有方法的执行都是原子性的。

  • 模块:threading模块

  • 使用Lock类:from threading import Lock

  • class Lock():实现原始锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。

  • 语法:

    class threading.Lock()
    

Lock类方法

  • lock.acquire():可以阻塞或非阻塞地获得锁。

  • 语法:

    acquire(blocking=True, timeout=-1)
    
  • 参数描述:当调用时参数 blocking 设置为 True (缺省值),阻塞直到锁被释放,然后将锁锁定并返回 True

    • 在参数 blocking 被设置为 False 的情况下调用,将不会发生阻塞。如果调用时 blocking 设为 True 会阻塞,并立即返回 False ;否则,将锁锁定并返回 True
    • 当浮点型 timeout 参数被设置为正值调用时,只要无法获得锁,将最多阻塞 timeout 设定的秒数。timeout参数被设置为 -1 时将无限等待。当 blocking 为 false 时,timeout 指定的值将被忽略。
    • 如果成功获得锁,则返回 True,否则返回 False (例如发生 超时 的时候)。
  • lock.release():释放一个锁。这个方法可以在任何线程中调用,不单指获得锁的线程。当锁被锁定,将它重置为未锁定,并返回。如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个允许。

    在未锁定的锁调用时,会引发 RuntimeError 异常。没有返回值。

  • lock.locked():如果获得了锁则返回真值。

#不加锁
from threading import Thread, Lock

# 全局变量
money = 0

def add_money(n):
    # 先存后取
    global money # 如果想要在函数内操作全局变量,使用global关键字
    money += n
    money -= n

def run_task(n):
    for i in range(100000):
        add_money(n)

t = Thread(target=run_task,args=(4,))
t1 = Thread(target=run_task,args=(12,))
t.start()
t1.start()
t.join()
t1.join()
print(money)
# 多个线程更改同一个数据,没有数据安全
from threading import Thread, Lock

# 全局变量
money = 0
lock = Lock()
def add_money(n):
    # 先存后取
    global money # 如果想要在函数内操作全局变量,使用global关键字
    with lock:   # 加锁后,一个线程获得锁后,阻塞其他线程,直到获取锁的线程释放锁之后才会获得锁
        money += n
        money -= n

def run_task(n):
    for i in range(100000):
        add_money(n)

t = Thread(target=run_task,args=(4,))
t1 = Thread(target=run_task,args=(12,))
t.start()
t1.start()
t.join()
t1.join()
print(money)

重用锁

  • 重入锁是一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 "所属线程" 和 "递归等级" 的概念。在锁定状态下,某些线程拥有锁 ; 在非锁定状态下, 没有线程拥有它。

  • 若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。

  • 模块:threading模块

  • 使用Rlock类:from threading import RLock

  • class RLock():此类实现了重入锁对象。重入锁必须由获取它的线程释放。一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;线程必须在每次获取它时释放一次。

  • 语法:

    class threading.RLock()
    

RLock类方法

  • lock.acquire():可以阻塞或非阻塞地获得锁。

  • 语法:

    acquire(blocking=True, timeout=-1)
    
  • 参数说明:当无参数调用时: 如果这个线程已经拥有锁,递归级别增加一,并立即返回。否则,如果其他线程拥有该锁,则阻塞至该锁解锁。一旦锁被解锁(不属于任何线程),则抢夺所有权,设置递归等级为一,并返回。如果多个线程被阻塞,等待锁被解锁,一次只有一个线程能抢到锁的所有权。在这种情况下,没有返回值。

    当发起调用时将 blocking 参数设为真值,则执行与无参数调用时一样的操作,然后返回 True

    当发起调用时将 blocking 参数设为假值,则不进行阻塞。 如果一个无参数调用将要阻塞,则立即返回 False;在其他情况下,执行与无参数调用时一样的操作,然后返回 True

    当发起调用时将浮点数的 timeout 参数设为正值时,只要无法获得锁,将最多阻塞 timeout 所指定的秒数。 如果已经获得锁则返回 True,如果超时则返回假值。

  • lock.release():释放锁,自减递归等级。

    • 如果减到零,则将锁重置为非锁定状态(不被任何线程拥有),并且,如果其他线程正被阻塞着等待锁被解锁,则仅允许其中一个线程继续。如果自减后,递归等级仍然不是零,则锁保持锁定,仍由调用线程拥有。
    • 只有当前线程拥有锁才能调用这个方法。如果锁被释放后调用这个方法,会引起 RuntimeError 异常。
    • 没有返回值。

信号量

  • 一个信号量管理一个内部计数器,该计数器因 acquire() 方法的调用而递减,因 release() 方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。

  • 使用Semaphore类:from threading import Semaphore

  • class Semaphore() :该类实现信号量对象。信号量对象管理一个原子性的计数器,代表 release() 方法的调用次数减去 acquire()的调用次数再加上一个初始值。如果需要, acquire() 方法将会阻塞直到可以返回而不会使得计数器变成负数。在没有显式给出 value 的值时,默认为1。

    可选参数 value 赋予内部计数器初始值,默认值为 1 。如果 value 被赋予小于0的值,将会引发 ValueError 异常。

  • 语法:

    class threading.Semaphore(value=1)
    

Semaphore类方法

  • acquire(blocking=True, timeout=None)

    获取一个信号量。在不带参数的情况下调用时:如果在进入时内部计数器的值大于零,则将其减一并立即返回 True.如果在进入时内部计数器的值为零,则将会阻塞直到被对 release() 的调用唤醒。 一旦被唤醒(并且计数器的值大于 0),则将计数器减 1 并返回 True。 每次对 release() 的调用将只唤醒一个线程。 线程被唤醒的次序是不可确定的。当发起调用时将 blocking 设为假值,则不进行阻塞。 如果一个无参数调用将要阻塞,则立即返回 False;在其他情况下,执行与无参数调用时一样的操作,然后返回 True。当发起调用时如果 timeout 不为 None,则它将阻塞最多 timeout 秒。 请求在此时段时未能成功完成获取则将返回 False。 在其他情况下返回 True在 3.2 版更改: 新的 timeout 形参。

  • release(n=1)

    释放一个信号量,将内部计数器的值增加 n。 当进入时值为零且有其他线程正在等待它再次变为大于零时,则唤醒那 n 个线程。

with上下文管理

  • 在python中,with lock:即可获取释放锁,信号量,事件。进入代码块时获取(acquire),退出with代码块时释放(release)

全局解释器锁

GIL:全局解释器锁,Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

线程本地数据

  • 线程本地数据:线程本地数据是特定线程的数据。管理线程本地数据,只需要创建一个 local (或者一个子类型)的实例并在实例中储存属性:

  • 使用local类:from threading import local

  • class local():创建一个线程本地数据类的实例

  • 语法:

    class threading.local()
    
  • 使用:绑定当前线程的属性即可

    import threading
    from threading import Thread,local
    
    # 创建全局线程本地数据
    l = local()
    
    def get():
        # 获取当前线程所关联的数据
        std = l.name
        print("(%s) 当前线程 %s" %(std,(threading.current_thread().name)))
    
    def add(name):
        # 绑定当前线程所关联的数据
        l.name = name # 把传入过来的数据绑定到 l.name
        get()
    
    t1 = Thread(target=add,args=("Task1",),name="线程1")
    t2 = Thread(target=add,args=("Task2",),name="线程2")
    t1.start()
    t2.start()
    t1.join()
    t1.join()
    
  • 一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。

python标准库--并发编程

原文:https://www.cnblogs.com/bright-future/p/14731868.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!