首页 > 编程语言 > 详细

python语法基础-并发编程-进程-进程锁和进程间通信

时间:2020-02-28 11:19:52      阅读:70      评论:0      收藏:0      [点我收藏+]

###############   守护进程  ##############

"""
守护进程

父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束。
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

"""
# 第一版:主进程结束了,子进程还没有结束,
# import time
# from multiprocessing import Process
#
# def func():
#     while True:
#         time.sleep(1)
#         print("我还活着")
#
#
# if __name__ == ‘__main__‘:
#     p=Process(target=func)
#     p.start()
#     i = 0
#     while i<10:
#         time.sleep(1)
#         i+=1
#     print("主进程结束")


# 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,
import time
from multiprocessing import Process


def func():
    while True:
        time.sleep(1)
        print("我还活着")


if __name__ == __main__:
    p = Process(target=func)
    p.daemon = True  # 设置子进程为守护进程, #一定要在p.start()前设置,设置p为守护进程
    p.start()
    i = 0
    while i < 5:
        time.sleep(1)
        i += 1
    print("主进程代码结束")

其他的方法:

from multiprocessing import Process
import time
def func(name):
    print("%s在test...."%name)

if __name__ == "__main__":
    p = Process(target=func,args=("andy",))
    p.start()
    print(p.is_alive())  # # 判断一个进程是否活着
    p.terminate()  # 结束一个进程,
    time.sleep(1)
    print(p.is_alive())

 

##################       进程锁              #####################

"""
互斥锁:
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理, 他们之间的运行没有顺序,一旦开启也不受我们控制。 尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。 当多个进程使用同一份数据资源的时候,就会因为竞争而引发数据安全或顺序混乱问题。
"""

下面的代码演示了不同的任务争抢一个资源(终端输出)的场景。

from multiprocessing import Process
import time
import random


def task1():
    print(这是 task1 任务.center(30, -))
    print(task1 进了洗手间)
    time.sleep(random.randint(1, 3))
    print(task1 办事呢...)
    time.sleep(random.randint(1, 3))
    print(task1 走出了洗手间)


def task2():
    print(这是 task2 任务.center(30, -))
    print(task2 进了洗手间)
    time.sleep(random.randint(1, 3))
    print(task2 办事呢...)
    time.sleep(random.randint(1, 3))
    print(task2 走出了洗手间)


def task3():
    print(这是 task3 任务.center(30, -))
    print(task3 进了洗手间)
    time.sleep(random.randint(1, 3))
    print(task3 办事呢...)
    time.sleep(random.randint(1, 3))
    print(task3 走出了洗手间)


if __name__ == __main__:
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)

    p1.start()
    p2.start()
    p3.start()

"""
---------这是 task1 任务----------
task1 进了洗手间
---------这是 task2 任务----------
task2 进了洗手间
---------这是 task3 任务----------
task3 进了洗手间
task3 办事呢...
task1 办事呢...
task3 走出了洗手间
task2 办事呢...
task2 走出了洗手间
task1 走出了洗手间
"""

通过加锁来控制

from multiprocessing import Process, Lock
import time
import random

# 生成一个互斥锁
mutex_lock = Lock()


def task1(lock):
    # 锁门
    lock.acquire()
    print(这是 task1 任务.center(30, -))
    print(task1 进了洗手间)
    time.sleep(random.randint(1, 3))
    print(task1 办事呢...)
    time.sleep(random.randint(1, 3))
    print(task1 走出了洗手间)
    # 释放锁
    lock.release()


def task2(lock):
    # 锁门
    lock.acquire()
    print(这是 task2 任务.center(30, -))
    print(task2 进了洗手间)
    time.sleep(random.randint(1, 3))
    print(task2 办事呢...)
    time.sleep(random.randint(1, 3))
    print(task2 走出了洗手间)
    # 释放锁
    lock.release()


def task3(lock):
    # 锁门
    lock.acquire()
    print(这是 task3 任务.center(30, -))
    print(task3 进了洗手间)
    time.sleep(random.randint(1, 3))
    print(task3 办事呢...)
    time.sleep(random.randint(1, 3))
    print(task3 走出了洗手间)
    # 释放锁
    lock.release()


if __name__ == __main__:
    p1 = Process(target=task1, args=(mutex_lock, ))
    p2 = Process(target=task2, args=(mutex_lock, ))
    p3 = Process(target=task3, args=(mutex_lock, ))

    # 释放新建进程的信号,具体谁先启动无法确定
    p1.start()
    p2.start()
    p3.start()

"""
---------这是 task2 任务----------
task2 进了洗手间
task2 办事呢...
task2 走出了洗手间
---------这是 task1 任务----------
task1 进了洗手间
task1 办事呢...
task1 走出了洗手间
---------这是 task3 任务----------
task3 进了洗手间
task3 办事呢...
task3 走出了洗手间


"""

 

买票的案例:

并发出错:

from multiprocessing import Process, Lock
import json
import time
import random
import os


def search():
    time.sleep(0.5)
    with open(db.json, r, encoding=utf8) as f:
        data = json.load(f)
        print(剩余票数:{}.format(data.get(count)))


def buy():

    with open(db.json, r, encoding=utf8) as f:
        data = json.load(f)
    if data.get(count, 0) > 0:
        data[count] -= 1
        time.sleep(random.randint(1, 3))
        with open(db.json, w, encoding=utf8) as f2:
            json.dump(data, f2)
        print({}购票成功!.format(os.getpid()))
    else:
        print(购票失败)


def task():
    search()  # 查票并发
    buy()  # 串行买票


if __name__ == __main__:
    for i in range(10):
        p = Process(target=task)
        p.start()

加上锁

from multiprocessing import Process, Lock
import json
import time
import random
import os

# 设置互斥锁
mutex_lock = Lock()


def search():
    time.sleep(0.5)
    with open(db.json, r, encoding=utf8) as f:
        data = json.load(f)
        print(剩余票数:{}.format(data.get(count)))


def buy():

    with open(db.json, r, encoding=utf8) as f:
        data = json.load(f)
    if data.get(count, 0) > 0:
        data[count] -= 1
        time.sleep(random.randint(1, 3))
        with open(db.json, w, encoding=utf8) as f2:
            json.dump(data, f2)
        print({}购票成功!.format(os.getpid()))
    else:
        print(购票失败)


def task(lock):
    search()  # 查票并发
    lock.acquire()
    buy()  # 串行买票
    lock.release()


if __name__ == __main__:
    for i in range(10):
        p = Process(target=task, args=(mutex_lock, ))
        p.start()

 

###############         进程间的通信         ##############

"""
进程间的三种通信(IPC)方式:

方式一:队列(推荐使用)

方式二:管道(不推荐使用,了解即可)
管道相当于队列,但是管道不自动加锁

方式三:共享数据(不推荐使用,了解即可)
共享数据也没有自动加锁的功能,所以还是推荐用队列的。感兴趣的可以研究研究管道和共享数据

"""

 

###############    进程间的通信---队列   ##############

"""
Queue介绍
我们可以创建一个共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 Queue的实例q常用方法: ################################### Queue([maxsize]) 创建共享的进程队列。 参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁定实现。 q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。 block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。 timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。 block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。 timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。 此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。 在某些系统上,此方法可能引发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。 也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
"""

基本的队列操作:

‘‘‘
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
‘‘‘

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
           # 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
    q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    print(队列已经满了)

# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
    q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
    print(队列已经空了)

print(q.empty()) #空了

上面还没有设计到进程间的通信,下面看一个简单的主进程和子进程之间通信的例子:

import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), hi, hello])  #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。

if __name__ == __main__:
    q = Queue() #创建一个Queue对象
    p = Process(target=f, args=(q,)) #创建一个进程
    p.start()
    print(q.get())
    p.join()

 

##################################################

"""
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。
该模式通过平衡生产数据和消费数据的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式?
生产者就是生产数据的一方,消费者就是消费数据的一方。
通常生产者和消费者的能力很难协调,例如:如果生产者处理速度很快,而消费者处理速度很慢,
那么生产者就必须等待消费者处理完,才能继续生产数据。
同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,
所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,
消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

"""

 

 

###############    进程间的通信---队列   ##############

# 队列的生产者和消费者模型
# 买包子的例子
# 有蒸包子的人,这就是生产者,有买包子的人,这就是消费者,
# 实际中,可能会有数据供需不平衡的问题,
# 就是数据生产的多了没有消费,所以我们要增加消费者,或者减少生产
# 数据消费的多了,我们要增加生产者,来解决这个问题,

# 我们把生产者作为一个进程,把消费者作为一个进程

from multiprocessing import Process,Queue,JoinableQueue
import time, random
def producer(name,food,q):  # 三个参数,就是谁生产,生产了什么,放到哪里
    for i in range(10):
        time.sleep(random.randint(1,3))  # 1-3秒生产1个,
        f = "%s生产了%s%s"%(name,food,i)
        print(f)
        q.put(f)
    q.join()  # 阻塞, 这是感知一个队列中的数据全部都处理完毕,
    # 这种相当于把生产的生命周期拉长了,就是说你是生产完了还没有结束,你还要等待消费者把你生产的所有的东西都消费了,才能结束,

def consumer(q,name):
    while True:
        food =q.get()
        if food is None:  # 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,
            print("获取到一个空")
            break
        print("%s消费了%s"%(name,food))
        time.sleep(random.randint(1,3))  # 1-3秒消费1个,
        q.task_done()  # 队列的计数器 -1


if __name__ == ‘__main__‘:
    # q = Queue()

    q = JoinableQueue()

    p1 = Process(target=producer,args=("andy","包子",q))
    p1.start()

    p2 = Process(target=producer,args=("Lucy","油条",q))
    p2.start()

    c1 = Process(target=consumer,args=(q,"xiaoxiao"))
    c1.daemon =True
    # 意味着,主进程的代码执行结束之后,子进程就结束了,
    # 而主进程又是依赖两个生产者结束才结束的,
    # 而我在生产者的地方加了一个阻塞,直到消费者全都消费了之后才结束,
    # 所以这个设计是非常的巧妙的,
    c1.start()
    # 只有一个消费者,两个生产者, 所以会有供给过大,需要加一个消费者,

    c2 = Process(target=consumer,args=(q,"meimei"))
    c2.daemon =True
    c2.start()

    # 因为只会生产10个,所以怎么能够,没有生产了,但是消费的地方还在get,怎么办?
    p1.join()
    p2.join()
    # q.put(None)
    # q.put(None)
    # 为什么是两个none?
    # 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,
    # 这种写法太麻烦了,如果有1000个还得了,怎么办?使用新的一个模块:JoinableQueue
    # 做了三件事;
    # 1,把c1,c2,改成守护进程
    # 2,把生产者加一个q.join(),直到消费者全部消费结束
    # 3,加了一个        q.task_done()  # 队列的计数器 -1

 # 基于队列实现生产者消费者模型

# 基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(\033[45m%s 吃 %s\033[0m %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=包子%s %i
        q.put(res)
        print(\033[44m%s 生产了 %s\033[0m %(os.getpid(),res))

if __name__ == __main__:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print()

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print(\033[45m%s 吃 %s\033[0m %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=包子%s %i
        q.put(res)
        print(\033[44m%s 生产了 %s\033[0m %(os.getpid(),res))
    q.put(None) #发送结束信号
if __name__ == __main__:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print()

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print(\033[45m%s 吃 %s\033[0m %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res=包子%s %i
        q.put(res)
        print(\033[44m%s 生产了 %s\033[0m %(os.getpid(),res))

if __name__ == __main__:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #发送结束信号
    print()

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

 
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print(\033[45m%s 吃 %s\033[0m %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res=%s%s %(name,i)
        q.put(res)
        print(\033[44m%s 生产了 %s\033[0m %(os.getpid(),res))

if __name__ == __main__:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(包子,q))
    p2=Process(target=producer,args=(骨头,q))
    p3=Process(target=producer,args=(泔水,q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
    p2.join()
    p3.join()
    q.put(None) #有几个消费者就应该发送几次结束信号None
    q.put(None) #发送结束信号
    print()

JoinableQueue([maxsize]) 
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

"""
JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

q.task_done()
使用者使用此方法发出信号,表示q.get()
返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()
方法为止。
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

"""

from multiprocessing import Process, JoinableQueue
import time, random, os


def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print(\033[45m%s 吃 %s\033[0m % (os.getpid(), res))
        q.task_done()  # 向q.join()发送一次信号,证明一个数据已经被取走了


def producer(name, q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = %s%s % (name, i)
        q.put(res)
        print(\033[44m%s 生产了 %s\033[0m % (os.getpid(), res))
    q.join()  # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。


if __name__ == __main__:
    q = JoinableQueue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(包子, q))
    p2 = Process(target=producer, args=(骨头, q))
    p3 = Process(target=producer, args=(泔水, q))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    # 开始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print()

    # 主进程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

 

###################################################

python语法基础-并发编程-进程-进程锁和进程间通信

原文:https://www.cnblogs.com/andy0816/p/12289717.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!