首页 > 其他 > 详细

2021/6/6 并发编程+网络编程

时间:2021-06-06 10:06:41      阅读:14      评论:0      收藏:0      [点我收藏+]
‘‘‘
python
一、进程理论
1. 前置知识
程序:存放在硬盘上的一大堆代码,是‘死‘的
进程:程序正在运行的过程,是‘活‘的
2. 进程调度
对于多进程我们需要合理的分配CPU资源,这依赖于进程调度,常见的进程调度算法有:
先来先服务调度算法
根据进程抢到CPU资源的先后顺序,先抢到CPU资源的先执行
短作业优先调度算法
根据调度算法计算出每个进程的长度,然后优先分配CPU资源给短进程
时间片轮转调度算法
将CPU的可处理时间分为一个一个固定大小的时间片,系统分配给每个进程不同数量的时间片
如果一个进程在消耗完时间片的情况下,依然没有执行完成,那么进入就绪队列,重新等待时间片分配
多级反馈优先队列
将CPU的可处理时间分为一个一个队列,每个队列的优先级各不相同,高优先级队列时间要小于低优先级队列
对于每个新创建的进程,都按照先来先服务原则,并分配到高优先级队列中,如果在高优先级队列依然没有执行完毕
那么分配到低优先级队列
如果低优先级队列中的任务正在执行,且高优先级队列中存在任务的话,会立刻跳转到高优先级队列,去执行
时间片轮转+多级反馈队列
3. 进程运行的三状态图
创建 --提交--》 就绪态 --进程调度--》 运行态 --释放--》 退出
运行态 --任务未执行完--》 就绪态 --等待时间片分配--》 运行态
运行态 --任务中途发生事件--》 阻塞态 --事件完成--》 就绪态
就绪态:进程任务已经获取到除CPU以外的所有必要资源,等待进程调度
运行态:进程任务已经获取到CPU资源,正在执行任务
阻塞态:进程任务中途触发事件,如IO操作,申请缓冲区满等,从而释放CPU资源
4. 同步/异步
同步:任务提交后,原地阻塞等待任务返回结果
异步:任务提交后,不会原地阻塞(依靠回调机制获取返回结果),去执行其他任务
5. 阻塞/非阻塞
阻塞:三状态图中的阻塞态
非阻塞:三状态图中的就绪态和运行态
二、进程操作
1. 创建进程的两种方式
类的继承
from multiprocessing import Process
class zzw(Process):
def __init__(self):
super().__init__()
def run(self):
xxx
if __name__ == ‘__main__‘:
p = zzw()
p.start()
类的实例化
from multiprocessing import Process
def zzw():
pass
if __name__ == ‘__main__‘:
p = Process(target=zzw, args=(), kwargs={}, name=Process-N, group=None)
p.start()
2. multiprocessing模块
Process类
p = Process(target=func, args=(), kwargs={}, name=, group=None, daemon=False)
p.start() 创建子进程
p.join([timeout]) 阻塞主进程,等待子进程运行结束
p.terminate() 立即终结当前进程
p.is_alive() 获取进程存活状态,True/False
p.name 获取进程别名,默认Process-N
p.pid 获取进程id
p.exitcode 获取进程退出状态码,正常退出为0
p.authkey 获取用户验证吗 os.urandom(32)
current_process().pid() 获取当前进程的pid
3. 进程间通信IPC
默认进程间相互隔离,这体现在内存空间上的隔离
如果要实现进程间相互通信,有两种方法:
队列 = 管道 + 锁
管道
队列Queue-先入先出
from multiprocessing import Queue, JoinableQueue
q = Queue(队列长度,不填默认几乎无限)
q.qsize() 返回当前队列长度
q.empty() 如果队列为空,返回True
q.full() 如果队列满了,返回True
q.put(obj, block=True, timeout=None)
向队列存入数据,如果队列满,则阻塞原地,等待队列能够存入数据
q.put_nowait(obj) --> 相当于 .put(obj, block=False)
向队列存入数据,如果队列满,则直接报错
q.get(block=True, timeout=None)
q.get_nowait(block=False)
q.task_done()
q.join()
JoinableQueue()类存在一个计数器,当调用一次task_done计数器-1,调用一次put计数器+1
当计数器为0时,join方法才不会阻塞当前进程
如果使用进程池时,需要使用from multiprocessing import Manager(Manager.Queue)

管道-Pipe
from multiprocessing import Pipe
a, b = Pipe(duplex=False)
设置管道的工作模式 duplex=False为半双工,duplex=True为全双工
a.send(obj)
b.recv()
阻塞原地等待数据,如果对端关闭端口,则会抛出异常EOFError
a.close()

4. windows创建进程操作需要写入到 __name__语句中,因为对于windows来说,创建进程
相当于使用import语句将父进程代码导入到子进程,因此,如果不将创建进程代码写入__name__
语句中,则会不停递归创建

5. 僵尸进程
子进程运行结束后,会留下一个Zombie僵尸进程的数据结构,其中包含进程ID、运行事件、退出状态等
如果父进程没有调用wait/waitpid方法来查看子进程状态,从而回收进程资源,那么Zombie数据结构
就会一直占用进程资源,损耗进程资源
解决方法:
1. 杀死父进程
2. 对开启的子进程使用.join()方法,因为该方法内部调用了wait方法

6. 孤儿进程
父进程创建子进程后突然死亡,子进程还在运行状态,此刻的子进程就是一个孤儿进程
在linux中,孤儿进程会统一被init进程1收养,init进程会不停循环调用wait/waitpid
方法,因此,子进程一旦执行完任务,则会被立刻回收进程资源

7. 守护进程
p.daemon = True
设置子进程为守护进程,则子进程随父进程的结束而结束运行,且守护进程不可继续创建进程

8. 互斥锁Lock
对于多进程同时访问同一个公共资源的情况,需要保证公共资源的数据安全,因此引入了互斥锁
互斥锁原理是同一时刻只能有一个进程对公共资源进行操作,将并行变为串行,保证数据的安全性
from multiprocessing import Lock
mutex = Lock() 生成一个锁对象
mutex.acquire() 加锁
mutex.release() 释放锁

9. 递归锁RLock
对于多个进程同时访问同一份公共资源时,如果使用Lock进行多重锁,那么会容易出现死锁现象
RLock提供多重锁机制,其内部存在一个计数器,没调用一次acquire,那么计数器+1,每调用一次
release计数器-1,只有当计数器为0时,锁才能被抢,否则一直锁定公共资源
from multiprocessing import RLock
mutex = RLock()
mutex.acquire()
mutex.release()


10. 进程池Pool
无需手动创建进程,只需将需要执行的任务丢入进程池即可手动创建进程
from multiprocessing import Pool
p = Pool(Processes=os.cpu_count())
p.apply_async(func, *args, **kwargs)
异步提交进程任务,返回ApplyResult对象
.get([timeout]) 原地阻塞获取返回结果
.ready() 进程任务执行完成返回True
.successful() 进程任务正常执行完成返回True,如果在进程任务还未执行完毕使用该方法,则会抛出异常
.wait([timeout]) 等待结果变为可用
p.apply(func, *args, **kwargs)
同步提交,原地阻塞等待返回结果
源码:apply_async().get()
p.close() 关闭进程池
p.terminate() 立刻终结进程池中所有任务
p.join([timeout]) 原地阻塞等待进程池任务执行完毕,需要在close/terminate方法之后

11. 进程池ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
p = ProcessPoolExecutor(int=os.cpu_count())
p.submit(func, *args, **kwargs)
异步提交,返回future类
p.shutdown()
等价于 close join,关闭进程池,并原地阻塞等待进程任务执行完毕
.cancel() 终结当前进程任务,如果终结成功,返回True,否则Fasle
.cancelled() 如果当前进程任务终结成功,则返回True
.running() 如果当前进程正在运行,则返回True
.done() 如果当前进程运行结束或者被终结,返回Fasle
.result(timeout) 原地阻塞,获取进程任务的返回结果
.exception([timeout]) 原地阻塞获取当前进程异常,如果进程正常结束,则返回None
.add_done_callback(fn) 回调函数,进程任务执行完毕自动触发,将future类传递给
回调函数
p.map(func, iterable, timeout=None, chunksize=1)

三 线程理论
1. 线程/进程
进程:资源单位,创建一个进程就是向系统申请各种资源
线程:执行单位,线程就是程序的执行过程,执行过程中所需资源都向进程索要
2. 每一个进程内部都有一个主线程,负责进程任务的执行
3. 线程模块 _thread(作为thread的兼容),threading(建议使用)
4. 内核线程/用户线程
内核线程:由操作系统内核创建 撤销
用户线程:由用户在进程内部手动创建 撤销
四 线程操作
1. 开启线程的两种方式
类的继承
from threading import Thread
class zzw(Thread):
def __init__(self):
super().__init__()
def run(self):
pass
if __name__ == ‘__main__‘:
p = Thread(target=zzw, args=(), kwargs={}, name=Thread-N, daemon=False, group=None)
p.start()
类的实例化
from threading import Thread
def zzw():
pass
if __name__ == ‘__main__‘:
p = Thread(target=zzw, args=(), kwargs={}, name=Thread-N, daemon=False, group=None)
p.start()

2. threading模块
Thread类
p = Thread(target=func, args=(), kwargs={}, name=Thread-N, daemon=False, group=None)
p.start() 创建线程
p.join() 阻塞当前线程,等待子线程执行完毕
p.is_alive()/isAlive() 子线程存活返回True
p.name/getName()/setName() 获取name参数值,默认Thread-N,N从1开始
p.daemon/isDaemon()/setDaemon() 默认为False,设置守护线程

current_thread()/currentThread()
获取当前线程对象
enumerate()
[父线程, 子线程,...]
active_count()/activeCount()
获取存活线程数

3. 同进程下子线程共享全局变量

4. 守护线程
p = Thread()
p.daemon = True
设置线程为守护线程,随父线程的结束而结束,守护线程无法继续创建子线程

5. 互斥锁Lock
多个线程同时访问同一个全局变量时,会引起数据安全问题,为了保证数据的安全性
我们需要给公共资源加上一把锁,从而实现同一时间只能有一个线程能够操作公共资源
mutex = Lock()
mutex.acquire()
mutex.release()

6. GIL全局解释器锁
GIL不是python解释器的特点,而是CPython解释器的特点
GIL全局解释器锁是对解释器级数据的安全防护
解释器语言的通病:同一进程下的多线程无法利用多核优势
GIL实现在同一时间只能有一个线程获取到解释器的执行权限

7. 多进程/多线程的优势
多进程:适用于计算密集型,效率远远超于多线程
多线程:适用于IO密集型,开销小,效率不差于进程

8. 死锁现象/递归锁
当我们使用Lock锁实现重锁时,极容易出现死锁现象,即:
线程A获取到锁B,需要获取锁A才能继续执行
线程B获取到锁A,需要获取到锁B才能继续执行
因此进入到死锁现象,从而停滞在当前局面
递归锁RLock,可以实现创建重锁,每当调用一次acquire时,计数+1,
每当调用一次release时,计数-1,只有当计数为0时,才能抢锁,否则阻塞
from threading import RLock
mutex = RLock()
mutex.acquire()
mutex.acquire()
mutex.release()
mutex.release()

9. 信号量 Semaphore
在并发编程中 Semaphore代表锁概念
from threading import Semaphore
mutex = Semaphore(value)
mutex.acquire()
mutex.release()
Semaphore可以创建多个锁,每当调用一次acquire方法,value-1
每当调用一次release方法,value+1,当value为0时,acquire将阻塞线程,直到其他线程
调用release方法

10. 线程同步 Event
对于多线程,每个线程内的状态都是不可预料的,如果对于某个线程需要依赖另外一个线程的莫个状态
才能继续执行,此刻就需要到Event来传递状态
from threading import Event
event = Event()
event.set() 设置Event状态为True
event.clear() 设置Event状态为False
event.wait() 当Event状态为True,则激活线程,进入就绪态
event.isSet()/is_set() 返回Event的状态
对于多进程同步,Event需要使用到队列来传递Event状态

11. 线程queue队列
Queue 先入先出队列/LifoQueue 先入后出队列/PriorityQueue 优先级队列
from queue import Queue, LifoQueue, PriorityQueue
q = Queue()
q = LifoQueue()
q = PriorityQueue()
q.put((优先级别, obj))
优先级别,填入int类型,值越小越优先,可以为负数
通用方法:
q.qsize()
q.empty()
q.full()
q.put(obj, block=True, timeout=None)
q.put_nowait(obj) --> put(obj, block=False)
q.get(block=True, timeout=None)
q.get_nowait() --> get(block=False)
q.join()
q.task_done()

12. 线程池ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
p = ThreadPoolExecutor()
p.submit(func, *args, **kwargs)
异步提交,返回future类
.cancel() 立即终结当前线程,如果线程正在运行无法终结,则返回False,否则True
.cancelled() 如果终结线程成功,则返回True,否则False
.running() 正在运行,返回True
.done() 运行结束/被终结,返回False
.result([timeout]) 阻塞原地,等待返回结果
.exception([timeout]) 阻塞原地,获取线程执行中抛出异常,如果正常执行完毕,返回None
.add_done_callback(func) 回调函数,线程执行完毕立刻触发,将future作为参数传递给回调函数
p.shutdown()
关闭线程池,并原地阻塞等待其他线程任务执行完毕
close join
p.map(func, iterable, timeout=None, chunksize=1)

13. 协程
核心思想:实现单线程的高并发
1. 监测IO操作 gevent模块 spawn()
2. 及时切换 greenlet模块 greenlet, switch()
3. 记录状态 yield
from gevent import spawn
from gevent import monkey;monkey.patch_all()
p = spawn(func, *args, **kwargs)
p --> Greenlet对象
p.join()
from greenlet import greenlet
g = greenlet(fn)
g.switch(参数)

14. IO模型
阻塞IO
阻塞于等待数据和拷贝数据阶段
非阻塞IO
阻塞于拷贝数据阶段,在等待数据阶段直接返回一个值
recvfrom会不断的向内核请求数据,造成内核负载
多路复用IO
使用select监管机制来代替socket进行数据获取和等待,当内核中存在数据时
再使用recv来获取数据,对于单个链接,效率甚至不如,但是多个链接,可以节省出等待数据的阶段事件
异步IO
向内核提交获取数据请求后,不再原地等待,而是立即去执行其他任务,
当内核将数据拷贝到进程中,使用回调机制触发接收数据,并提醒当前进程
进程再过来处理数据
信号驱动IO

网络编程
一 客户端与服务端架构
C/S:Client --> Server
用户需要下载指定的客户端app,才能访问服务端
B/S:Browser --> Server
用户只需要通过浏览器就可以访问服务端

二 OSI七层模型
应用层:为应用程序提供网络服务
数据层:数据格式化 加密 解密
会话层:创建 维护 管理 会话连接
传输层:创建 维护 管理 端到端链接
网络层:IP寻址和路由选择
数据链路层:控制数据帧在数据链路上的传输
物理层:传输比特流

三 Socket简介
1. Socket就是将传输层及以下的数据传输过程封装成一个Socket模块
我们只需要调用这个模块即可实现基于IP+PORT的网络通信
2. Socket模块用法
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.AF_INET:网络通信
socket.AF_UNIX:本地主机上的不同端口间通信
SOCK_STREAM:TCP链接
SOCK_DGRAM:UDP链接
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind((‘IP‘,Port))
s.listen(128)
a, b = accept()
a.send(byte)
a.recv(1024)
a.close()
s.close()
s.sendto(byte, (‘ip‘, port))
s.connect((‘IP‘,Port))
s.setblocking(False) 关闭阻塞,开启非阻塞IO
a, b = recvfrom(1024)
3. TCP服务器/客户端书写方式
TCP服务器
import socket
s = socket.socket()
s.bind(‘0.0.0.0‘, 8000)
s.listen(128)
while True:
a, b = s.accept()
while True:
data = a.recv(1024)
if not data:
break
print(data.decode(‘utf8‘), b)
a.send(data.upper())
a.close()
s.close()
TCP客户端
import socket
s = socket.socket()
s.connect((‘127.0.0.1‘,8000))
while True:
inp = input(‘输入>>>: ‘)
if not inp:
break
s.send(inp.encode(‘utf8‘))
print(s.recv(1024).decode(‘utf8‘))
s.close()
4. UDP客户端/服务器
UDP服务端
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((‘0.0.0.0‘, 8000))
while True:
a, b = s.recvfrom(1024)
print(a.decode(‘utf8‘), b)
s.sendto(a.upper(), b)
s.close()
UDP客户端
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((‘127.0.0.1‘, 8000))
while True:
inp = input(‘输入>>>: ‘)
if not inp:
break
s.send(inp.encode(‘utf8‘))
print(s.recv(1024).decode(‘utf8‘))

5. socketserver实现并发
TCP服务器并发
from socketserver import BaseRequestHandler, ThreadingTCPServer
class ceshi(BaseRequestHandler):
def handle(self):
a = self.request
b = self.client_address
while True:
data = a.recv(1024)
if not data:
break
print(data.decode(‘utf8‘), b)
a.send(data.upper())
a.close()
if __name__ == ‘__main__‘:
s = ThreadingTCPServer((‘0.0.0.0‘, 8000), ceshi)
s.serve_forever()
UDP服务器
from socketserver import BaseRequestHandler, ThreadingUDPSever
class ceshi(BaseRequestHandler):
def handle(self):
a, b = self.request
c = self.client_address
b.sendto(a.upper(), c)
print(a.decode(‘utf8‘))
s = ThreadingUDPServer((‘127.0.0.1‘, 8000), ceshi)
s.serve_forever()
6. TCP粘包问题解决
使用struct模块,在传入数据之前,先传入固定长度的数据,数据中包含了后面数据的长度即可
import struct
pack(模式, 数字)
unpack(模式, 数字)

四 HTTP协议
超文本传输协议,用于规定浏览器与服务端之间通信的数据格式
超文本传输协议是用来传输超文本标记语言
1. 四大特性
基于请求响应
基于TCP协议,处于应用层之上
无状态,不保存用户状态
随机字符串:用户相关信息(k:v)
session 将数据保存到服务端
cookie 将数据保存到浏览器
短链接,来一次我响应一次,之后不再建立关系
长连接:双方建立链接后不会断开链接,websocket

2. HTTP请求/响应格式
请求首行(标识请求方式,HTTP版本)
GET / HTTP/1.1
Host:ip地址
请求头 一大堆k-v键值对,需要使用\r\n结尾
\r\n
请求体:浏览器发送给服务端的敏感数据

响应首行(标识HTTP协议版本,状态码)
HTTP/1.1 200 OK
响应头(一大堆k-v键值对)
Content-Type:text/html
Charset:utf8
\r\n
响应体:服务端发送给浏览器用于展示给用户观看的数据

3.请求方式:
GET 浏览器向服务器请求数据
POST 浏览器向服务器提交数据
PUT 浏览器向服务器提交数据,用于全局更新数据
PATCH 浏览器向服务器提交数据,用于局部更新数据
DELETE 浏览器向服务器提交删除数据请求
HEAD OPTIONS TRACE

4. 响应状态码
1xx
2xx
200 OK

201 CREATE

204 NO CONTENT

202 Accepted

3xx
4xx
400 INVALID REQUEST
422 Unprocesable entity

401 Unauthorized
403 Forbidden
404 NOT FOUND

406 Not Acceptable
410 Gone
5xx
500 INTERNAL SERVER ERROR

5. URL
统一资源定位符
第一部分:协议
第二部分:ip/域名
第三部分:站点目录
第四部分:站点资源

绝对URL:HTTP://www.baidu.com/...
相对URL
锚URL

WSGI
WSGI是将socket进一步封装,使用户只需考虑逻辑业务的书写
原理:
第一步:浏览器向服务端发起请求,其中包含HTTP协议以及请求数据
第二步:WSGI Server将请求数据拆包,重新封装为一个environ字典,其中包含了服务器的系统环境变量,请求头,WSGI信息等
第三步:将environ和start_response函数传递给我们自定义的业务函数中,即WSGI APP
第四步:由start_response提供规范化HTTP响应头,并将我们逻辑业务函数的返回正文一同发送给浏览器

from wsgiref.simple_server import make_server
def xxx(environ, start_response):
start_response(‘200 OK‘, [(‘Content-Type‘, ‘text/html‘), (‘Charset‘, ‘utf8‘)])
return [byte,...]
if __name__ == ‘__main__‘:
s = make_server(‘ip‘, port, xxx)
s.serve_forever()

‘‘‘



2021/6/6 并发编程+网络编程

原文:https://www.cnblogs.com/zhangzhuowei/p/14854408.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!