进程:狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
简而言之,就像qq要以一个整体的形式暴露给操作系统管理,里面包含对各种资源的调用,内存的管理,网络接口的调用等,对各种资源管理的集合就可以称为 进程。
进程要操作CPU必须要先创建一个线程,在当代面向线程设计的计算机结构中,进程是线程的容器。
线程:是程序执行流的最小单元,是一串指令的集合
区别:
1,线程共享内存空间和资源(对于线程之间而言),而进程内存是独立的
2,一个进程中的线程之间可以直接交流(读取全局变量),两个进程想通信,必须借助中间代理
3,创建新线程很简单,创建新进程需要对父进程进行一次克隆。
4,一个线程可以控制和操作同一个进程中的其他线程,但是进程只能操作子进程。
5,线程启动速度比进程快,线程的上下文切换要比进程快的多
对于IO操作,一般使用多线程来提高并发。
对于计算性操作,一般使用多进程来提高并发。
归根结底是因为下面的计算机规定:IO操作不占用CPU,计算性操作占用CPU,另外对于一个原因就是GIL
GIL(全称Global Interpreter Lock)全局解释器锁(java和C#中没有这一机制)
隐含的意思:对于任何Python程序,不管有多少的CPU和线程,任何时候都总是只有一个线程在执行。
总结:多线程和多进程的目的在于提高并发,IO密集型用多线程,计算密集型用多进程。
import threading import time def f1(*args): time.sleep(5) print(args) # 单进程,单线程 t = threading.Thread(target=f1, args=(123,)) t.setDaemon(True) # 设置为True,表示主线程不等子线程执行完就结束整个程序 # 设置为False,表示主进程等子线程结束后才会结束整个程序 t.start() # 一个线程 t.join() # 不加参数,默认表示主线程一直等待,等到子线程执行完毕,才会往下走。 # 加参数,参数表示主线程在此最多等待n秒 print("end")
注意:t.join() 和 t.setDaemon(False)意义不同,t.join()会停住等到子线程执行完才往下走。t.setDaemon(False)不会停止会先执行下面的程序,然后再等待子线程执行完才结束。
import threading import time def f1(*args): time.sleep(5) print(args) t = threading.Thread(target=f1, args=(123,)) t.setDaemon(True) # 设置为True,表示主线程不等子线程执行完就结束整个程序 # 设置为False,表示主进程等子线程结束后才会结束整个程序 t.start() # 一个线程
注意:其实上述线程的创建和调用在内部执行了,run()方法,自己动手去找一边。
class MyThread(threading.Thread): def __init__(self, func, args): self.func = func self.args = args super(MyThread, self).__init__() def run(self): self.func(self.args) obj = MyThread(f1, 123) obj.start()
注意:第二种方式采用了自定义类的方式创建了线程,两种方法的本质其实都是一样的。都需要执行run()方法,第一种执行内部的run()方法,第二种执行自定义的run()方法。
put放数据,是否阻塞(默认阻塞),阻塞时的超时时间
get取数据,是否阻塞(默认阻塞),阻塞时的超时时间
队列长度
qsize()真实的个数
maxsize 最大支持的个数
join,task_done, join方法会阻塞进程,当队列中任务执行完毕之后,不再阻塞(即每次执行完任务,后来要加task_done)
empty() 检查是否为空
full() 检查是否已满
例1.
import queue q = queue.Queue(2) # 里面参数表示最多可以放的元素的个数(默认为没有个数限制),如果超过这个个数,默认会阻塞。 q.put(1) # 放数据 q.put(2) print(q.qsize()) # 队列中元素的个数 q.put(6, block=False) # 放数据时默认会阻塞,即block为True;否则,block为False时,如果放不进去,会直接报错! q.put(3, block=False, timeout=2) # 后面还可以接超时时间,如果2秒后仍然放不进去,则直接报错! print(q.qsize()) # 队列中元素的个数 print(q.get()) # 取数据
例2.
import queue q = queue.Queue(2) q.put(1) q.put(2) q.get() q.task_done() q.get() q.task_done() q.join()
注意:这里是放到队列中两个数据,然后再取出两个数据,程序执行完毕,队列清空。如果直接再后面加上join,会阻塞程序。如果每次取出数据都告诉join(即在每次取完时在q.task_done),直到取完程序立刻终止,不会阻塞。用的比较少。
queue.Queue() # 先进先出队列
queue.deque() # 双向队列(两头进两头出)
queue.LifoQueue() # 后进先出队列(last in first out)
queue.PriorityQueue() # 优先级队列
例1:先进后出
import queque q = queue.LifoQueue() # 后进先出队列(last in first out) q.put(123) q.put(456) r = q.get() print(r) #执行结果: #456
例2:优先级队列
import queque q = queue.PriorityQueue() # 优先级队列 q.put((0, ‘tom‘)) # 两个参数,前面一个是优先级,后面一个是放进去的数据,优先级越高,先出 q.put((5, ‘lily‘)) q.put((2, ‘yn‘)) r = q.get() print(r) #执行结果: #tom
例3:双向队列
import queue q = queue.deque() # 双向队列(两头进两头出) q.append("hello") q.append("world") q.appendleft("tom") r = q.pop() r1 = q.popleft() print(r) print(r1) #执行结果: #world #tom
注意:这些队列都是在python内存中创建的,程序退出队列清空!
比如:用户访问12306网站春运时并发非常的高,大量的用户来连接服务器很可能引起服务器的宕机。此时引入生产者消费者模型,就好比在中间加了个管道,如有大并发的连接,全部放入管道,等待服务器到管道获取连接进行排队处理,服务器处理完,用户在进行查询来获取处理结果。
import threading import time NUM = 10 def f1(l): global NUM # 上锁 l.acquire() NUM -= 1 time.sleep(2) print(NUM) # 开锁 l.release() # lock = threading.Lock() lock = threading.RLock() for i in range(10): t = threading.Thread(target=f1, args=(lock,)) t.start()
总结:
1,线程在操作系统执行的最小单元,同一个进程中的线程之间共享内存等各种资源,如何同一个进程中的
很多线程都在修改同一个数据,就是造成数据的混乱,不统一的情况,所以需要线程锁,也叫互斥锁。这样
线程每个线程在执行操作之前都在加锁,所以保证了数据的一致性。
如果上述程序不加锁的输出为:10个0,正常输出应该是:9-0
2,注意Rlock 和 lock的区别,Rlock为递归锁,锁多层,lock只能锁一次。Rlock里面支持多层锁的嵌套,
一般最好使用Rlock,好处显而易见。
互斥锁,同时只允许一个线程更改数据,而semaphore是同时允许一定数量的线程更改数据,比如,有5个收费口高速公路收费站只允许同时5辆车进行缴费,前面5辆车过去,后面的车才能进来。
import threading import time NUM = 10 def f1(l): global NUM # 上锁 l.acquire() # 这时每次只放行2个 NUM -= 1 time.sleep(2) print(NUM) # 开锁 l.release() # lock = threading.Lock() # lock = threading.RLock() lock = threading.BoundedSemaphore(2) # 允许同时最多有2个同时运行 for i in range(10): t = threading.Thread(target=f1, args=(lock,)) t.start()
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True
import threading def f1(i, e): print(i) e.wait() # 检测是什么灯,如果是红灯,停;如果是绿灯,行。 print(i + 100) event = threading.Event() for i in range(10): t = threading.Thread(target=f1, args=(i, event)) t.start() # ------------------ event.clear() # 设置为红灯,全部暂停 inp = input(‘>>>‘) if inp == ‘1‘: event.set() # 设置为绿灯,设置为绿灯,全部放行。
执行流程:首先10个线程首先print(i),然后阻塞,等待放行,然后一次性全部通过,执行print(i + 100)
让线程只有在满足某个条件的情况下才允许n个线程通过。
import threading def f1(i, conn): print(i) conn.acquire() conn.wait() print(i + 100) conn.release() c = threading.Condition() for i in range(10): t = threading.Thread(target=f1, args=(i, c)) t.start() while True: inp = input(">>>") if inp == ‘q‘: break c.acquire() # ---| c.notify(int(inp)) # ------> 注意这三句必须放在一起,为固定用法 c.release() # ---|
执行流程:首先10个线程首先print(i),接着线程阻塞在conn.acquire()这里,主线程接着向下执行,执行到input时,等待用户输入,当输入n,就会放行n个线程去执行print(i + 100)
import threading def condition(): ret = False r = input(">>>") if r == "true": ret = True else: ret = False return ret def f1(i, conn): print(i) conn.acquire() conn.wait_for(condition) print(i + 100) conn.release() c = threading.Condition() for i in range(10): t = threading.Thread(target=f1, args=(i, c)) t.start()
和上述例子notify(1)的结果相同,只有满足某些条件才会执行。
from threading import Timer def f1(): print("hello, world !") t = Timer(10, f1) t.start()
等待10秒中才会执行f1函数,写监控,客户端等才会用到。
线程池非常重要,但是python2没有提供,python3中提供了这个功能,但是很弱。可以利用第三方库,最好自己去写!
思路:维护一个容器(要有最大连接数),取一个少一个线程,无线程时等待,线程执行完毕,交还线程。需要用到队列!
1,低版本的线程池
import queue import threading import time class ThreadPoll: def __init__(self, maxsize=5): self.maxsize = maxsize self._q = queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) # [threading.Thread, threading.Thread, threading.Thread, threading.Thread] def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread) # 最多有5的允许5个线程 poll = ThreadPoll(5) def task(args, p): print(args) time.sleep(1) p.add_thread() # 如果没有最后这个步的话,到第六个get_thread()会因为获取不到线程而阻塞住 # 所以在取走一个线程之后再添加一个线程。 for i in range(100): t = poll.get_thread() # 此时的t为threading.Tread类 obj = t(target=task, args=(i, poll,)) # 创建一个threading.Tread对象 obj.start()
执行流程:会5个5个的执行,注意线程执行没有先后顺序!
分析:通过上述线程池会发现一些问题。
a, 线程不能重复利用,只能开垃圾回收机制进行回收!
b, 线程池直接开到了最大。当线程个数小于5个时,也会创建5个,导致线程的浪费!
2,高级线程池
#!/usr/bin/python # _*_ coding:utf-8 _*_ import queue import threading import contextlib import time # 队列里面直接放任务,[(函数名,参数1),(函数名,参数2),(函数名,参数3)] StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num=None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] # 当前已经创建了多少线程 self.free_list = [] # 当前还空闲多少线程 def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 如果线程池已经终止,则返回True否则None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: # 当没有空闲的线程数,而且已经能够创建的线程数小于最大的线程数时,则创建线程 self.generate_thread() w = (func, args, callback,) self.q.put(w) # 如果不满足上述判断的话,只把任务放在队列中而不会去创建线程 def generate_thread(self): """ 创建一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 执行完所有的任务后,所有线程停止 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 无论是否还有任务,终止线程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于记录线程中正在等待的线程数 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # ----------------执行-------------------------- pool = ThreadPool(5) def callback(status, result): # 定义的任务 # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(300): ret = pool.run(action, (i,), callback) # time.sleep(1) # print(len(pool.generate_list), len(pool.free_list)) # print(len(pool.generate_list), len(pool.free_list))
from multiprocessing import Process def f1(i): print("hello,{}".format(i)) if __name__ == ‘__main__‘: # 注意在windows必须加这一段,否则会报错!但是,在linux,mac没有上述一行代码依然可以执行! # 所以,在windows下慎用多进程! for m in range(10): p = Process(target=f1, args=("tom",)) # p.daemon = True p.start() # p.join()
# 进程里面的这些方法和线程一样,这里不再一一赘述
进程中默认数据是不共享的
from multiprocessing import Process import threading import time def f1(i, args): args.append(i) print("hello,{}".format(i), args) if __name__ == ‘__main__‘: # 注意在windows必须加这一段,否则会报错!但是,在linux,mac没有上述一行代码依然可以执行! li = [] for m in range(5): p = Process(target=f1, args=(m, li)) # p = threading.Thread(target=f1, args=(m, li)) # p.daemon = True p.start() # p.join()
执行结果:
hello,0 [0]
hello,1 [1]
hello,2 [2]
hello,4 [4]
hello,3 [3]
由此可以看出进程之间,默认数据是不共享的。如果共享列表中的元素应该不断增加
数据共享的种三种方式:
a. 第一种,运用特殊的queques方式
from multiprocessing import Process from multiprocessing import queues import multiprocessing def f1(i, args): args.put(i) print(args.qsize()) if __name__ == ‘__main__‘: li = queues.Queue(20, ctx=multiprocessing) for m in range(5): p = Process(target=f1, args=(m, li)) p.start()
b. 第二种,运用数组的方式
from multiprocessing import Process from multiprocessing import Array def f1(i, args): args[i] = i + 100 for n in args: print(n) print("---------") if __name__ == ‘__main__‘: li = Array(‘i‘, 6) # 创建数组,规定好是字符型,数量为6个,这里的数组是C语言的格式 for m in range(5): p = Process(target=f1, args=(m, li)) p.start()
数组和链表的关系,列表就是基于链表来实现的。链表在内存中的数据不一定是连续的,但是每一个数据块都会记录上一个和下一个块的位置。而数组则在内存中的数据块是连续,类型、数量都是规定好的。注意所有语言里面的数据都是一样的。这种方式不常用。
c. 第三种方式,利用特殊字典的方式
from multiprocessing import Process from multiprocessing import Manager def f1(i, args): args[i] = i + 100 print(args.values()) if __name__ == ‘__main__‘: obj = Manager() li = obj.dict() for m in range(5): p = Process(target=f1, args=(m, li)) p.start() p.join() # 重要
注意,最后的p.join()必须加上,否则会报错!主进程执行完之后会停止,而此时创的子进程需要修改主进程的数据,而此时主进程已经关闭。这种方式比较常用!
from multiprocessing import Process, Array from multiprocessing import RLock, Lock, Event, Condition, Semaphore import time def f1(i, lis, lc): lc.acquire() lis[0] = lis[0] - 1 time.sleep(1) print(‘hello,‘,lis[0]) lc.release() if __name__ == ‘__main__‘: li = Array(‘i‘, 1) li[0] = 10 lock = RLock() for m in range(5): p = Process(target=f1, args=(m, li, lock)) p.start() p.join()
注意:如果没有进程锁,多个子进程都要修改数据,会造成混乱,需要加锁
总结:进程中同样有RLock, Lock, Event, Condition, Semaphore这个方法,和线程相同这里不再一一赘述!
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池python给定义好了,不需要自己去定义,只需要会用即可。
from multiprocessing import Pool import time def f1(args): print(args) if __name__ == ‘__main__‘: pool = Pool(5) for i in range(30): # pool.apply(func=f1, args=(i,)) # 这是串行来执行的 pool.apply_async(func=f1, args=(i, )) # 这是异步执行的 pool.close() # 所有的任务执行完毕,才关闭 # time.sleep(1) # pool.terminate() # 立即终止当前任务! pool.join() # 等待所有的任务执行完毕才会终止
注意:close和terminate的区别,以及串行和异步的区别
IO密集型用多线程,计算或者CPU密集型用多进程。如果要写爬虫,会产生http请求,http请求也叫IO请求,用多线程比较合适!而对于IO密集型的也比较适合用协程(协程不适合有大量cpu操作的)
注意:线程和进程都是计算机给提供的,而协程则是在程序级别做的。
原理:利用一个线程,分解一个线程为多个微线程。
greenlet 底层的
gevent 高级的,是对greenlet的封装
例1,简单的协程操作-greenlet
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
执行结果:(来回切换进行操作)
12
56
34
78
例2:简单的协程操作-gevent
import gevent def f1(): print("welcome to f1") gevent.sleep(0) print("welcome to f1 agin !") def f2(): print("welcome to f2") gevent.sleep(0) print("welcome to f2 agin !") gevent.joinall([gevent.spawn(f1), gevent.spawn(f2)])
执行结果:
welcome to f1
welcome to f2
welcome to f1 agin !
welcome to f2 agin !
例3:涉及到IO操作,gevent就是一个高性能的代名词
from gevent import monkey import gevent import requests monkey.patch_all() # 把原来的socket功能修改了,发完请求的会告诉你发送完了,用上它,才能用协程 def f1(url): print("GET %s" % url) resp = requests.get(url) data = resp.text print("%d bytes received from %s." % (len(data), url)) gevent.joinall([ gevent.spawn(f1, "https://www.baidu.com/"), gevent.spawn(f1, "https://www.python.org/"), gevent.spawn(f1, "https://www.sina.com/"), ])
python对memcache 和 redis 操作,详见:http://www.cnblogs.com/wupeiqi/articles/5132791.html
1,安装软件
2,程序:安装其对应的客户端(API)
原文:http://www.cnblogs.com/yang-ning/p/6426210.html