Python的threading模块有一个比较严重的bug:那就是可能会让线程的等待提前结束或者延迟,具体的原因是因为线程的wait操作判断超时时依赖于实时时间,即通过time.time()获取到的时候,为了显示这个问题,请看下面的例子:
from threading import Thread from threading import Event import time e = Event() stop = False class MyThread(Thread): def __init__(self): super(MyThread,self).__init__() def run(self): counter = 0 while not stop: time.sleep(1.0) counter += 1 print counter if counter >= 60: break t = MyThread() t.start() e.wait(60) stop = True print 'done!'
这段代码创建一个计时的线程,每1秒打印一次计数,线程总共等待60,然后结束。为了更好的说明问题,代码并没有用Thread.join来等待,而是用Event.wait来等待,
(其实通过下面的代码分析,可以知道,Event.wait和Thread.join的等待都是调用的Condition.wait,所以情况是一样的)
按照正常情况,一段程序会打印60次计数,然后结束。
现在,把系统的时候调前60秒或者调后60秒,你会发现一个很有意思的情况:
1、当系统调前60秒时,计时打印输出会立即停止,等待会立即结束,并输出done!
2、当系统的时间被调后60秒时,计时打印输出也会立即停止,过60秒后才恢复打印,总共要等待120秒程序才会结束。
然后你可以想到,当系统的时候改变被调后超过一小时,二小时时,甚至一天,二天时,就会发生死锁的情况(其实这里的死锁也不太准确,
系统在等待超过改变的时间后还是会返回的,但是这种长时间的等待几乎可以认可为死锁了)
好,说明了问题,接下来就要来说说这个问题有多严重了。也许有人会说这个问题出现的概率几乎为零,现在的机器的时间一般都是很准的,不会出现大的时间调整。
但是凡事都有例外,例如你的机器的给rtc供电的钮扣电池没电了,这时候必须会在改变时间的时候出现比较大的时间变化,再如有些设备上根本就没有rtc,在开机的时候通过NTP服务器来获取时间等等。如果问题出现在终端设备可能影响会小些,但是如果代码是运行在服务端的,那就可能会是灾难性的,有可能会直接造成服务器的宕机。
关键这个问题很隐密,很难重现,所以,如果不知道问题之所在,查起来比较麻烦。当然,这个问题还是被我给解决掉了,要不然也不会有这篇文章的产生。
好,说了这么一大堆废话,接下来我们来看下具体的原因:
找到threading.py,我把关键的代码帖出来:
from time import time as _time, sleep as _sleep
class _Condition(_Verbose): ... def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self.__waiters.append(waiter) saved_state = self._release_save() try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() if __debug__: self._note("%s.wait(): got it", self) else: # Balancing act: We can't afford a pure busy loop, so we # have to sleep; but if we sleep the whole timeout time, # we'll be unresponsive. The scheme here sleeps very # little at first, longer as time goes on, but never longer # than 20 times per second (or the timeout time remaining). endtime = _time() + timeout delay = 0.0005 # 500 us -> initial delay of 1 ms while True: gotit = waiter.acquire(0) if gotit: break remaining = endtime - _time() if remaining <= 0: break delay = min(delay * 2, remaining, .05) _sleep(delay) if not gotit: if __debug__: self._note("%s.wait(%s): timed out", self, timeout) try: self.__waiters.remove(waiter) except ValueError: pass else: if __debug__: self._note("%s.wait(%s): got it", self, timeout) finally: self._acquire_restore(saved_state) def Condition(*args, **kwargs): return _Condition(*args, **kwargs) class _Event(_Verbose): def __init__(self, verbose=None): _Verbose.__init__(self, verbose) self.__cond = Condition(Lock()) def Event(*args, **kwargs): return _Event(*args, **kwargs)
这个方法首先计算等等的结束时间,
endtime = _time() + timeout
然后不断的判断时间有没有到,如果没到,就等上delay*2的时间,每次等待最多0.05秒:
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
这里,如果在等待时时间发生了改变,就会出现等待提前结束或者延迟结束的问题。
另外,Thread.join里的等待也是调用的_Condition.wait,所以也会有这个问题,具体的看下面的代码:
class Thread(_Verbose): self.__block = Condition(Lock()) def join(self, timeout=None): ... self.__block.acquire() try: if timeout is None: while not self.__stopped: self.__block.wait() if __debug__: self._note("%s.join(): thread stopped", self) else: deadline = _time() + timeout while not self.__stopped: delay = deadline - _time() if delay <= 0: if __debug__: self._note("%s.join(): timed out", self) break self.__block.wait(delay) else: if __debug__: self._note("%s.join(): thread stopped", self) finally: self.__block.release()
好,知道了问题,接下来看下怎么去解决:
这里也没有什么好卖关子的,解决的方法很简单,就是用开机时间来替代实时时间。
在linux下面,通过clock_gettime来获取开机时间,
在windows下面,通过GetTickCount来获取开机时间,代码如下:
startup_linux.pyx
cdef extern from "time.h": enum: CLOCK_MONOTONIC struct timespec: int tv_sec long tv_nsec cdef int clock_gettime(int type,timespec* ts) def getboottime(): cdef timespec ts clock_gettime(CLOCK_MONOTONIC,&ts) return <double>ts.tv_sec + ts.tv_nsec/1000000000.0
cdef extern from "Windows.h": int GetTickCount() def getboottime(): return GetTickCount()/1000.0
Setup.py
from distutils.core import setup from distutils.extension import Extension from Cython.Build import cythonize import os if os.name == 'nt': print 'build startup_win32' sources = ["startup_win32.pyx"] setup( name = 'startup_win32', ext_modules=cythonize([ Extension("startup_win32", sources), ]), ) else: sources = ["startup_linux.pyx"] setup( name = 'startup', ext_modules=cythonize([ Extension("startup_linux", sources, libraries = ['rt'] ), ]), )
接下来,看下怎么使用:
替换threading.py开始的如下代码:
from time import time as _time, sleep as _sleep
为:
from time import sleep as _sleep import os if os.name == 'nt': import startup_win32.getboottime as _time else: import startup_linux.getboottime as _time
其它的代码都不用改。
当然,不只是threading这个模块有这个问题,系统的Queue模块也同样也有这个问题,解决方法和threading模块一样。
Python 官方代码threading模块的一个死锁的bug,布布扣,bubuko.com
Python 官方代码threading模块的一个死锁的bug
原文:http://blog.csdn.net/i2cbus/article/details/38349655