序
本文主要研究一下HystrixCircuitBreaker
Flow Chart
HystrixCircuitBreaker
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixCircuitBreaker.java
/**
* Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold.
* <p>
* The default (and only) implementation will then allow a single retry after a defined sleepWindow until the execution
* succeeds at which point it will again close the circuit and allow executions again.
*/
public interface HystrixCircuitBreaker {
/**
* Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does
* not modify any internal state, and takes into account the half-open logic which allows some requests through
* after the circuit has been opened
*
* @return boolean whether a request should be permitted
*/
boolean allowRequest();
/**
* Whether the circuit is currently open (tripped).
*
* @return boolean state of circuit breaker
*/
boolean isOpen();
/**
* Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markSuccess();
/**
* Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markNonSuccess();
/**
* Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal
* state.
*/
boolean attemptExecution();
}
- HystrixCircuitBreaker的逻辑是内嵌入HystrixCommand的执行中,如果断路器打开,则HystrixCommand不执行,直接进入fallback
- 每个HystrixCommand执行之前先调用allowRequest()方法,来判断是否可以执行,在断路器半开状态允许部分请求进入
- isOpen()方法用来判断断路器是否已经打开
- markSuccess()用于在半开状态标记成功的请求
- markNonSuccess()用于在半开状态标记不成功的请求
- attemptExecution()这个方法不是幂等操作,会修改内部状态,用来尝试去执行command
HystrixCircuitBreakerImpl
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixCircuitBreaker.java
/**
* The default production implementation of {@link HystrixCircuitBreaker}.
*
* @ExcludeFromJavadoc
* @ThreadSafe
*/
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
enum Status {
CLOSED, OPEN, HALF_OPEN;
}
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
private Subscription subscribeToStream() {
/*
* This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
*/
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
@Override
public void markSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L);
}
}
@Override
public void markNonSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//This thread wins the race to re-open the circuit - it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
}
}
@Override
public boolean isOpen() {
if (properties.circuitBreakerForceOpen().get()) {
return true;
}
if (properties.circuitBreakerForceClosed().get()) {
return false;
}
return circuitOpened.get() >= 0;
}
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (status.get().equals(Status.HALF_OPEN)) {
return false;
} else {
return isAfterSleepWindow();
}
}
}
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
@Override
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (isAfterSleepWindow()) {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false;
}
} else {
return false;
}
}
}
}
- HystrixCircuitBreakerImpl为默认的内部实现,status默认是CLOSED,circuitOpened默认为-1
- markSuccess()会先把status从HALF_OPEN改为CLOSED,重置circuitOpened为-1
- markNonSuccess()会把status从HALF_OPEN改为OPEN,设置circuitOpened为当前时间戳
- isOpen(),如果配置文件circuitBreakerForceOpen为true,则优先返回true,如果配置文件circuitBreakerForceClosed,则返回false,最后根据circuitOpened是否>=0来判断
- allowRequest()也是优先判断配置文件两个属性,之后如果circuitOpened为-1则返回true;如果处于HALF_OPEN则返回false,否则判断isAfterSleepWindow(),即如果当前时间-circuitOpenTime > circuitBreakerSleepWindowInMilliseconds,则返回true
- attemptExecution()方法也是优先判断配置文件两个属性,之后判断如果circuitOpened为-1则返回true,否则判断isAfterSleepWindow(),如果处于窗口时间,则会更改状态从OPEN到HALF_OPEN。
AbstractCommand.executeCommandAndObserve
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java
/**
* This decorates "Hystrix" functionality around the run() Observable.
*
* @return R
*/
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
circuitBreaker.markSuccess();
}
}
};
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
- 这里看最后的配置,先执行markEmits,然后成功完成调用markOnCompleted,如果遇到失败则调用handleFallback
- 当commandIsScalar为true时,在markEmits会调用circuitBreaker.markSuccess();commandIsScalar为false时,markOnCompleted会调用circuitBreaker.markSuccess()
- handleFallback这里会标记circuitBreaker.markNonSuccess()
AbstractCommand.applyHystrixSemantics
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
- 这个方法将普通command应用上hystrix的语义,即会调用circuitBreaker.attemptExecution(),如果circuitOpened==-1的话,返回true,否则判断如果时间窗口已经过,则从OPEN改为HALF_OPEN
- 这个方法跟allowRequest()方法有点类似,不过allowRequest()貌似没有地方调用;而attemptExecution()则会在需要的时候,试图去半开断路器
- 这个方法真正执行的时候调用了executeCommandAndObserve方法,调用之前会先executionSemaphore.tryAcquire()获取信号量,获取到了才执行
小结
HystrixCircuitBreaker内置给HystrixCommand去调用,请求之前先调用circuitBreaker.attemptExecution(),之后执行时调用executeCommandAndObserve,在开始或完成的时候标记circuitBreaker.markSuccess(),fallback的时候标记circuitBreaker.markNonSuccess()去完成status及circuitOpened字段的变更。超时抛出异常的部分详见聊聊hystrix的timeout处理。