首页 > 其他 > 详细

基于解决高并发生的产者消费者模型

时间:2020-08-25 15:45:55      阅读:92      评论:0      收藏:0      [点我收藏+]

生产者消费着模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

例:基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
  while True:
      res=q.get()
      time.sleep(random.randint(1,3))
      print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res))
?
def producer(q):
  for i in range(10):
      time.sleep(random.randint(1,3))
      res=‘包子%s‘ %i
      q.put(res)
      print(‘\033[44m%s 生产了 %s\033[0m‘ %(os.getpid(),res))
?
if __name__ == ‘__main__‘:
  q=Queue()
  #生产者们:即厨师们
  p1=Process(target=producer,args=(q,))
?
  #消费者们:即吃货们
  c1=Process(target=consumer,args=(q,))
?
  #开始
  p1.start()
  c1.start()
  print(‘主‘)

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
  while True:
      res=q.get()
      if res is None:break #收到结束信号则结束
      time.sleep(random.randint(1,3))
      print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res))
?
def producer(q):
  for i in range(10):
      time.sleep(random.randint(1,3))
      res=‘包子%s‘ %i
      q.put(res)
      print(‘\033[44m%s 生产了 %s\033[0m‘ %(os.getpid(),res))
  q.put(None) #发送结束信号
if __name__ == ‘__main__‘:
  q=Queue()
  #生产者们:即厨师们
  p1=Process(target=producer,args=(q,))
?
  #消费者们:即吃货们
  c1=Process(target=consumer,args=(q,))
?
  #开始
  p1.start()
  c1.start()
  print(‘主‘)
   
    p1.join() #结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号
  q.put(None) #发送结束信号
  print(‘主‘)

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制 如下:

技术分享图片
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(\033[45m%s 吃 %s\033[0m %(os.getpid(),res))

        q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=%s%s %(name,i)
        q.put(res)
        print(\033[44m%s 生产了 %s\033[0m %(os.getpid(),res))
    q.join()


if __name__ == __main__:
    q=JoinableQueue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(包子,q))
    p2=Process(target=producer,args=(骨头,q))
    p3=Process(target=producer,args=(泔水,q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #开始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print() 
    
    #主进程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程
View Code

 

 

基于解决高并发生的产者消费者模型

原文:https://www.cnblogs.com/kylin5201314/p/13559806.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!