IO密集型----多线程
计算密集型----多进程
1、单进程
from multiprocessing import Process def foo(i): print(‘你好哈‘,i) if __name__ == ‘__main__‘: #if __name__ == ‘__main__‘:只可做测试调用,不能用于生产,windows不支持,linux中可不用添加if __name__ == ‘__main__‘ for i in range(10): t = Process(target=foo,args=(i,)) #创建进程 t.start() #执行进程
注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。
由于进程在内存中具有独立的地址空间,且每个地址空间各持有一份数据,默认情况下无法共享数据
#方法一:进程共享数据:Array from multiprocessing import Process from multiprocessing import Array def foo(i,arg): arg[i] = i +100 #每一个进程加100 for item in arg: # print(item) #输出迭代数据 print("----------") if __name__ == "__main__": li = Array(‘i‘,10) #创建数组,i 等于数据格式为整型,并设置10个元素给数据 for i in range(10): #设置10个进程 p = Process(target=foo,args=(i,li,))#创建进程 p.start() #执行进程
#方法二:常用进程共享 from multiprocessing import Process from multiprocessing import Manager def foo(i,arg): arg[i] = i +100 print(arg.values()) if __name__ == "__main__": obj = Manager() #创建对象 li = obj.dict() #创建字典 for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() # time.sleep(0.01) #方式一: p.join() #方式二:join进程之间串联进行
‘c‘: ctypes.c_char, ‘u‘: ctypes.c_wchar, ‘b‘: ctypes.c_byte, ‘B‘: ctypes.c_ubyte, ‘h‘: ctypes.c_short, ‘H‘: ctypes.c_ushort, ‘i‘: ctypes.c_int, ‘I‘: ctypes.c_uint, ‘l‘: ctypes.c_long, ‘L‘: ctypes.c_ulong, ‘f‘: ctypes.c_float, ‘d‘: ctypes.c_double
#创建进程池
from multiprocessing import Process,Queue def f(i,q): print(i,q.get()) #回去队列中元素 if __name__ == "__main__": q=Queue() #创建队列 q.put(123) #将数据put进队列 q.put(456) q.put(789) for i in range(10): #共设置10个进程池 p = Process(target=f,args=(i,q,)) #创建进程 p.start() #执行进程
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池常用方法:
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == "__main__": pool = Pool(5) #创建进程池 for i in range(10): #创建10个进程 pool.apply(func=f1,args=(i,)) #apply:让进程去进程池中获取数据,只有等待一个执行完一个元素后,第二个进程接着执行 # pool.apply_async(func=f1,args=(i,)) print("end") #当0个进程执行完之后,输出end
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == "__main__": pool = Pool(5) #创建进程池,每一次可同时执行5个进程 for i in range(10): #创建10个进程 #pool.apply(func=f1,args=(i,)) #apply:让进程去进程池中获取数据,只有等待一个执行完一个元素后,第二个进程接着执行 pool.apply_async(func=f1,args=(i,)) #apply_async:方法:并行运行所有进程 # print("end") #当0个进程执行完之后,输出end pool.close()#所有任务(即)执行完毕 time.sleep(2) pool.terminate()#当程序运行此时,立即终止所有进程,不管进程池中任务是否执行完 # pool.join()
进程中常用方法由:Lock,Rloack,Event,Condition,Smaphore等于线程中方法一样
1、进程锁:
from multiprocessing import RLock,Lock,Event,Condition,Semaphore from multiprocessing import Process from multiprocessing import queues from multiprocessing import Array import multiprocessing import time def foo(i,list,lc): lc.acquire() #给进程上锁 list[0]=list[0]-1 #每一次进程调用子进程时减一 time.sleep(1) print("say hi",list[0]) #输出每一次进程调用 lc.release() #给进程解锁 if __name__ == "__main__": li = Array(‘i‘,1) #python引用数组,i:表示整型数据,1:带包进程数组中含有一个元素
li[0]=10 #设置数组长度为10 lock = RLock() #创建进程锁 for i in range(10): #创建10个进程 p = Process(target=foo,args=(i,li,lock)) #创建进程 p.start() #执行进程
2、多进程
定义:线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必 不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
threading模块:提供线程相关操作,线程是应用程序中工作的最小单位
1、单线程
#!/usr/bin/env python # -*- coding: utf-8 -*- # lcj import threading import time #创建一个class类 def show(arg): #arg:传参 time.sleep(2) print(‘thread‘+str(arg)) #将传参转换至字符串形式 for i in range(10): #创建10个线程 t = threading.Thread(target=show,args=(i,)) #创建线程 t.start() #线程就绪,等到CPU调度, # print(‘main thread stop‘)
上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令
更多方法: •start 线程准备就绪,等待CPU调度 •setName 为线程设置名称 •getName 获取线程名称 •setDaemon 设置为后台线程或前台线程(默认) 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止 •join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义 •run 线程被cpu调度后自动执行线程对象的run方法
#自定义线程 import threading import time class MyThread(threading.Thread): #继承父类threading.Thread def __init__(self, num): #MyThread类__ini__方法 threading.Thread.__init__(self) self.num = num def run(self): #定义每个线程执行的函数,此时:self == 调用对象(t1或者t2对象) print(‘running on number:%s‘ % self.num) time.sleep(3) #每一个线程睡眠3秒 if __name__ == "__main__": t1 = MyThread(1) #创建t1对象,并将参数1复制给t1对象 t2 = MyThread(2) #创建t2对象,并将参数1复制给t2对象 t1.start() #执行线程 t2.start() #输出 # running on number:1 # running on number:2
import threading def f1(arg): print(arg) for i in range(10): #设置10个线程 t = threading.Thread(target=f1,args=(i,)) #创建线程 t.start() #执行线程 #输出:0-9数据
import queue,time import threading #先进先出 q = queue.Queue(20) #生产者 def productor(arg): """ 买票 :param arg: :return: """ q.put(str(arg)+"-买票") for i in range(30): t = threading.Thread(target=productor,args=(i,)) #创建买票线程生产者,并将动态参数i传递给productor(arg) t.start() #执行买票线程 # if i == 30: # break #消费者 def consumer(arg): """ 服务器后台, :param arg: :return: """ while True: print(arg,q.get()) #获取队列中存在元素 time.sleep(1) sk = threading.BoundedSemaphore(5) for j in range(5): t = threading.Thread(target=consumer,args=(j,))#创建买票线程生产者,并将动态参数j传递给consumer(arg) t.start() #执行c线程
Python中常用队列方法:
put放数据,是否阻塞,阻塞时的超时事件
get取数据(默认阻塞),是否阻塞,阻塞时的超时事件
qsize():获取队列中有多少个元素
maxsize:最大支持的个数
1、先进先出
queue.Queue(2)方法:创建先进先出队列,参数2表示队列中最多可含有两个元素,否则超过队列中设置的最大队列则报队列满(queue.Full)错误,
import queue import time #先进先出 # put放数据,是否阻塞,阻塞时的超时事件 # get取数据(默认阻塞),是否阻塞,阻塞时的超时事件 # qsize():获取队列中有多少个元素 # maxsize 最大支持的个数 q = queue.Queue(2) #创建队列,带参数,则表示队列最大可放多少个数据 q.put(12) q.put(13) print(q.qsize()) #获取队列中有多少个数据 # q.put(15,timeout=2) #当队列中设置长度为2时,此时在往队列中增加元素时,超过2秒,报错queue.Full q.put(15,block=False,timeout=2) print(q.qsize()) #获取队列中有多少个数据 print("------------") print(q.get()) #获取队列中元素 print(q.get(block=False)) #get取数据(默认阻塞),当block=False表示不堵塞 print(q.get(timeout=2)) #设置获取队列中元素超时时间2秒,超过则直接执行下面代码 print(q.get(block=False,timeout=2)) #
2、后进先出
queue.LifoQueue()方法:创建后进先出队列
import queue # 队列后进先出 q= queue.LifoQueue() q.put(123) #将数据put队列中 q.put(234) print(q.get()) #获取队列中的元素 #输出:234
3、队列优先级
queue.PriorityQueue()方法:创建队列优先级队列
import queue #队列优先级,按照下标进行优先级别排序 q = queue.PriorityQueue() #创建队列优先级别q值 q.put((1,"lcj")) #将数据put队列中 q.put((0,"alex")) q.put((4,"xiaoluo")) print(q.get()) #获取队列中的元素 #输出(0, ‘alex‘)
4、双向队列
queue.deque()方法:创建双向队列
import queue #双向队列 q = queue.deque() #创建双向队列 q.append(33) #将元素33加入队列中 q.append(44) q.append(55) q.appendleft(99) print(q.pop()) #pop:在队列中提取一个元素 print(q.popleft()) #99
一个进程下有多个线程,且线程是共享进程资源,每一个线程肯能执行N条语句后,当多个线程同时修改一天数据时可能会出现脏数据,所以,出现线程锁,同一时间只允许一个线程操作
Lock,RLock区别?
Lock是阻塞其他线程对共享资源的访问,且同一线程只能acquire一次,如多于一次就出现了死锁,程序无法继续执行。为了保证线程对共享资源的独占,又避免死锁的出现,就有了RLock。RLock允许在同一线程中被多次acquire,线程对共享资源的释放需要把所有锁都release。即n次acquire,需要n次release
线程在未加锁情况下,主线程回一次性执行所有的线程且会产生多个线程同用一个数据,产生脏数据
#未加锁 import threading import time num = 0 #定义一个全局变量 def show(arg): global num #对全局变量num进行重新赋值 time.sleep(1) num +=1 print(num) for i in range(10): #定义十个线程 t = threading.Thread(target=show,args=(i,))# t.start() # 执行线程
线程加锁特定:每一个线程执行一条数据,防止多个线程重用一条数据产生脏数据
#加锁 import threading import time num = 0 #定义一个全局变量 lock = threading.RLock() #定义一个线程锁 def show(arg): lock.acquire() #枷锁操作 global num #对全局变量num进行重新赋值 num +=1 time.sleep(1) #每一个线程执行时睡眠一秒钟 print(num) lock.release() #解锁操作 for i in range(10): #定义十个线程 t = threading.Thread(target=show,args=(i,))#创建主线程 t.start() # 执行线程
互拆锁:同时只允许一个线程更改数据,而Semapphore是同时允许一定数量的线程更改数据,比如:一个萝卜一个坑,只有空余的坑才能进入占座
BoundedSemaphore(5)方法:表示5个线程同时运行
import threading,time def run(n): k1.acquire() #加锁 time.sleep(1) print("run the threading:%s" %n) k1.release() #解锁 if __name__ == ‘__main__‘: num = 0 k1 = threading.BoundedSemaphore(5) #设置线程最多同时运行5个线程 for i in range(30): #设置线程数为30 t = threading.Thread(target=run,args=(i,)) #主线程 t.start() #执行线程
python线程的事件用于主线程控制其他子线程的执行,事件主要提供是三个方法:set、wait、clear
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
import threading,time
#代码从上往下执行,由t.start调用子线程执行lcj方法,第一次输出i值,
#当线程执行值wait时,判断线程状态,红灯(clear):停,绿灯(set):行
#主线程继续运行至inp,当用户输如true时,主线程执行set方法,将线程转态设置绿灯,继续运行,则子线程继续执行i+100 def lcj(i,event): print(i) time.sleep(1) event.wait() #检测是什么等,如果是红灯,则停,绿灯:则行 print(i+100) event_obj = threading.Event() #创建一个线程事件 for i in range(10): #设置30个线程 t = threading.Thread(target=lcj,args=(i,event_obj,)) t.start() #调用线程 event_obj.clear() #设置成红灯 inp = input(">>:").strip() if inp ==‘true‘: #如果用户输入:true,系统则经过wait检查之后执行其下面打印语句 event_obj.set() #设置成绿灯
当线程处于等待时,只有满足某条件时,才释放N个线程继续执行子线程下代码
1、条件wait方法用法
import threading def run(i,con): print(i) con.acquire() #加锁 con.wait() #检测主线程状态,当主线程客户端输入3,则子线程输出队列中前三个元素每一个各加100输出 print(i+100) con.release() #解锁 # if __name__ == "__main__": con = threading.Condition() for i in range(10): t = threading.Thread(target=run,args=(i,con,)) #创建主线程 t.start() #执行子线程 while True: inp = input(">>>:").strip() if inp == ‘q‘: break # 此三个必须放在一起、acquire、notify、release con.acquire() #加锁 con.notify(int(inp)) #notify()方法不会释放所占用的锁,输入几就释放几个元素,比如:客户输入2,则子线程输出100,101 con.release() #解锁
2、条件wait_for方法:
import threading def contion(): ret = False i = input(">>>>:") if i =="true": ret = True else: ret = False return ret def run(i,con): print(i) #输出所有线程 con.acquire()#加锁 con.wait_for(contion) #wait_for:等待条件=true成立,则执行contion函数中语句 print(i+100) #当主线程条件成立,则执行i+100 con.release() #解锁 c = threading.Condition() #创建线程条件 for i in range(10): #设置10个线程 t = threading.Thread(target=run,args=(i,c,)) #创建线程 t.start() #执行线程
python中定时器:指定N秒之后执行子线程操作
from threading import Timer def lcj(): print("hello,world") t = Timer(1,lcj) #定时器:1秒后执行lcj函数,并输出hello,world t.start() #执行线程
2、多线程
线程池原理:可理解为是一个容器(容器中可指定大小),在容器中取一个线程则少一个线程,再无没有线程时等待,线程执行完毕,将线程归还给线程池
1、基本线程池
#自定义线程池 import queue import threading import time class ThreadPool: #进程池类 def __init__(self,maxsize=5): #创建构造方法,设置最大线程为5个线程 self.maxsize = maxsize self._q = queue.Queue(maxsize)#_q:下划线 为了区别,创建队列 for i in range(maxsize): #每次循环最大线程 self._q.put(threading.Thread) #将类名put队列中 def get_thread(self): return self._q.get() #获取线程 def add_thread(self): self._q.put(threading.Thread)#队列含有:threading.Thread pool = ThreadPool(5) #线程池最大个数为5,即每次最多只能获取5个线程 def task(arg,p): print(arg) time.sleep(1) # p.add_thread()#从线程池取完一次数据之后 再次想线程池中获取数据,直到执行完100个线程 for i in range(100): #循环100次 #t==threading.Thread类 t = pool.get_thread() obj = t(target=task,args=(i,pool,)) #创建主线程 obj.start() #执行子线程池
2、高级线程池
协程原理:利用一个线程,分解一个线程成为多个“微线程”
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
协程常用方法:
greenlet:底层
from greenlet import greenlet def f1(): print(12) lcj2.switch() print(34) lcj2.switch() def f2(): print(56) lcj1.switch() print(78) lcj1 = greenlet(f1) lcj2 = greenlet(f2) lcj1.switch() #第一步:lcj1.switch():执行f1函数并输出12 #第二步:在函数f1中lcj2.switch(),则执行f2函数并输出56 #第三步:在f2函数执行lcj1.switch(),则执行f1函数,并输出34 #第四步:在f1函数执行lcj2.switch(),则执行f2函数,并输出78 #最后输出顺序为:12,56,34,78
gevent:高级
import gevent def foo(): print("running in foo") gevent.sleep(1) print(‘Explicit context switch to foo agein‘) def bar(): print(‘explicit context to bar‘) gevent.sleep(1) print(‘implicit context switch back to bar‘) gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ]) # running in foo # explicit context to bar # Explicit context switch to foo agein # implicit context switch back to bar
遇到IO操作自动切换:
from gevent import monkey;monkey.patch_all() #monkey.patch_all() 修改原来socket, 对IO请求进行封装 import gevent import requests def f(url): print(‘GET: %s‘% url) #获取所有URL resp = requests.get(url) #获取URL data = resp.text #获取URL内容 print("%d bytes received from %s." % (len(data),url)) gevent.joinall([ gevent.spawn(f, "https://www.python.org/"), gevent.spawn(f, "https://www.yahoo.com/"), gevent.spawn(f, "https://github.com/"), ])
原文:http://www.cnblogs.com/lcj0703/p/5682035.html