多线程
多线程是程序在同样的上下文中同时运行多条线程的能力。这些线程共享同一个进程的资源,可以在并发模式(单核处理器)或并行模式(多核处理器)下执行多个任务
多线程有以下几个优点:
但是多线程也有以下几个缺点:
通常,多线程技术完全可以在多处理器上实现并行计算。但是Python的官方版本(CPython)有一个GIL限制,GIL会阻止多个线程同时运行Python的字节码,这就不是真正的并行了。假如你的系统有6个处理器,多线程可以把CPU跑到
600%,然而,你能看到的只有100%,甚至更慢一点,这都是GIL造成的
CPython的GIL是有必要的,因为CPython的内存管理不是线程安全的。因此,为了让每个任务都按顺序进行,它需要确保运行过程中内存不被干扰。它可以更快的运行单线程程序,简化C语言扩展库的使用方法,因为它不需要考虑多线程问题。
但是,GIL是可以用一些办法绕过的。例如,由于GIL只阻止多个线程同时运行Python的字节码,所以可以用C语言写程序,然后用Python封装。这样,在程序运行过程中GIL就不会干扰多线程并发了
另一个GIL不影响性能的示例就是网络服务器了,服务器大部分时间都在读数据包,而当发生IO等待时,会尝试释放GIL。这种情况下,增加线程可以读取更多的包,虽然这并不是真正的并行。这样做可以增加服务器的性能,但是不会影响速度。
用_thread模块创建线程
我们先用一个例子快速演示_thread模块的用法:_thread模块提供了start_new_thread方法。我们可以向里面传入以下参数:
import _thread
import time
def print_time(thread_name, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print("%s:%s" % (thread_name, time.ctime(time.time())))
try:
_thread.start_new_thread(print_time, ("thread-A", 1))
_thread.start_new_thread(print_time, ("thread-B", 2))
except:
print("Error: unable to start thread")
while 1:
pass
运行结果:
thread-A:Sun Jul 8 07:39:27 2018 thread-B:Sun Jul 8 07:39:28 2018 thread-A:Sun Jul 8 07:39:28 2018 thread-A:Sun Jul 8 07:39:29 2018 thread-B:Sun Jul 8 07:39:30 2018 thread-A:Sun Jul 8 07:39:30 2018 thread-A:Sun Jul 8 07:39:31 2018 thread-B:Sun Jul 8 07:39:32 2018 thread-B:Sun Jul 8 07:39:34 2018 thread-B:Sun Jul 8 07:39:36 2018
上面的例子很简单,线程A和线程B是并发执行的。
_thread模块还提供了一些容易使用的线程原生接口:
def print_time(thread_name, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
if count == 2 and delay == 2:
_thread.interrupt_main()
print("%s:%s" % (thread_name, time.ctime(time.time())))
运行结果:
thread-A:Sun Jul 8 09:12:57 2018
thread-B:Sun Jul 8 09:12:58 2018
thread-A:Sun Jul 8 09:12:58 2018
thread-A:Sun Jul 8 09:12:59 2018
thread-B:Sun Jul 8 09:13:00 2018
Traceback (most recent call last):
File "D:/pypath/hello/test3/test01.py", line 22, in <module>
pass
KeyboardInterrupt
def print_time(thread_name, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
if count == 2 and delay == 2:
_thread.exit()
print("%s:%s" % (thread_name, time.ctime(time.time())))
运行结果:
thread-A:Sun Jul 8 09:15:51 2018 thread-B:Sun Jul 8 09:15:52 2018 thread-A:Sun Jul 8 09:15:52 2018 thread-A:Sun Jul 8 09:15:53 2018 thread-A:Sun Jul 8 09:15:54 2018 thread-A:Sun Jul 8 09:15:55 2018
allocate_lock方法可以为线程返回一个线程锁,这个锁可以保护某一代码块从开始运行到运行结束只有一个线程,线程锁对象有三个方法:
下面这段代码用10个线程对一个全局变量增加值,因此,理想情况下,全局变量的值应该是10:
import _thread
import time
global_values = 0
def run(thread_name):
global global_values
local_copy = global_values
print("%s with value %s" % (thread_name, local_copy))
global_values = local_copy + 1
for i in range(10):
_thread.start_new_thread(run, ("thread-(%s)" % str(i),))
time.sleep(3)
print("global_values:%s" % global_values)
运行结果:
thread-(0) with value 0 thread-(1) with value 0 thread-(2) with value 0 thread-(4) with value 0 thread-(6) with value 0 thread-(8) with value 0 thread-(7) with value 0 thread-(5) with value 0 thread-(3) with value 0 thread-(9) with value 1 global_values:2
但是很遗憾,我们没有得到我们希望的结果,相反,程序运行的结果和我们希望的结果差距更远。造成这样的原因,都是因为多个线程操作同一变量或同一代码块导致有的线程不能读到最新的值,甚至是把旧值的运算结果赋给全部局变量
现在,让我们修改一下原先的代码:
import _thread
import time
global_values = 0
def run(thread_name, lock):
global global_values
lock.acquire()
local_copy = global_values
print("%s with value %s" % (thread_name, local_copy))
global_values = local_copy + 1
lock.release()
lock = _thread.allocate_lock()
for i in range(10):
_thread.start_new_thread(run, ("thread-(%s)" % str(i), lock))
time.sleep(3)
print("global_values:%s" % global_values)
运行结果:
thread-(0) with value 0 thread-(2) with value 1 thread-(4) with value 2 thread-(5) with value 3 thread-(3) with value 4 thread-(6) with value 5 thread-(1) with value 6 thread-(7) with value 7 thread-(8) with value 8 thread-(9) with value 9
现在可以看到,线程的执行顺序依旧是乱序的,但全局变量的值是逐个递增的
_thread还有其他一些方法:
用threading模块创建线程
这是目前Python中处理线程普遍推荐的模块,这个模块提供了更完善和高级的接口,我们尝试将前面的示例转化成threading模块的形式:
import threading
import time
global_values = 0
def run(thread_name, lock):
global global_values
lock.acquire()
local_copy = global_values
print("%s with value %s" % (thread_name, local_copy))
global_values = local_copy + 1
lock.release()
lock = threading.Lock()
for i in range(10):
t = threading.Thread(target=run, args=("thread-(%s)" % str(i), lock))
t.start()
time.sleep(3)
print("global_values:%s" % global_values)
对于更复杂的情况,如果要更好地封装线程的行为,我们可能需要创建自己的线程类,这里需要注意几点:
import threading
import time
class MyThread(threading.Thread):
def __init__(self, count):
threading.Thread.__init__(self)
self.total = count
def run(self):
for i in range(self.total):
time.sleep(1)
print("Thread:%s - %s" % (self.name, i))
t = MyThread(2)
t2 = MyThread(3)
t.start()
t2.start()
print("finish")
运行结果:
finish Thread:Thread-2 - 0 Thread:Thread-1 - 0 Thread:Thread-2 - 1 Thread:Thread-1 - 1 Thread:Thread-2 - 2
注意上面主线程先打印了finish,之后才打印其他线程里面的print语句,这并不是什么大问题,但下面的情况就有问题了:
f = open("content.txt", "w+")
t = MyThread(2, f)
t2 = MyThread(3, f)
t.start()
t2.start()
f.close()
我们假设在MyThread中会将打印的语句写入content.txt,但这段代码是会出问题的,因为在开启其他线程前,主线程可能会先关闭文件处理器,如果想避免这种情况,应该使用join方法,join方法会使得被调用的线程执行完毕后,在能返回原先的线程继续执行下去:
f = open("content.txt", "w+")
t = MyThread(2, f)
t2 = MyThread(3, f)
t.start()
t2.start()
t.join()
t2.join()
f.close()
print("finish")
join方法还支持一个可选参数:时限(浮点数或None),以秒为单位。但是join返回值是None。因此,要检查操作是否已超时,需要在join方法返回后查看线程的激活状态,如果线程的状态是激活的,操作就超时了
再来看一个示例,它检查一组网站的请求状态码:
from urllib.request import urlopen
sites = [
"https://www.baidu.com/",
"http://www.sina.com.cn/",
"http://www.qq.com/"
]
def check_http_status(url):
return urlopen(url).getcode()
http_status = {}
for url in sites:
http_status[url] = check_http_status(url)
for key, value in http_status.items():
print("%s %s" % (key, value))
运行结果:
# time python3 test01.py https://www.baidu.com/ 200 http://www.sina.com.cn/ 200 http://www.qq.com/ 200 real 0m1.669s user 0m0.143s sys 0m0.026s
现在,我们尝试着把IO操作函数转变为一个线程来优化代码:
from urllib.request import urlopen
import threading
sites = [
"https://www.baidu.com/",
"http://www.sina.com.cn/",
"http://www.qq.com/"
]
class HttpStatusChecker(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.url = url
self.status = None
def run(self):
self.status = urlopen(self.url).getcode()
threads = []
http_status = {}
for url in sites:
t = HttpStatusChecker(url)
t.start()
threads.append(t)
for t in threads:
t.join()
for t in threads:
print("%s %s" % (t.url, t.status))
运行结果:
# time python3 test01.py https://www.baidu.com/ 200 http://www.sina.com.cn/ 200 http://www.qq.com/ 200 real 0m0.237s user 0m0.110s sys 0m0.019s
显然,线程版的程序更快,运行速度几乎是上一版的8倍,性能改善十分显著
通过Event对象实现线程间通信
虽然线程通常是作为独立运行或并行的任务,但是有时也会出现线程间通信的需求,threading模块提供了事件(event)对象实现线程间通信,它包含一个内部标记,以及可以使用set()和clear()方法的调用线程
Event类的接口很简单,它支持的方法如下:
让我们用线程事件对象来演示一个简单的线程通信示例,它们可以轮流打印字符串。两个线程共享同一个事件对象。在while循环中,每次循环时,一个线程设置标记,另一个线程重置标记。
import threading
import time
class ThreadA(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
def run(self):
count = 0
while count < 6:
time.sleep(1)
if self.event.is_set():
print("A")
self.event.clear()
count += 1
class ThreadB(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
def run(self):
count = 0
while count < 6:
time.sleep(1)
if not self.event.is_set():
print("B")
self.event.set()
count += 1
event = threading.Event()
ta = ThreadA(event)
tb = ThreadB(event)
ta.start()
tb.start()
运行结果:
B A B A B A B A B A B
下面总结一下Python多线程的使用时机:
使用多线程:
不使用多线程:
多进程
由于GIL的存在,Python的多线程并没有实现真正的并行。因此,一些问题使用threading模块并不能解决
不过Python为并行提供了一个替代方法:多进程。在多进程里,线程被换成一个个子进程。每个进程都运作着各自的GIL(这样Python就可以并行开启多个进程,没有数量限制)。需要明确的是,线程都是同一个进程的组成部分,它们共享同一块内存、存储空间和计算资源。而进程却不会与它们的父进程共享内存,因此进程间通信比线程间通信更为复杂
多进程相比多线程优缺点如下:
| 优点 | 缺点 |
| 可以使用多核操作系统 | 更多的内存消耗 |
| 进程使用独立的内存空间,避免竞态问题 | 进程间的数据共享变得更加困难 |
| 子进程容易中断 | 进程间通信比线程困难 |
| 避开GIL限制 |
原文:https://www.cnblogs.com/beiluowuzheng/p/9278619.html