首页 > 编程语言 > 详细

python并发编程-进程池线程池-协程-I/O-04

时间:2019-08-15 16:48:57      阅读:76      评论:0      收藏:0      [点我收藏+]

进程池线程池的使用*****

无论是开线程还是开进程都会消耗资源,即使开线程消耗的资远比开进程的少

而物理设备的性能是有限的,虽然可以加设备来提升上限,但如果像淘宝双十一那样,只有很少的时刻需要大量的资源,为了满足这个去买一大堆服务器显然是不划算的

(计算机中)池的目的:在保证计算机硬件安全的情况下最大限度的利用计算机硬件,池其实是降低了程序的运行效率,但是保证了计算机硬件的安全(硬件的发展跟不上软件的速度)

进程池线程池的目的:为了限制开设的进程数和线程数,从而保证计算机硬件的安全

进程池/线程池的创建和提交回调

import random
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def task(i):
    time.sleep(random.random())
    print(f"{i} is over...")
    return f"{i}2 = {i * i}"


if __name__ == '__main__':  # 进程池的时候一定要放在这里面
    '''不放报错 concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.'''

    # -------------------------------------------------
    # 1.实例化进程池/线程池对象,并限制进程池/线程池中进程/线程数量
    # -------------------------------------------------
    # pool = ThreadPoolExecutor(3, 'MyThread-')  # 不指定参数的情况下,默认是当前 CPU个数*5 , 也可以指定线程个数
    pool = ProcessPoolExecutor(3)  # 不指定参数的情况下,默认是当前 CPU个数 , 也可以指定进程个数(创进程不能传第二个参数)

    # for i in range(5):
    #     # -------------------------------------------------
    #     # 2.线程池对象.submit() 异步提交任务
    #     #   提交任务的两种方式
    #     #       同步:提交完任务之后,在原地等待任务的返回结果,再继续执行下一步代码
    #     #       异步:提交任务之后,不等待任务的返回结果(这个结果怎么拿?),直接进行下一步操作
    #     # -------------------------------------------------
    #     pool.submit(task, i)
    # print("主")
    #
    # # 0 is running...
    # # 1 is running...
    # # 2 is running...
    # # 主
    # # 1 is over...
    # # 3 is running...
    # # 0 is over...
    # # 4 is running...
    # # 4 is over...
    # # 3 is over...
    # # 2 is over...

    # for i in range(5):
    #     future = pool.submit(task, i)
    #     # print(future)  # <Future at 0x21a130dbb00 state=running>   <Future at 0x21a1321ec50 state=pending>
    #     # -------------------------------------------------
    #     # future = pool.submit(task, i)
    #     # future.result()       接收返回值并获取回调值
    #     # -------------------------------------------------
    #     print(future.result())
    # print("主")
    # # 0 is running...
    # # 0 is over...
    # # 02 = 0
    # # 1 is running...
    # # 1 is over...
    # # 12 = 1
    # # 2 is running...
    # # 2 is over...
    # # 22 = 4
    # # 3 is running...
    # # 3 is over...
    # # 32 = 9
    # # 4 is running...
    # # 4 is over...
    # # 42 = 16
    # # 主

    # future_list = []
    # for i in range(5):
    #     future = pool.submit(task, i)
    #     future_list.append(future)
    #
    # for future in future_list:
    #     print(f">>:{future.result()}")  # 依次等每个 future的结果,所以是绝对有序的
    # print("主")
    # # 0 is running...
    # # 1 is running...
    # # 2 is running...
    # # 0 is over...
    # # 3 is running...
    # # >>:02 = 0
    # # 1 is over...
    # # 4 is running...
    # # >>:12 = 1
    # # 4 is over...
    # # 2 is over...
    # # >>:22 = 4
    # # 3 is over...
    # # >>:32 = 9
    # # >>:42 = 16
    # # 主

    future_list = []
    for i in range(5):
        future = pool.submit(task, i)
        future_list.append(future)

    pool.shutdown()  # 关闭池子且等待池子中所有的任务运行完毕

    for future in future_list:
        print(f">>:{future.result()}")  # 依次等每个 future的结果,所以是绝对有序的
    print("主")
    # 0 is running...
    # 1 is running...
    # 2 is running...
    # 2 is over...
    # 3 is running...
    # 0 is over...
    # 4 is running...
    # 4 is over...
    # 1 is over...
    # 3 is over...
    # >>:02 = 0
    # >>:12 = 1
    # >>:22 = 4
    # >>:32 = 9
    # >>:42 = 16
    # 主

验证复用池子里的线程或进程

池子中创建的进程或线程创建一次就不会再创建了,至始至终用的都是最初的那几个,这样的话就可以节省反复开辟进程或线程的资源了

不是动态创建动态销毁的(如果是好几百个,可想而知)

import random
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread


def task(i):
    time.sleep(random.random())
    # print(f"{os.getpid()} {i} is over...")
    print(f"{os.getpid()} {current_thread().name} {i} is over...")
    return f"{i}2 = {i * i}"


if __name__ == '__main__':  # 进程池的时候一定要放在这里面
    # pool = ProcessPoolExecutor(3)
    pool = ThreadPoolExecutor(3, 'MyThreading')

    future_list = []
    for i in range(5):
        future = pool.submit(task, i)
        future_list.append(future)

    pool.shutdown()  # 关闭池子且等待池子中所有的任务运行完毕

    for future in future_list:
        print(f">>:{future.result()}")  # 依次等每个 future的结果,所以是绝对有序的
    print("主")
# 11000 0 is over...  # 复用了进程号(即没有去开辟新的内存空间)
# 8024 2 is over...
# 10100 1 is over...
# 11000 3 is over...
# 8024 4 is over...
# >>:02 = 0
# >>:12 = 1
# >>:22 = 4
# >>:32 = 9
# >>:42 = 16
# 主


# 使用线程池的打印结果
# 13024 MyThreading_1 1 is over...  # 1.复用了线程
# 13024 MyThreading_1 3 is over...  # 2.复用了线程
# 13024 MyThreading_2 2 is over...
# 13024 MyThreading_0 0 is over...
# 13024 MyThreading_1 4 is over...
# >>:02 = 0
# >>:12 = 1
# >>:22 = 4
# >>:32 = 9
# >>:42 = 16
# 主

异步回调机制

这(.add_done_callback())其实是 .submit() 返回结果对象的方法

异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行

import random
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread


def callback(future):
    print(f"我拿到了回调结果:{future.result()}")


def task(i):
    time.sleep(random.random())
    # print(f"{os.getpid()} {i} is over...")
    print(f"{os.getpid()} {current_thread().name} {i} is over...")
    return f"{i}2 = {i * i}"


if __name__ == '__main__':  # 进程池的时候一定要放在这里面
    # pool = ProcessPoolExecutor(3)
    pool = ThreadPoolExecutor(3, 'MyThreading')

    future_list = []
    for i in range(5):
        # -----------------------------------------------------
        # .submit().add_done_callback() 自动调用回调函数
        #   会自动将 .submit()的返回结果作为参数传给.add_done_callback() 中传入的函数去调用执行
        #       .add_done_callback() 其实是 .submit()返回对象自身的方法
        # -----------------------------------------------------
        future = pool.submit(task, i).add_done_callback(callback)
        future_list.append(future)

    pool.shutdown()  # 关闭池子且等待池子中所有的任务运行完毕

    print("主")

# 11348 MyThreading_0 0 is over...
# 我拿到了回调结果:02 = 0
# 11348 MyThreading_2 2 is over...
# 我拿到了回调结果:22 = 4
# 11348 MyThreading_0 3 is over...
# 我拿到了回调结果:32 = 9
# 11348 MyThreading_1 1 is over...
# 我拿到了回调结果:12 = 1
# 11348 MyThreading_2 4 is over...
# 我拿到了回调结果:42 = 16
# 主

通过闭包给回调函数添加额外参数(扩展)

# 省略导模块等
# 线程池/进程池对象.submit() 会返回一个 future对象,该对象有.add_done_callback()方法(是一个对象绑定函数),参数是一个函数名(除了对象自身默认传入,无法为该函数传参)
# 这里利用闭包函数返回内部函数名的特点 直接调用这个闭包函数,达到传参的效果,可为回调函数添加更多的扩展性
def outter(*args, **kwargs):
    def callback(res):
        # 可以拿到 *args, **kwargs 参数做一些事情
        print(res.result())
    return callback


pool_list = []
for i in range(15):
    pool_list.append(pool.submit(task, i).add_done_callback(outter(1, 2, 3, a=1, c=3)))  # 朝线程池中提交任务(异步)

协程***

后期项目支持高并发可能才会用到

概念回顾(协程这里再理一下)

进程:资源单位(车间)

线程:操作系统的最小执行单位(流水线)

协程:单线程下实现并发的效果(完全是技术人员编造出来的名词)

并发:看起来像同时执行(多道技术核心:切换+保存状态)

协程:通过代码层面自己监测程序中的I/O行为,自己实现切换,让操作系统误认为这个线程没有I/O,从而保证程序在运行态和就绪态来回切换(不进入阻塞态),更大限度地利用CPU,最大程度上提高线程的执行效率

切换+保存状态就一定能够提升效率吗?

? 切换+保存状态 不一定能提升程序的效率

  • 当任务是计算密集型,反而会降低效率
  • 如果是IO密集型,会提升效率

如何实现协程

生成器的yield 可以实现保存状态(行不通)

但,效率更低了

# # 串行执行
# import time
#
#
# def func1():
#     for i in range(10000000):
#         i + 1
#
#
# def func2():
#     for i in range(10000000):
#         i + 1
#
#
# start = time.time()
# func1()
# func2()
# stop = time.time()
# print(stop - start)
# # 1.2481744289398193


# 基于yield并发执行
import time


def func1():
    while True:
        10000000 + 1
        yield


def func2():
    g = func1()
    for i in range(10000000):
        i + 1
        next(g)


start = time.time()
func2()
stop = time.time()
print(stop - start)
# 1.9084477424621582

gevent模块实现

模块安装下载

技术分享图片

搜索并下载(这里是因为我配了两个镜像源,所以出来了两个选项,随便选一个)

技术分享图片

gevent基本介绍

from gevent import spawn, monkey
monkey.patch_all()  # 一般这个要写在很前面(例如导socket模块之前)
# 两行亦可写成一行 from gevent import monkey;monkey.patch_all()

g1 = spawn(eat, 1, 2, 3, x=4, y=5)
# 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面是该函数(eat)所需要的参数
g2 = spawn(func2)

g1.join()  # 等待协程g1结束
g2.join()  # 等待协程g2结束
# 上述两步亦可合作一步:joinall([g1,g2])

g1.value  # 拿到func1的返回值

通过gevent实现遇到 IO自动切换状态(单线程下并发)

import time

from gevent import spawn
# gevent 本身识别不了time.sleep() 等不属于该模块内的I/O操作
# 使用下面的操作来支持
from gevent import monkey
monkey.patch_all()  # 监测代码中所有 I/O 行为


def heng(name):
    print(f"{name} 哼")
    time.sleep(2)
    print(f"{name} 哼 ...")


def ha(name):
    print(f"{name} 哈")
    time.sleep(3)
    print(f"{name} 哈 ...")


# start_time = time.time()
# heng('egon')
# ha('jason')
# print(f"主 {time.time() - start_time}")
# # 主 5.005069732666016


start_time = time.time()
s1 = spawn(heng, 'egon')
s2 = spawn(ha, 'jason')
s1.join()
s2.join()

print(f"主 {time.time() - start_time}")
# 主 3.0046989917755127

在计算密集型任务中使用

from gevent import spawn, monkey

monkey.patch_all()

import time


def func1():
    for i in range(10000000):
        i + 1


def func2():
    for i in range(10000000):
        i + 1


start = time.time()
g = spawn(func1)
g2 = spawn(func2)
g.join()
g2.join()
stop = time.time()
print(stop - start)
# 1.1324069499969482

# 与前面普通的串行执行时间 1.2481744289398193 相近

利用gevent在单线程下实现并发

服务端

import socket
from gevent import spawn
from gevent import monkey  # 让 gevent 能够识别python的 IO
monkey.patch_all()

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def wait_client_connect():
    while True:
        conn, addr = server.accept()
        spawn(talk, conn)


if __name__ == '__main__':
    g1 = spawn(wait_client_connect)
    g1.join()  # 别忘了加上

客户端

import socket
from threading import Thread, current_thread


def create_client():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    n = 0
    while True:
        data = '%s %s' % (current_thread().name, n)
        client.send(data.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))
        n += 1


for i in range(400):  # 手动开400个线程连接客户端(测试的是服务端单线程实现并发)
    t = Thread(target=create_client)
    t.start()

最大程度下提高代码的执行效率(实现高并发)

  • 多进程下使用多线程
  • 多线程下使用多协程

大前提

IO密集型任务

I/O 模型

阻塞I/O模型

技术分享图片

程序间数据交互,本质上数据都是从内存中取的

技术分享图片

recvfrom 会告诉系统拿数据,系统(可以用RP把这一块重画下)

非阻塞I/O模型

技术分享图片

技术分享图片

多路复用I/O模型

技术分享图片

技术分享图片

信号驱动I/O模型

暂不了解

异步I/O模型

技术分享图片

技术分享图片

python并发编程-进程池线程池-协程-I/O-04

原文:https://www.cnblogs.com/suwanbin/p/11358189.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!