进程:
进程是正在进行的一个过程或者一个任务。负责执行这个任务的是cpu
进程与程序的区别:
程序:仅仅只是一堆代码
进程:是程序运行的过程
并发与并行
并发只能达到伪并行,看上去是同时进行的,其实并不是,单个cpu+多道技术就可以实现并发
并行:是同时运行,必须具备多个cpu,才能实现并行的功能
进程的状态
其实在两种情况下会导致一个进程在逻辑上不能运行,
1、进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作
2、与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。
开启进程的两种方式
# 1、第一种方式
from multiprocessing import Process,current_process
def task(name):
print("%s is running, 进程名:%s" % (name, current_process().name))
if __name__ == ‘__main__‘:
p = Process(target=task, args=(1,) ,name="子进程")
p.start() # 向操作系统发送一个开启一个进程的请求
print("主")
‘‘‘
打印结果:
主
1 is running, 进程名:子进程
‘‘‘
# 2、第二种方式
class MyProcess(Process):
def __init__(self, process, name):
super().__init__()
self.process = process
self.name = name
def run(self): # 将需要调用进程的方法写入run中, 函数名run不能修改
print("%s is running, 进程名:%s" % (self.process, current_process().name))
if __name__ == ‘__main__‘:
p = MyProcess("xu","子进程")
p.start()
print("主")
‘‘‘
打印结果:
主
1 is running, 进程名:子进程
‘‘‘
进程的join方法:
# join方法
from multiprocessing import Process, current_process
import time
def task(name):
print("%s is running, 进程名:%s" % (name, current_process().name))
time.sleep(1)
‘‘‘
执行结果:
1 is running, 进程名:子进程1
1 is end, 进程名:子进程1
主
‘‘‘
进程的p.join()方法是当p进程结束之后,才会执行join方法后面的程序
# Process的其他属性和方法
1、terminate和is_alive
from multiprocessing import Process
import time
import random
def task(name):
print(‘%s is piaoing‘ %name)
time.sleep(random.randrange(1,5))
print(‘%s is piao end‘ %name)
if __name__ == ‘__main__‘:
p1=Process(target=task,args=(‘egon‘,))
p1.start()
p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) #结果为True
# is_alive()为判断进程是否还存活
print(‘主‘)
print(p1.is_alive()) # 此时p1进程已经关闭,所以结果为False
‘‘‘
执行结果:
True
主
False
‘‘‘
2、name与pid
from multiprocessing import Process
import time
import random
import os
def task(name):
print(‘%s is running‘ %name)
time.sleep(random.randrange(1,5))
print("父进程pid:%s, 子进程pid:%s" %(os.getppid(), os.getpid()))
print(‘%s is end‘ % name)
if __name__ == ‘__main__‘:
p1=Process(target=task,args=(‘xu‘,), name="子进程1") # name属性设置进程的名称
p1.start()
print(p1.name) # 打印p1进程的名称
print(‘主‘, os.getpid())
‘‘‘
打印结果:
子进程1
主 15752
xu is running
父进程pid:15752, 子进程pid:15753
xu is end
‘‘‘
守护进程
1、守护进程会在主进程执行结束后结束。
2、守护进程内无法再开启子进程,否则会抛出异常。
from multiprocessing import Process,current_process
import time
def task():
print("%s is running" % current_process().name)
time.sleep(2)
print("%s is end" % current_process().name)
if __name__ == ‘__main__‘:
p1 = Process(target=task, name="进程1")
p1.daemon = True # 开启守护进程,开启的动作必须要在start之前
p1.start()
print("主") # 当此程序执行后,主进程便执行结束了,那么守护线程也需要结束
‘‘‘
打印结果:
主
‘‘‘
互斥锁
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱
from multiprocessing import Process, current_process
import time
def task():
print("%s is running" % current_process().name)
time.sleep(2)
print("%s is end" % current_process().name)
if __name__ == ‘__main__‘:
for i in range(3):
p = Process(target=task, name="进程%s" %i)
p.start()
‘‘‘
打印结果:
进程0 is running
进程1 is running
进程2 is running
进程0 is end
进程1 is end
进程2 is end
‘‘‘
如果没有发生错乱,那么应该打印结果为:
进程0 is running
进程0 is end
进程1 is running
进程1 is end
进程2 is running
进程2 is end
如何控制其行为呢,就需要加锁,而这把锁就为互斥锁
from multiprocessing import Process, current_process, Lock
import time
def task(mutex):
mutex.acquire()
print("%s is running" % current_process().name)
time.sleep(2)
print("%s is end" % current_process().name)
mutex.release()
if __name__ == ‘__main__‘:
mutex = Lock() # 创建互斥锁对象,需要保证每个进程里面的锁是同一把,所以需要将此对象传给task
for i in range(3):
p = Process(target=task, args=(mutex, ), name="进程%s" %i)
p.start()
‘‘‘
打印结果:
进程0 is running
进程0 is end
进程1 is running
进程1 is end
进程2 is running
进程2 is end
‘‘‘
互斥锁模拟抢票功能
# 模拟抢票功能
from multiprocessing import Process, Lock
import time
import json
def show_ticket():
time.sleep(1)
ticket_dict = json.load(open("ticket_num.json", "r", encoding="utf-8"))
print("票数为:%s" %(ticket_dict["count"]))
def pay_ticket(mutex):
show_ticket()
mutex.acquire()
time.sleep(1)
dict = json.load(open("ticket_num.json", "r", encoding="utf-8"))
if dict["count"] > 0:
print("购票成功")
dict["count"] -= 1
json.dump(dict, open("ticket_num.json", "w", encoding="utf-8"))
else:
print("没有票了, 购票失败")
mutex.release()
if __name__ == ‘__main__‘:
mutex = Lock()
for i in range(10):
p = Process(target=pay_ticket, args=(mutex, ))
p.start()
队列
from multiprocessing import Process,Queue
q=Queue(3) # 3为队列中允许最大项数,省略则无大小限制
#put ,get ,put_nowait,get_nowait,full,empty
q.put(1) # 在队列中放置数据
q.put(2)
q.put(3)
print(q.full()) #满了
# q.put(4) #再放就阻塞住了
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了
ps:
1、队列内存放的是消息而非大数据
2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
# 两个队列形成一个栈 ‘‘‘ 队列特性:先进先出 堆栈特性:先进后出 实现思路: # 存数据阶段 1、先创建两个队列 2、将数据存入第一个队列中 # 取数据阶段 1、判断第一个队列是否为空, 若为空,则返回None 2、将第一个队列中的数据与挨个取出存入第二个队列,当第一个队列中的数量为最后一个的时候,这最后一个数据便是最后存储进队列中的,将其取出,然后返回,然后将第一个队列和第二个队列对调,一直循环,这样就能形成先进后出 ‘‘‘ import queue class Stack(object,): def __init__(self): self.one_queue = queue.Queue() self.two_queue = queue.Queue() def push(self, data): """ 将数据放入第一个队列中 :return: """ self.one_queue.put(data) def getdata(self): """ 将数据取出 :return: """ if self.one_queue.qsize() == 0: return None while 1: if self.one_queue.qsize() == 1: value = self.one_queue.get() break self.two_queue.put(self.one_queue.get())
# 将队列对换 temp = self.one_queue self.one_queue = self.two_queue self.two_queue = temp return value if __name__ == ‘__main__‘: stack = Stack() stack.push("xu") stack.push("sy") stack.push("songy") print(stack.getdata()) print(stack.getdata()) print(stack.getdata())
生产者消费者模型
用多进程和队列实现生产者消费者模型
from multiprocessing import Process, Queue
import time
def producer(name, q):
for i in range(3):
print("生产者%s生产了%s包子" %(name, i))
q.put({name: i})
time.sleep(0.5)
def consumer(name, q):
while True:
package = q.get()
print("消费者%s吃了包子%s" %(name, package))
time.sleep(1)
if __name__ == ‘__main__‘:
q = Queue()
producers = ["xu", "sy", "songyuan"]
consumers = ["guang", "chong",]
# 生产者们
for one_producer in producers:
p = Process(target=producer, args=(one_producer, q, ))
p.start()
# 消费者们
for one_consumer in consumers:
p = Process(target=consumer, args=(one_consumer, q, ))
p.start()
这就产生了一个问题, 如果消费者将队列中的包子取完了, 那么消费者就会一直阻塞在q.get()阶段,无发退出。
所以就有了一个模块JoinableQueue
from multiprocessing import Process, JoinableQueue
import time
def producer(name, q):
for i in range(3):
print("生产者%s生产了%s包子" %(name, i))
q.put({name: i})
time.sleep(0.5)
q.join() # q没取完, 则进程便会在此阻塞住
def consumer(name, q):
while True:
package = q.get()
print("消费者%s吃了包子%s" %(name, package))
time.sleep(1)
q.task_done() # 给q.join()发送一个已经取走一个包子的消息
if __name__ == ‘__main__‘:
q = JoinableQueue()
producers = ["xu", "sy", "songyuan"]
consumers = ["guang", "chong",]
# 生产者们
p1 = Process(target=producer, args=("xu", q, ))
p2 = Process(target=producer, args=("sy", q, ))
p3 = Process(target=producer, args=("songy", q, ))
# 消费者们
c1 = Process(target=consumer, args=("guang", q, ))
c2 = Process(target=consumer, args=("chong", q, ))
c1.daemon = True
c2.daemon = True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
原文:https://www.cnblogs.com/zrxu/p/11755286.html