首页 > 编程语言 > 详细

python有关并发编程的部分

时间:2021-04-30 21:10:59      阅读:24      评论:0      收藏:0      [点我收藏+]

并发编程

基础

多道技术:单核实现并发的效果,即程序一遇到I/O操作就切换CUP使用者
并发:看起来像同时运行的就可以称之为并发
并行:真正意义上的同时进行
技术分享图片

空间复用:多个进程复用内存空间
时间复用:多个进程复用CPU时间

进程三状态:当前进程退出运行状态的原因(CPU切换)
1.遇到IO操作时,就要让出CPU让其他进程执行
2.进程占用CPU时间过长,或者优先级更高的进程需要调用
技术分享图片

同步和异步:描述的是任务的提交方式
同步:任务提交之后,原地等待任务的返回结果,等待过程中不做任何事情
异步:任务提交之后,不等待任务的返回结果,直接去做其他事情,任务的返回结果会有一个异步回调机制自动处理

阻塞和非阻塞:描述程序的运行状态
阻塞:阻塞态
非阻塞:就绪态和运行态

两组概念组合效率最高的是异步非阻塞

开启进程的两种方式

‘‘‘第一种方法‘‘‘
from multiprocessing import Process
import time


def task(name):
    print(f‘{name}‘)
    time.sleep(3)
    print(‘end‘)
‘‘‘
windows操作系统下,创建进程一定要在main内创建
‘‘‘

if __name__ == "__main__":
    # 实例化一个进程对象
    p = Process(target=task,args=(‘zhao‘,))
    # 开启进程
    p.start()
    p.join() # join方法是让主进程等待子进程p运行结束之后再继续往后执行,其他子进程不受影响
    print(‘mmm‘)

    
‘‘‘第二种方法‘‘‘
from multiprocessing import Process
import time


class MyProcess(Process):
    def run(self): # 必须包含一个run方法
        print(‘start‘)
        time.sleep(1)
        print(‘end‘)

if __name__ == "__main__":
    p = MyProcess()
    p.start()
    print(‘end2‘)
    
# 创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去
# 一个进程对应在内存中就是一块独立的内存空间
# 多个进程对应在内存中就是多块独立的内存空间
# 进程之间数据默认隔离,若要交互需要借助第三方模块
# 容器类型的数据如列表,字典,元组等,最好在最后一个元素的后面也加上逗号

进程对象的其他方法

from multiprocessing import Process, current_process
import time
import os


def task():
    print(f‘{current_process().pid} is running‘) # 查看当前进程的进程号
    print(f‘{os.getppid()} 子进程的父进程id‘)
    time.sleep(3)

if __name__ == "__main__":
    p = Process(target=task)
    p.start()
    p.terminate() # 杀死该进程
    print(p.is_alive()) # 判断进程是否存活
    print(f‘{os.getpid()} is running‘) # 查看当前进程号
    print(f‘{os.getppid()} 父进程id‘) # 查看父进程id
# 一般情况下我们会将存储bool值的变量名和返回的结果是bool值的方法名,用is_作为开头

僵尸进程与孤儿进程

僵尸进程:当某个进程开设子进程后,该进程死后不会立刻释放占用的资源,所有的子进程都会步入僵尸进程
孤儿进程:子进程存活,父进程意外死亡

守护进程

# 被守护的子进程会随着主进程的结束而结束,不管其是否运行完
from multiprocessing import Process
import time
import os


def task():
    print(‘start‘)
    time.sleep(3)
    print(‘end‘)

if __name__ == "__main__":
    p = Process(target=task)
    p.daemon = True # 设置守护进程
    p.start()
    print(‘go‘)

线程互斥锁

# 多个线程在操作同一份数据的时候可能会造成数据的混乱,这个时候为了保证数据的安全我们通常会加锁处理
from threading import Thread, Lock
import time

l = Lock() # 实例化一把锁
money = 100
def task():
    global money
    l.acquire() # 获取锁
    tem = money
    time.sleep(0.1)
    money = tem - 1
    l.release() # 释放锁

if __name__ == ‘__main__‘:
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    for i in t_list:
        i.join()
    print(money)

GIL全局解释器锁

# GIL不是python的特点,而是CPython解释器的特点
# GIL在CPython解释器中是一把互斥锁,会导致同一个进程下的多个线程无法同时执行,即无法利用多核优势
# GIL是保证解释器级别的数据安全
import threading

global_num = 0

lock = threading.Lock()


def test1():
    global global_num
    lock.acquire()
    for i in range(1000000):
        global_num += 1
    lock.release()
    print("test1", global_num)


def test2():
    global global_num
    lock.acquire()
    for i in range(1000000):
        global_num += 1
    lock.release()
    print("test2", global_num)


t1 = threading.Thread(target=test1)
t2 = threading.Thread(target=test2)

t1.start()
t2.start()
# 在Cpython解释器中,当我们的python代码有一个线程开始访问解释器的时候,GIL会把这个大锁给锁上,此时此刻其他的线程只能干等着,无法对解释器的资源进行访问,这一点就跟我们的互斥锁相似。而只是这个过程发生在我们的Cpython中,同时也需要等这个线程分配的时间到了,这个线程把gil释放掉,类似我们互斥锁的lock.release()一样,另外的线程才开始跑起来,说白了,这无疑也是一个单线程。
‘‘‘
执行过程:
		1.thread1拿到全局变量count 
		2.thread1申请到python解释器的gil
		3.解释器调用系统原生线程
		4.在cpu1上执行规定的时间
		5.执行时间到了,要求释放gil等下一次得到gil的时候,程序从这里接着这一次开始执行
		6.thread2拿到了全局变量,此时thread1对全局count的操作并未完成,所以thread1拿到的和thread2拿到的count其实是相同的,这样也很好解释为什么结果不是200万 而是少于200万
		7.thread2申请到了gil锁
		8.调用原生的线程
		9.如果是单核cpu则会在cup1上执行,(不是重点),如图在cpu3上执行
		10.执行规定的时间,此时完成了对count的加一操作
		11.执行时间还未到,线程2执行完对count操作,并给count加一,并且释放了gil锁
		12.线程1又申请到了gil锁,重复之前的操作。
		13.线程1执行对count的操作,执行时间到,释放gil锁

哪些情况适合用多线程呢:

    只要在进行耗时的IO操作的时候,能释放GIL,所以只要在IO密集型的代码里,用多线程就很合适,相比于多进程,多线程更节省资源,时间上面差不了多少

哪些情况适合用多进程呢:

    用于计算密集型,比如计算某一个文件夹的大小。
‘‘‘

技术分享图片

# 同一个进程内的多个线程数据是共享的
# 进程是资源单位,线程是执行单位

死锁与递归锁

‘‘‘死锁‘‘‘
from threading import Thread, Lock
import time


lock1 = Lock()
lock2 = Lock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        lock1.acquire()
        print(f‘{self.name} 获取到1锁‘)
        lock2.acquire() # 此时1锁在func1上,并且需要获得2锁
        print(f‘{self.name} 获取到2锁‘)
        lock2.release()
        lock1.release()
    def func2(self):
        lock2.acquire()
        print(f‘{self.name} 获取到2锁‘)
        time.sleep(2) # 此时2锁在func2上,并且func2要获取1锁
        lock1.acquire()
        print(f‘{self.name} 获取到1锁‘)
        lock1.release()
        lock2.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t = MyThread()
        t.start()
# 整个程序卡住,产生死锁现象


‘‘‘递归锁‘‘‘
# 可以被连续的acquire和release
# 但是只能被第一个抢到这把锁的执行上述操作
# 它的内部有一个计数器,每acquire一次计数加一,每release一次技术减一
# 只要计数不为0,那么其他人都无法抢到该锁
from threading import Thread, RLock
import time

lock1 = lock2 = RLock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        lock1.acquire()
        print(f‘{self.name} 获取到1锁‘)
        lock2.acquire()
        print(f‘{self.name} 获取到2锁‘)
        lock2.release()
        lock1.release()
    def func2(self):
        lock2.acquire()
        print(f‘{self.name} 获取到2锁‘)
        time.sleep(2)
        lock1.acquire()
        print(f‘{self.name} 获取到1锁‘)
        lock1.release()
        lock2.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t = MyThread()
        t.start()

信号量

# 在并发编程中信号量指的是锁
# 信号量在不同的阶段可能对应不同的技术点
from threading import Thread, Semaphore
import time
import random


sm = Semaphore(5)  # 设置五把锁,同时可以运行五个线程获取到锁

def task(name):
    sm.acquire()
    print(f‘{name}正在吃屎‘)
    time.sleep(random.randint(1,5))
    sm.release()

if __name__ == ‘__main__‘:
    for i in range(20):
        t = Thread(target=task,args=(f‘毛芋头{i}号‘, ))
        t.start()

Event事件

# 让某些进程/线程,等待另一些进程/线程运行完毕,再运行
from threading import Thread, Event
import time

event = Event()  # 创建一个Event事件


def light():
    print(‘red‘)
    time.sleep(3)
    print(‘green‘)
    event.set()  # 触发事件


def car():
    print(‘stop‘)
    event.wait()  # 等待事件,等待触发事件被触发
    print(‘run‘)

if __name__ == ‘__main__‘:
    t = Thread(target=light)
    t.start()
    for i in range(20):
        t = Thread(target=car)
        t.start()

线程Q

import queue
# 队列 = 管道 + 锁
# 使用队列还是为了数据安全

# 先进先出队列
q = queue.Queue(3)

# 后进先出队列
q1 = queue.LifoQueue()

# 优先级队列
q2 = queue.PriorityQueue(3)
q2.put((10, ‘zhao‘))  # 参数是一个元组第一个数字表示优先级(数字越小优先级越高),第二个是放入的值 

进程池与线程池

# 池是用来保证计算机硬件安全的情况下最大限度的利用计算机
# 它降低了程序的运行效率,但是保证了你的程序能够正常运行

‘‘‘线程池/进程池‘‘‘
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# 函数可以传参数数字,如果不传,默认开启 os.cpu_count() * 5 数量的线程(os.cpu_count()数量的进程)
pool = ThreadPoolExecutor(5)  # 池子里面开设五个线程
# pool = ProcessPoolExecutor() # 开设进程池

def task(name):
    print(f‘{name} start‘)
    time.sleep(2)
    return f‘{name} end‘


def call_back(n):
    print(‘回调函数执行‘, n.result())


for i in range(20):
    res = pool.submit(task, ‘zhao‘)  # 提交任务,异步非阻塞
    res.add_done_callback(call_back)  # 绑定回调函数,只要res对象获取了数据,回调函数就立即执行
    
# 这种提交方式,也可用res = pool.map(task,range(20)),不过这个方法只适合传递简单的参数
    
# pool.shutdown()  # 关闭线程池,等待线程池中所有的线程执行完毕
# res.result()  # 获取执行函数返回值,类似join函数,会等待task函数执行结束

# 池中的线程是异步执行的

协程

# 单线程实现并发
# 代码层面上:遇到IO自动切换,能检测的io是十分有限的,比较常见的是时间模块的io和socket模块的io
from gevent import spawn
from gevent import monkey
import time


monkey.patch_all() # 猴子补丁,添加IO类型


def func1():
    print(‘func1 start‘)
    time.sleep(2)
    print(‘func1 end‘)


def func2():
    print(‘func2 start‘)
    time.sleep(3)
    print(‘func2 end‘)


g1 = spawn(func1) # 监控IO,func1为监控对象,可以传参,直接在后面加上,spawn(func1, ‘zhao‘)
g2 = spawn(func2)
g1.join()  # 阻塞,直到g1结束
g2.join()
# gevent.joinall([g1,g2])

# 协程是代码层面上的规避IO操作
# 本质就是一条线程,多个任务在一条线程上来回切换,来规避IO操作

asyncio模块

import asyncio


async def func(): # async修饰运行的函数
    print(‘start‘)
    # await可能会发生阻塞的方法
    await asyncio.sleep(1)
    print(‘end‘)

loop = asyncio.get_event_loop()
# loop.run_until_complete(func())
loop.run_until_complete(asyncio.wait([func(),func()])) # 启动协程

队列生产者消费者模型

# 进程之间数据隔离
# 进程之间通信(IPC):基于文件,基于网络(memcache,redis,rabbitmq,kafka)
from multiprocessing import Queue,Process

def son(q):
    q.put(‘hello‘) # 向队列q里面丢内容
def func(q):
    x = q.get() # 从队列q里面取出内容
    print(x)

if __name__ == ‘__main__‘:
    q = Queue() # 实例化一个队列对象,可携带参数,表示队列大小
    p = Process(target=son,args=(q,))
    p.start()
    p1 = Process(target=func,args=(q,))
    p1.start()
    p.join() # 让生产者生产完所有的数据
    q.put(None) # 给队列最后放上一个None,作为消费者判断结束的标志

    
# 生产者消费者模型
# 本质就是让生产数据和消费数据的效率达到平衡并且最大化效率
# put和get的速度比

python有关并发编程的部分

原文:https://www.cnblogs.com/zhaoxulu/p/14722972.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!