首页 > 编程语言 > 详细

41---线程与进程高阶

时间:2020-04-27 21:16:28      阅读:53      评论:0      收藏:0      [点我收藏+]

一 死锁与递归锁

1.1 死锁

1.1.1 死锁现象

即便是知道锁的使用抢到锁之后要释放锁,在操作锁的时候也极容易产生死锁现象---整个程序卡死(阻塞)

1.1.2 代码验证死锁现象
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print(f‘{self.name}抢到A锁‘)
        # self.name获取当前线程的名字
        mutexB.acquire()
        print(f‘{self.name}抢到B锁‘)
        mutexB.release()
        mutexA.release()
        
    def func2(self):
        mutexB.acquire()
        print(f‘{self.name}抢到B锁‘)
        time.sleep(2)
        mutexA.acquire()
        print(f‘{self.name}抢到A锁‘)
        mutexA.release()
        mutexB.release()
        
if __name__ == ‘__main__‘:
    for i in range(10):
        t = MyThread()
        t.start()
        
# 运行结果
Thread-1抢到A锁
Thread-1抢到B锁
Thread-1抢到B锁
Thread-2抢到A锁
....这里阻塞住了,出现死锁现象

# 结果分析
1.共有10个线程开启,开启后会自动执行run()
2.首先执行func1功能
3.执行func1,10个线程中会有一个首先抢到A锁,另外的9个线程需要等A锁释放才能争抢A锁
4 抢到A锁的线程会很顺利的抢到B锁,然后依次释放B锁和B锁
5 A锁释放完毕后,第一个线程就可以执行到func2,再次抢到B锁,与此同时其他的9个线程执行func1时,又会有一个线程抢到A锁
6 第二个线程拿着A锁想要抢到B锁,而此时正在执行func2的线程拿着B锁想要抢到A锁
7 由此产生了死锁现象

1.2 递归锁

1.2.1 递归锁的特点
  • 可以被连续的acquire和release
  • 但是只能被第一个抢到这把锁的任务执行上述操作
  • 内部有一个计数器,每acquire一次计数加一,每release一次计数减一
  • 只要计数不为0,那么其他人都无法抢到该锁
1.2.2 递归锁的代码验证
  • 将死锁代码修改一下就可以变成递归锁
导入RLock
将两把锁变为一把锁:
	mutexA = Lock()
	mutexB = Lock()
    # 换成
    mutexA = mutexB = RLock()
  • 实际代码
from threading import Thread, RLock
import time

mutexA = mutexB = RLock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print(f‘{self.name}抢到A锁‘)
        # self.name获取当前线程的名字
        mutexB.acquire()
        print(f‘{self.name}抢到B锁‘)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print(f‘{self.name}抢到B锁‘)
        time.sleep(2)
        mutexA.acquire()
        print(f‘{self.name}抢到A锁‘)
        mutexA.release()
        mutexB.release()


if __name__ == ‘__main__‘:
    for i in range(10):
        t = MyThread()
        t.start()
        
# 程序不会发生阻塞,可以正常结束

二 信号量

2.1 信号量的注意点

信号量在不同的阶段可能对应不同的技术点

在并发编程中信号量指的是锁!!!

2.2 信号量的代码验证

2.2.1 信号量的通俗理解

将互斥锁比喻成一个厕所的话,那么信号量就相当于多个厕所??

2.2.2 代码验证
from threading import Thread,Semaphore
import time
import random

# 括号内写数字,写几就表示开设几个厕所...
sm = Semaphore(5)

def task(name):
    # 抢锁---抢厕所
    sm.acquire()
    print(f‘{name}正在蹲坑...‘)
    # 模拟蹲坑的事件
    time.sleep(random.randint(1,5))
    # 释放锁---从厕所出去
    sm.release()

if __name__ == ‘__main__‘:
    # 模拟20个人抢厕所
    for i in range(20):
        t = Thread(target=task,args=(f‘鸵鸟{i}号‘,))
        t.start()

三 Event事件

3.1 Event事件介绍

一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号(我结束了,你可以开始了)

3.2 代码实现

from threading import Thread,Event
import time

# 造一个红绿灯
event = Event()

def light():
    print(‘红灯状态‘)
    time.sleep(3)
    print(‘绿灯亮‘)

    # 告诉等绿灯的车可以走了
    event.set()

def car(name):
    print(f‘{name}正在等绿灯到来‘)

    # 等待信号灯发可以走了的信号
    event.wait()
    print(f‘{name}加油门,噔噔蹬开车跑了...‘)


if __name__ == ‘__main__‘:
    # 开启红绿灯
    t = Thread(target=light)
    t.start()

    # 模拟20辆车在等绿灯
    for i in range(20):
        t = Thread(target=car,args=(f‘伞兵{i}‘,))
        t.start()

四 线程队列q

4.1 同一个进程下的线程为何还要使用队列

同一个进程下的多个线程数据是共享的,那么还需要队列的原因就是为了为了数据安全

? 因为队列是:管道+锁

我们现在使用的队列都是只能在本地测试使用

4.2 三种队列介绍

4.2.1 先进先出队列Queue
import queue

q = queue.Queue()
q.put(1)
q.get()
q.get_nowait()
q.get(timeout=3)
q.full()
q.empty()
4.2.2 后进先出队列LifoQueue
import queue

q = queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3
print(q.get()) # 2
print(q.get()) # 1
4.2.3 优先级PriorityQueue
import queue

q = queue.PriorityQueue(3)
q.put((1,111))
q.put((0,222))
q.put((-1,333))
print(q.get()) # 333
print(q.get()) # 222 
print(q.get()) # 222

# put括号内放小元组,第一个放数字表示优先级
# 数字越小优先级越高

五 进程池与线程池

5.1 之前TCP服务端的并发效果是如何实现的

5.1.1 实现方式

每有一个客户端请求连接服务端,服务端就开设一个进程或者线程去处理

5.1.2 代码实现
  • 服务端
import socket
from threading import Thread


def communicate(conn):
    while True:
        try:
            data = recv(1024)
            if noe data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close
    
def sever(ip,port):
    sever = socket.socket()
    sever.bind((ip,port))
    sever.listen(5)
    while True:
        conn,addr = accept()
        # 开设线程为客户端服务
        t = Thread(target=communicate,args=(conn,))
        t.start()

if __name__ == ‘__main__‘:
    s = Thread(target=sever,args=(‘127.0.01.1‘,8080))
    s.start()
    
  • 客户端
import socket

client = socket.socket()
client.connect((‘127.0.0.1‘,8080))

while True:
    msg = input(‘》》‘).strip()
    if len(msg) == 0:
        continue
    if msg == ‘q‘:
        break
    client.send(msg.encode(‘utf-8‘))
    data = client.recv(1024)
    print(data.decode(‘utf-8‘))
client.close()
5.1.3 上述实现并发效果存在的问题
无论是开设进程还是线程,都需要消耗资源,只不过是开设线程的消耗比假设进程的消耗小一些

作为程序的开发者,不可能做到无限制的开设进程和线程,因为计算机硬件资源跟不上,而硬件的开发速度远远赶不上软件!

所以开发的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它。

因此引入了‘池’的概念

5.2 池的概念

是用来保证计算机硬件安全的情况下最大限度的利用计算机

降低了程序的运行效率但是保证了计算机硬件的安全,从而让开发的程序能够正常运行

5.3 任务的提交方式---同步和异步

任务的提交方式:
同步:提交任务后原地等待任务的返回结果
异步:不等待任务结果,继续往下执行
返回的结果如何获取?
异步提交任务的返回结果,应该通过回调机制来获取
回调机制:
就相当于给每个异步任务绑定了一个计时炸弹,一旦任务有结果就会触发爆

5.4 线程池的基本使用

from concurrent.futures import ThreadPoolExecutor
import os
import time

# 创建线程池对象
pool = ThreadPoolExecutor(5)
‘‘‘
括号内可以传数字,不传的话默认会开设当前计算机CPU个数的5倍的线程数。
池子造出来后,会固定存在括号内数字的线程个数,这些线程不会出现重复创建和销毁的现象
池子的使用:
    只需将需要执行的任务(嫖客)往池子中提交即可,会自动有线程(tank)执行该任务
‘‘‘


def task(n):
    print(n,os.getpid())
    time.sleep(2)


‘‘‘
pool.submit(task,1)
print(‘主‘)
向线程池中提交任务,是异步提交任务,主线程不会等子线程执行完毕之后再执行
pool.submit()会返回一个Future对象
‘‘‘

# 需求:等待线程池中的所有任务执行完毕之后才继续往下运行---将异步变为同步


res_list = []
for i in range(20):
    res = pool.submit(task,i)
    res_list.append(res)
# 关闭线程池,等待线程池中所有任务运行完毕
pool.shutdown()
# 拿到res的结果,res是任务(task)的返回值,也就是异步提交任务的返回结果
for res in res_list:
    print(res.result()) # result方法:将异步变为同步
    ‘‘‘
    程序由并发变为串行,即结果不是返回值与输出值交叉出现,而是先输出任务的打印内容,再输出任务的返回值
‘‘‘

‘‘‘
异步回调机制拿到返回值的方式之高级版
‘‘‘


def call_back(n):
    print(‘call_back‘, n.result())

if __name__ == ‘__main__‘:


    for i in range(20):
        res = pool.submit(task,i).add_done_callback(call_back)

5.5 进程池的基本使用

from concurrent.futures import ProcessPoolExecutor
import os
import time

# 创建线程池对象
pool = ProcessPoolExecutor()
‘‘‘
括号内可以传数字,不传的话默认会开设当前计算机CPU个数的进程数。
池子造出来后,会固定存在括号内数字的进程个数,这些进程不会出现重复创建和销毁的现象
池子的使用:
    只需将需要执行的任务(嫖客)往池子中提交即可,会自动有进程(tank)执行该任务
‘‘‘


def task(n):
    print(n,os.getpid())
    time.sleep(2)


‘‘‘
pool.submit(task,1)
print(‘主‘)
向线程池中提交任务,是异步提交任务,主进程不会等子进程执行完毕之后再执行
pool.submit()会返回一个Future对象
‘‘‘

# 需求:等待线程池中的所有任务执行完毕之后才继续往下运行---将异步变为同步

if __name__ == ‘__main__‘:

    res_list = []
    for i in range(20):
        res = pool.submit(task,i)
        res_list.append(res)
    # 关闭线程池,等待线程池中所有任务运行完毕
    pool.shutdown()
    # 拿到res的结果,res是任务(task)的返回值,也就是异步提交任务的返回结果
    for res in res_list:
        print(res.result()) # result方法:将异步变为同步
        ‘‘‘
        程序由并发变为串行,即结果不是返回值与输出值交叉出现,而是先输出任务的打印内容,再输出任务的返回值
        ‘‘‘

‘‘‘
异步回调机制拿到返回值的方式之高级版
‘‘‘


def call_back(n):
    print(‘call_back‘, n.result())

if __name__ == ‘__main__‘:


    for i in range(20):
        res = pool.submit(task,i).add_done_callback(call_back)
        

5.6 总结---需要掌握的知识

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(task, i).add_done_callback(call_back)

六 协程

6.1 协程引入

进程:资源单位
线程:执行单位
协程:完全是程序员自己意淫出来的,根本不存在
	单线程实现并发
	程序员自己在代码层面上检测IO操作,一旦程序遇到IO操作,就会在代码级别完成切换。
	这样给CPU的感觉就是程序一直在运行,没有IO操作,从而提升程序的运行效率
	
	
多道技术:单核实现并发:切换+保存状态
CPU两种切换
	1 程序遇到IO操作
	2 程序长时间占用CPU
	
代码如何做到:切花+保存状态

切换:
	切换不一定能提升效率,也有可能降低效率
	IO切换:提升效率
	没有IO切换:降低效率
保存状态:
	保存上一次执行的状态,下一次接着上一次的结果继续运行

6.2 验证切换是否一定能够提升效率

import time
#
def func():
    for i in range(10000):
        i + 1
def foo():
    for i in range(10000):
        i + 1


start = time.time()
func()
foo()
print(time.time()-start)


def func():
    while True:
        10000 + 1
        yield
def foo():
    g = func()
    for i in range(10000):
        i + 1
        next(g)

s = time.time()
foo()
print(time.time()-s)

# 运行结果
0.0009970664978027344
0.0009989738464355469
分析:切换后效率变低

6.3 gevent模块

6.3.1 基本使用方法
gevent模块本身无法检测常见的一些io操作
在使用的时候需要你额外的导入一句话
from gevent import monkey
monkey.patch_all()
又由于上面的两句话在使用gevent模块的时候是肯定要导入的
所以还支持简写
from gevent import monkey;monkey.patch_all()
6.3.2 代码验证
from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn

def heng():
    print(‘哼‘)
    time.sleep(2)
    print(‘。heng 。‘)

def ha():
    print(‘ha‘)
    time.sleep(3)
    print(‘。ha 。‘)
start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g1.join()  # 等待被检测的任务执行完毕 再往后继续执行
g2.join()
print(time.time()-start_time)

# 运行结果
哼
ha
。heng 。
。ha 。
3.0066308975219727

6.4 单线程实现TCP服务端的并发

  • 服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()


def sever(ip,port):
    sever = socket.socket()
    sever.bind((ip,port))
    sever.listen(5)
    while True:
        conn,addr = sever.accept()
        spawn(communicate,conn)

if __name__ == ‘__main__‘:
    g1 = spawn(sever,‘127.0.0.1‘,8080)
    g1.join()
  • 客户端
import socket
from threading import Thread,current_thread
def client():
    client = socket.socket()
    client.connect((‘127.0.0.1‘,8080))
    while True:
        n = 0
        while True:
            msg = ‘%s say hello %s‘ % (current_thread().name, n)
            n += 1

            client.send(msg.encode(‘utf-8‘))
            data = client.recv(1024)
            print(data.decode(‘utf-8‘))

if __name__ == ‘__main__‘:
    for i in range(100):
        t = Thread(target=client)
        t.start()

七 总结

理想状态:
	我们可以通过
	多进程下面开设多线程
	多线程下面再开设协程序
	从而使我们的程序执行效率提升

41---线程与进程高阶

原文:https://www.cnblogs.com/Kathrine/p/12789604.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!