在大量微服务所构成的分布式系统中,某个基础服务的不可用,可能导致服务雪崩效应,即:依赖该基础服务的所有其它基础服务及级联的上游服务的级联性不可用故障。
熔断机制是防止服务雪崩的基本技术手段。通过检查依赖服务的失败状况并封装熔断逻辑,阻止在依赖服务暂时出现故障期间的错误反复不断地向上传播。基本思路是快速失败和 Fallback 机制。
熔断的主要目的:防止服务雪崩效应;防止局部次要失败影响整体可用性。
熔断机制的实现思路主要包括:
重点是断路器和事件机制的设计实现。
断路器
熔断情形主要有:强制直接熔断;断路器开启、调用次数达到总数阈值且失败事件达到指定阈值(失败百分比、失败绝对次数等)。主要是断路状态机的设计实现。断路器状态机如图所示:
断路状态机有三个状态: CLOSE (关闭),HALF-OPEN (半开),OPEN (开启)。断路器默认是 CLOSE 状态,此时,正常调用依赖服务。
当调用次数达到总数阈值且失败事件的阈值达到指定值时,进入 OPEN 状态,开启降级逻辑;当断路器位于 OPEN 状态时,将进入一段断路时间窗期,这个时间窗内的请求将不会转发给依赖服务,而是转发给指定的降级逻辑;当断路器位于 OPEN 状态,且过了断路时间窗期,就会进入 HALF-OPEN 状态。
断路器使用称为 HALF-OPEN 状态的监视和反馈机制来了解依赖服务是否以及何时恢复。在 HALF-OPEN 状态,根据规则将部分请求转发给依赖服务(默认是只重试第一次请求),若调用成功则进入 CLOSE 状态,恢复调用依赖服务,若对依赖服务的调用超时或失败,则断路器保持在 OPEN 状态。
事件统计
可以划分为事件窗口和若干个数据桶分别统计,再进行数据聚合。
事件机制
将依赖服务的调用结果转化为事件,触发事件监听器,对成功及失败事件进行统计,判断是否触发断路器的熔断,以决定下一次调用是调用依赖服务还是降级策略或者直接抛出异常。
Hystrix 是业界采用的熔断机制的主要实现之一。
Hystrix 使用起来比较简单:使用 @EnableCircuitBreaker 开启熔断功能,使用 @HystrixCommand 为方法添加熔断降级配置。下面先给出代码示例,再讲讲原理的实现和部分源码的理解。
步骤一:引入 POM 依赖(SpringBoot 版本为 1.5.9.RELEASE )
<!-- 引入 hystrix 熔断 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>1.4.7.RELEASE</version>
</dependency>
步骤二:在应用启动类 Application.java 上开启熔断
package cc.lovesq;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
@EnableCircuitBreaker // 开启熔断机制
@SpringBootApplication
@Configuration
@ComponentScan(basePackages = {"cc.lovesq.*"})
@MapperScan(basePackages = {"cc.lovesq.dao"})
@ImportResource(locations={"classpath:spring.xml"})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
步骤三:为方法配置熔断设置。通常是在 Service 层做熔断。
package cc.lovesq.service.impl;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;
import java.util.Random;
/**
* @Description 返回随机值
* @Date 2021/1/28 4:37 上午
* @Created by qinshu
*/
@Component
public class RandomValueService {
private static Log log = LogFactory.getLog(RandomValueService.class);
Random rand = new Random(47);
@HystrixCommand(commandKey = "randInt", fallbackMethod = "randIntDowngrade",
commandProperties = {
@HystrixProperty(name="metrics.rollingStats.timeInMilliseconds", value="5000"),
@HystrixProperty(name="circuitBreaker.requestVolumeThreshold", value="10"),
@HystrixProperty(name="circuitBreaker.errorThresholdPercentage", value="50")
})
public Integer randInt() {
int v = rand.nextInt(100);
if (v == 0) {
throw new RuntimeException("Invalid number");
}
return v;
}
public Integer randIntDowngrade() {
log.info("randIntDowngrade");
return 0;
}
}
步骤四:测试熔断。
package cc.lovesq.experiments;
import cc.lovesq.service.impl.RandomValueService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Description 熔断实现
* @Date 2021/1/28 4:39 上午
* @Created by qinshu
*/
@Component
public class HystrixExperiment implements IExperiment {
private static Log log = LogFactory.getLog(HystrixExperiment.class);
@Resource
private RandomValueService randomValueService;
@Override
public void test() {
log.info("----HystrixExperiment-start----");
for (int i=0; i < 100000; i++) {
try {
randomValueService.randInt();
} catch (Exception ex) {
log.error("CatchedException: " + ex.getMessage());
}
}
log.info("----HystrixExperiment-end----");
}
}
Hystrix 将服务调用封装成一个 HystrixCommand 来执行。HystrixCommand 的熔断流程如下图所示:
Hystrix 的整个流程如下图所示:
Hystrix 的基本实现如下:
源码解析
Hystrix 借用了 RxJava 库的响应式编程机制,代码有点绕,需要细细多看几遍。总的来说,就是 Observable 对象产生的事件导致的回调和转换,以及回调和转换的级联处理。
要理解 Hystrix 源码,需要有一定的响应式编程基础,可参阅 “响应式编程库RxJava初探”。为什么要采用响应式编程模型呢?因为 Hystrix 要处理的回调比较复杂:
可以看到,这需要一个连续不断的回调流,连续不断地处理各种事件。响应式编程模型正好能够对付这种编程需求。
GenericCommand 的执行入口代码如下:
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
其中 queue 方法的代码如下:
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture(); // 这句是重点
final Future<R> f = new Future<R>() {
// 为 delegate 对象封装线程中断功能,暂时跳过
}
return f;
重点是 toObservable().toBlocking().toFuture();
这句,主要干的事情是:首先将 GenericCommand 的执行转换成一个 Observable 对象,从而能够变成可监听的事件,连接后面的事件统计、断路器状态机及熔断功能,最后再转换成一个 Future 对象,来获取服务调用命令的结果。
先说说 toBlocking().toFuture() 这部分,toBlocking() 使用了装饰器模式,将 Observable 对象装饰成一个可阻塞的 BlockingObservable 对象,阻塞并等待被装饰的 Observable 对象的执行完成事件或发生异常事件;toFuture 方法将 BlockingObservable 的执行转换成一个 Future 对象,使用 CountDownLatch 锁来实现阻塞功能(that 就是被装饰的 Observable 对象):
public static <T> Future<T> toFuture(Observable<? extends T> that) {
final CountDownLatch finished = new CountDownLatch(1);
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
@SuppressWarnings("unchecked")
final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
finished.countDown();
}
@Override
public void onError(Throwable e) {
error.compareAndSet(null, e);
finished.countDown();
}
@Override
public void onNext(T v) {
// "single" guarantees there is only one "onNext"
value.set(v);
}
});
return new Future<T>() {
// 根据 finished 的状态、value 及 error 封装一个 Future 的实现,等待 that 的订阅执行完成之后,获取结果;暂时跳过
};
}
接下来就是重点的 toObservable 方法了。 这个方法首先定义了一些回调函数:
然后创建了一个装配了这些回调函数的带缓存功能的命令执行的 Observable 对象,在命令执行的不同阶段或发生异常时,就会执行对应的方法。
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 命令执行状态判断
// 命令执行日志记录
// 带缓存的命令执行
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// 从缓存里取 afterCache
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
Observable.defer 将一个 Func0[Observable] 对象包装成一个 Observable 对象。
接着看主要方法 applyHystrixSemantics ,这一段就是根据断路器状态及线程池许可来决定是否执行依赖服务调用。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we‘re starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we‘re allowed to execute */
if (circuitBreaker.attemptExecution()) { // 断路器内部的状态逻辑,断路器允许请求服务调用的情形
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() { // 归还执行调用服务的许可
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) { // 标记并通知异常事件,可通过插件来实现
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) { // 线程池是否有许可来执行服务调用
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown) // 设置异常回调
.doOnTerminate(singleSemaphoreRelease) // 设置终止时回调,归还执行调用服务的许可
.doOnUnsubscribe(singleSemaphoreRelease); // 设置取消回调,归还执行调用服务的许可
} catch (RuntimeException e) {
return Observable.error(e);
}
} else { // 没有足够线程来执行服务调用,采取降级策略
return handleSemaphoreRejectionViaFallback();
}
} else { // 断路器不允许请求服务调用的情形,采取降级策略
return handleShortCircuitViaFallback();
}
}
最后看一下executeCommandAndObserve 方法。这个方法也是一样的套路:先定义若干回调函数,然后创建一个装配了这些回调函数的 Observable 对象,以便在适当的时候被触发调用。这里面还封装了事件的通知,比如 eventNotifier.markEvent(HystrixEventType.XXX, commandKey);
以及执行结果的处理,比如 executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
。ExecutionResult 对象使用了 Immutable Variable 模式,简化了对执行结果的并发处理。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
Hystrix 的代码处理流程先分析到这里。整个代码套路基本明了:
最后说下事件统计,主要有两种主要的滑动窗口计数机制,其实现分别在对象 OperatorWindowWithTime(在指定时间范围内聚合计数) 和 OperatorWindowWithSize(在指定数量内聚合计数) 里。比如 BucketedCounterStream 的计数,使用 OperatorWindowWithTime 来实现:
this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
@Override
public Observable<Bucket> call() {
return inputEventStream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
.flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types
.startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
}
});
熔断机制是分布式的微服务体系中必不可少的技术手段,用来防止服务雪崩。本文总结了熔断机制的实现原理及 Hystrix 的使用和基本的源码解析。
原文:https://www.cnblogs.com/lovesqcc/p/14391770.html