前些天在公司这边写了个豌豆荚的爬虫,用到了分区思想和自己实现的线程池,我自己觉得从这个过程中学到了很多东西,包括如何去设计接口和方便扩展以及代码的规范化。之前用小数据量测试了发现没什么问题,后来拿了W级以上的问题,发现插入的数码条目的量级和输入量级有很大差异,就算算上失效的URL也不应出现这样的情况,于是开始排查。反反复复看各个模块的代码,对应日志信息查看,最后发现时死锁问题导致的。
死锁(英语:Deadlock),又译为死锁,计算机科学名词。当两个以上的运算单元,双方都在等待对方停止运行,以取得系统资源,但是>没有一方提前退出时,这种状况,就称为死锁。在多任务操作系统中,操作系统为了协调不同进程,能否取得系统资源时,为了让系统运
作,就必须要解决这个问题。
这里指的是进程死锁,是个计算机技术名词。它是操作系统或软件运行的一种状态:在多任务系统下,当一个或多个进程等待系统资源,
而资源又被进程本身或其它进程占用时,就形成了死锁。
上面的解释引自维基,死锁有进程间的死锁和线程间的死锁,只要是并发情况,并且双方都在占有资源的情况下等待对方的资源,就会发生死锁。在实际情况下,很容易不注意锁,条件变量的时候而导致死锁。
这次死锁发生在什么情况下呢?在最开始写线程池的时候,我设计了线程是可重用的,主要是通过Event信号实现,通过在每个线程核心工作代码执行完毕后会将自己归还到池中,然后等待Event信号。主线程会以循环超时阻塞的方式监视一个任务队列,当发现有任务时便会从线程池中取出一个线程,并设置它的任务和目标函数,然后去start或者resume,resume就是会设置Event信号让线程不再阻塞,这里,从池中取线程的方法_get和归还线程方法returnThread都已经加锁,_get和returnThread使用同一把互斥锁,因为在_get和returnThread方法里面对线程池对象以及分区对象都有状态修改并且有些操作有条件判断,因此必须加锁保证线程安全和同步。
这样就真正线程安全了吗?可以顺利按照预期执行了吗?看起来好像没有问题,并且我这里设置的分区数目是4,分区的初始容量是5,最大容量为20,故池的总大小为4*20=80,这样对于小数目的测试确实发现不了死锁问题。
考虑下面一种情况:
我们通过上面的描述,发现死锁的发生是因为条件等待时没有释放锁资源,仔细思考这句话,会发现其实我们也熟知的Condition就是为了解决这个问题的。
python里面的threading.Condition里面会内置Lock/RLock锁,并且可以条件等待时释放锁资源,这样,将之前的淡出的互斥锁改成condition,并且在queue.get方法时,先判断条件是否满足(有可用线程),如果可用则直接往后执行,否则cond.wait阻塞并且释放锁;另一方面,正在运行的thread通过returnThread时,也是通过cond.acquire来加锁,然后这样当主线程cond.wait的时候能够有机会获得锁,然后执行,当returnThread快要结束,已经归还后,cond.notify/conf.notify_all来通知在等待该条件的主线程。这样就能够顺利执行。
def get(self):
self.cond.acquire()
try:
if self._shutdown:
raise RuntimeError(‘ThreadPool already shutdown.‘)
else:
for partition in self.partitions:
logger.info("parition #%d status:" % partition.get_partition_no())
logger.info("current load: %.2f" % partition.get_load())
logger.info("used size: %d" % partition.get_used_size())
logger.info("max size: %d" % partition.get_max_size())
partition = self.get_best_partition()
logger.info("best partition: %d" % partition.get_partition_no())
if partition.get_load() >= self.config.get_partition_upper_load_factor():
self.expand_pool_for_partition(partition, self.config.get_partition_increase_step())
logger.debug("partition avail size: %d" % partition.get_avail_size())
if partition.get_avail_size() == 0:
self.cond.wait()
tid = partition.take()
thread = self.object_pool[tid]
del self.object_pool[tid]
#update access time
thread.set_atime(time.time())
partition.increase_active_thread_count()
logger.debug("active thread count after get: %d" % self.get_active_thread_count())
return thread
finally:
self.cond.release()
def _return(self,thread):
self.cond.acquire()
logger.info("return back...")
try:
if (time.time() - thread.get_atime()) > self.config.get_timeout():
logger.info("destroy thread #%d" % thread.ident)
self.factory.destroy(thread)
else:
self.object_pool[thread.tid] = thread
self.partitions[thread.get_partition()].put(thread.tid)
self.partitions[thread.get_partition()].decrease_active_thread_count()
logger.info("return thread #%d back to pool on partition #%d" % (thread.ident,thread.get_partition()))
for partition in self.partitions:
logger.info("partition #%d status:" % partition.get_partition_no())
logger.info("current load: %.2f" % partition.get_load())
logger.info("used size: %d" % partition.get_used_size())
logger.info("max size: %d" % partition.get_max_size())
logger.debug("active thread count after return: %d" % self.get_active_thread_count())
self.cond.notify()
except Exception,e:
print e
#if return error, we should kill this thread
self.factory.destroy(thread)
finally:
self.cond.release()
原文:http://blog.csdn.net/liupc123123/article/details/38063253