Tenacity是一个通用的retry库,简化为任何任务加入重试的功能。
它还包含如下特性:通用的装饰器API;可以指定停止条件(例如限制重试次数);可以指定等待条件(例如重试之间按指数退避等待);可以自定义遇到哪些异常时重试;可以自定义遇到哪些返回结果时重试;支持对协程进行重试。
很多时候,我们都喜欢为代码加入retry功能。比如oauth验证,有时候网络不太灵,我们希望多试几次。这些retry应用的场景看起来不同,其实又很类似。都是判断代码是否正常运行,如果不是则重新开始。那么,有没有一种通用的办法来实现呢?
1)最简单的使用方法,是直接给需要重试的函数加上@retry装饰器:函数抛出的异常会被装饰器捕获并触发重试;在默认情况下,只要有异常抛出就会无间隔地一直重试,直到函数执行成功
from tenacity import retry
import requests


# A bare @retry retries on every exception, without waiting and without a
# stop condition, until the call finally succeeds.
@retry
def conn_req():
    """Return the page body once the server answers with HTTP 200.

    Any other status code raises ``Exception``, which (like any exception
    raised by ``requests.get`` itself) triggers another attempt.
    """
    reply = requests.get(url="http://www.baidu.com")
    if reply.status_code != 200:
        raise Exception
    return reply.text


res = conn_req()
print(res)
2)加上终止条件的retry
from tenacity import retry, stop_after_attempt
import requests


# Give up after the third failed attempt; tenacity then raises RetryError
# instead of retrying forever.
@retry(stop=stop_after_attempt(3))
def conn_req():
    """Return the page body on HTTP 200, trying at most three times.

    Any other status code raises ``Exception`` so that the decorator
    schedules another attempt.
    """
    response = requests.get(url="http://www.baibai.com")
    if response.status_code == 200:
        return response.text
    raise Exception


res = conn_req()
print(res)
from tenacity import retry, stop_after_attempt, stop_after_delay
import requests


# Keep retrying until 5 seconds have elapsed since the first attempt.
@retry(stop=stop_after_delay(5))
def conn_req():
    """Return the page body on HTTP 200; any other status raises ``Exception``."""
    reply = requests.get(url="http://www.baibai.com")
    if reply.status_code != 200:
        raise Exception
    return reply.text


res = conn_req()
print(res)
from tenacity import retry, stop_after_delay, stop_after_attempt
import requests


# Stop conditions can be combined with `|`: retrying ends as soon as EITHER
# 5 seconds have elapsed OR 5 attempts have been made.
@retry(stop=stop_after_delay(5) | stop_after_attempt(5))
def stop_after_5_s():
    """Return the page body on HTTP 200; any other status raises ``Exception``."""
    response = requests.get(url='http://www.baidu.com')
    if response.status_code == 200:
        return response.text
    raise Exception


res = stop_after_5_s()
print(res)
3)代码重试前等待间隔
from tenacity import retry, wait_fixed
import requests


# Sleep a fixed 2 seconds between consecutive attempts.
@retry(wait=wait_fixed(2))
def wait_2_s():
    """Return the page body on HTTP 200; any other status raises ``Exception``."""
    response = requests.get(url='http://www.baibai.com')
    if response.status_code == 200:
        return response.text
    raise Exception


res = wait_2_s()
print(res)
from tenacity import retry, wait_random
import requests


# Sleep a random duration between 1 and 2 seconds before each retry.
@retry(wait=wait_random(min=1, max=2))
def wait_2_s():
    """Return the page body on HTTP 200; any other status raises ``Exception``."""
    response = requests.get(url='http://www.baibai.com')
    if response.status_code == 200:
        return response.text
    raise Exception


res = wait_2_s()
print(res)
from tenacity import retry, wait_exponential
import requests


# Exponential backoff: the wait grows exponentially with the attempt number
# (on the order of multiplier * 2**n), but is never shorter than min=3 seconds
# and never longer than max=100 seconds.
@retry(wait=wait_exponential(multiplier=2, min=3, max=100))
def wait_2_s():
    """Return the page body on HTTP 200; any other status raises ``Exception``."""
    response = requests.get(url='http://www.baibai.com')
    if response.status_code == 200:
        return response.text
    raise Exception


res = wait_2_s()
print(res)
4)带有触发条件的retry语句
from tenacity import retry, retry_if_exception_type, retry_if_result


# Retry only when an IOError is raised; any other exception type propagates
# to the caller immediately.
@retry(retry=retry_if_exception_type(IOError))
def might_io_error():
    print("Retry forever with no wait if an IOError occurs, raise any other errors")
    raise Exception


def is_none_p(value):
    """Predicate for ``retry_if_result``: true exactly when ``value`` is None."""
    return value is None


# Retry whenever the predicate accepts the return value (here: None).
@retry(retry=retry_if_result(is_none_p))
def might_return_none():
    print("Retry with no wait if return value is None")


# Retry conditions combine with `|`: retry on a None result OR on an exception.
# NOTE: this definition reuses the name of the previous example and replaces it.
@retry(retry=(retry_if_result(is_none_p) | retry_if_exception_type()))
def might_return_none():
    print("Retry forever ignoring Exceptions with no wait if return value is None")
5)在retry前后增加log
from tenacity import retry, stop_after_attempt, before_log, after_log, before_sleep_log
import logging

logger = logging.getLogger(__name__)


class MyException(Exception):
    """Example error raised by the functions below to force a retry."""


# Log at DEBUG level before every attempt.
@retry(stop=stop_after_attempt(3), before=before_log(logger, logging.DEBUG))
def raise_my_exception():
    raise MyException("Fail")


# Log at DEBUG level after every failed attempt.
# NOTE: each of the three examples reuses the same function name, so a later
# definition replaces the earlier one.
@retry(stop=stop_after_attempt(3), after=after_log(logger, logging.DEBUG))
def raise_my_exception():
    raise MyException("Fail")


# Log at DEBUG level right before sleeping ahead of the next retry.
@retry(stop=stop_after_attempt(3), before_sleep=before_sleep_log(logger, logging.DEBUG))
def raise_my_exception():
    raise MyException("Fail")
转自:https://blog.csdn.net/qq_37287621/article/details/95055718
原文:https://www.cnblogs.com/tjp40922/p/14127429.html