Preface
This article takes a look at HystrixEventNotifier.
HystrixEventNotifier
/**
 * Abstract EventNotifier that allows receiving notifications for different events with default implementations.
 * <p>
 * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
 * href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
 * <p>
 * <b>Note on thread-safety and performance</b>
 * <p>
 * A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe.
 * <p>
 * Methods are also invoked synchronously and will add to execution time of the commands so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously
 * onto separate worker threads.
 */
public abstract class HystrixEventNotifier {

    /**
     * Called for every event fired.
     * <p>
     * <b>Default Implementation: </b> Does nothing
     *
     * @param eventType event type
     * @param key event key
     */
    public void markEvent(HystrixEventType eventType, HystrixCommandKey key) {
        // do nothing
    }

    /**
     * Called after a command is executed using thread isolation.
     * <p>
     * Will not get called if a command is rejected, short-circuited etc.
     * <p>
     * <b>Default Implementation: </b> Does nothing
     *
     * @param key
     *            {@link HystrixCommandKey} of command instance.
     * @param isolationStrategy
     *            {@link ExecutionIsolationStrategy} the isolation strategy used by the command when executed
     * @param duration
     *            time in milliseconds of executing <code>run()</code> method
     * @param eventsDuringExecution
     *            {@code List<HystrixEventType>} of events occurred during execution.
     */
    public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) {
        // do nothing
    }
}
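- The notifier is invoked synchronously on the command's execution path, so its methods must be fast; a slow implementation blocks the command. Anything time-consuming should be handed off asynchronously to another thread. A single instance is shared globally and called concurrently, so implementations must also be thread-safe.
- markEvent is called for every event fired. markCommandExecution, per its Javadoc, is called after a command is executed using thread isolation, and is not called when the command is rejected or short-circuited.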
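The advice to move slow work off the calling thread can be sketched roughly as follows. This is a minimal illustration, not Hystrix code: EventNotifierSketch is a hypothetical stand-in for HystrixEventNotifier, event types and command keys are plain strings, and AsyncCountingNotifier and its methods are invented names. A real implementation would extend HystrixEventNotifier and take HystrixEventType and HystrixCommandKey arguments instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for HystrixEventNotifier (assumption: strings replace the Hystrix types).
abstract class EventNotifierSketch {
    void markEvent(String eventType, String commandKey) {
        // default: do nothing
    }
}

// Counts events on a worker thread so the calling (command) thread only pays for the hand-off.
final class AsyncCountingNotifier extends EventNotifierSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    @Override
    void markEvent(String eventType, String commandKey) {
        // Only the task submission runs on the caller's thread.
        worker.execute(() ->
                counts.computeIfAbsent(commandKey + ":" + eventType, k -> new LongAdder()).increment());
    }

    long count(String eventType, String commandKey) {
        LongAdder adder = counts.get(commandKey + ":" + eventType);
        return adder == null ? 0 : adder.sum();
    }

    // Stops accepting work and waits for queued events; returns false on timeout or interruption.
    boolean shutdownAndAwait(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class AsyncNotifierDemo {
    public static void main(String[] args) {
        AsyncCountingNotifier notifier = new AsyncCountingNotifier();
        notifier.markEvent("SUCCESS", "GetUser");
        notifier.markEvent("SUCCESS", "GetUser");
        notifier.markEvent("TIMEOUT", "GetUser");
        notifier.shutdownAndAwait(1000);
        System.out.println(notifier.count("SUCCESS", "GetUser"));
    }
}
```

The single-thread executor used here has an unbounded queue; under sustained load a bounded queue with a drop or sampling policy may be a safer choice.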
HystrixEventNotifierDefault
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/eventnotifier/HystrixEventNotifierDefault.java
/**
 * Default implementations of {@link HystrixEventNotifier} that does nothing.
 *
 * @ExcludeFromJavadoc
 */
public class HystrixEventNotifierDefault extends HystrixEventNotifier {

    private static HystrixEventNotifierDefault INSTANCE = new HystrixEventNotifierDefault();

    private HystrixEventNotifierDefault() {
    }

    public static HystrixEventNotifier getInstance() {
        return INSTANCE;
    }
}
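The default implementation does nothing: it inherits both no-op methods and is exposed as a singleton through getInstance().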
HystrixEventType
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixEventType.java
/**
 * Various states/events that execution can result in or have tracked.
 * <p>
 * These are most often accessed via {@link HystrixRequestLog} or {@link HystrixCommand#getExecutionEvents()}.
 */
public enum HystrixEventType {
    EMIT(false),
    SUCCESS(true),
    FAILURE(false),
    TIMEOUT(false),
    BAD_REQUEST(true),
    SHORT_CIRCUITED(false),
    THREAD_POOL_REJECTED(false),
    SEMAPHORE_REJECTED(false),
    FALLBACK_EMIT(false),
    FALLBACK_SUCCESS(true),
    FALLBACK_FAILURE(true),
    FALLBACK_REJECTION(true),
    FALLBACK_MISSING(true),
    EXCEPTION_THROWN(false),
    RESPONSE_FROM_CACHE(true),
    CANCELLED(true),
    COLLAPSED(false),
    COMMAND_MAX_ACTIVE(false);

    private final boolean isTerminal;

    HystrixEventType(boolean isTerminal) {
        this.isTerminal = isTerminal;
    }

    public boolean isTerminal() {
        return isTerminal;
    }

    //......

    /**
     * List of events that throw an Exception to the caller
     */
    public final static List<HystrixEventType> EXCEPTION_PRODUCING_EVENT_TYPES = new ArrayList<HystrixEventType>();

    /**
     * List of events that are terminal
     */
    public final static List<HystrixEventType> TERMINAL_EVENT_TYPES = new ArrayList<HystrixEventType>();

    static {
        EXCEPTION_PRODUCING_EVENT_TYPES.add(BAD_REQUEST);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_FAILURE);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_MISSING);
        EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_REJECTION);

        for (HystrixEventType eventType: HystrixEventType.values()) {
            if (eventType.isTerminal()) {
                TERMINAL_EVENT_TYPES.add(eventType);
            }
        }
    }

    //......
}
- This enum defines the Hystrix event types and groups them into two lists: EXCEPTION_PRODUCING_EVENT_TYPES and TERMINAL_EVENT_TYPES.
- EXCEPTION_PRODUCING_EVENT_TYPES contains BAD_REQUEST, FALLBACK_FAILURE, FALLBACK_MISSING, and FALLBACK_REJECTION.
- TERMINAL_EVENT_TYPES is derived from each constant's isTerminal flag and contains SUCCESS, BAD_REQUEST, FALLBACK_SUCCESS, FALLBACK_FAILURE, FALLBACK_REJECTION, FALLBACK_MISSING, RESPONSE_FROM_CACHE, and CANCELLED.
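One way a custom notifier might use the terminal flag is to pick the terminal event out of an ordered event list, such as the one passed to markCommandExecution. The sketch below is only an illustration: EventTypeSketch is a stand-in enum that copies the flags from the listing above, and firstTerminal is a hypothetical helper, not part of Hystrix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Stand-in enum (assumption): same constants and terminal flags as the HystrixEventType listing.
enum EventTypeSketch {
    EMIT(false), SUCCESS(true), FAILURE(false), TIMEOUT(false), BAD_REQUEST(true),
    SHORT_CIRCUITED(false), THREAD_POOL_REJECTED(false), SEMAPHORE_REJECTED(false),
    FALLBACK_EMIT(false), FALLBACK_SUCCESS(true), FALLBACK_FAILURE(true),
    FALLBACK_REJECTION(true), FALLBACK_MISSING(true), EXCEPTION_THROWN(false),
    RESPONSE_FROM_CACHE(true), CANCELLED(true), COLLAPSED(false), COMMAND_MAX_ACTIVE(false);

    private final boolean terminal;

    EventTypeSketch(boolean terminal) {
        this.terminal = terminal;
    }

    boolean isTerminal() {
        return terminal;
    }

    // Same filtering the static initializer performs to fill TERMINAL_EVENT_TYPES.
    static List<EventTypeSketch> terminalTypes() {
        List<EventTypeSketch> result = new ArrayList<>();
        for (EventTypeSketch type : values()) {
            if (type.isTerminal()) {
                result.add(type);
            }
        }
        return result;
    }

    // Hypothetical helper: first terminal event in an ordered event list, if any.
    static Optional<EventTypeSketch> firstTerminal(List<EventTypeSketch> events) {
        return events.stream().filter(EventTypeSketch::isTerminal).findFirst();
    }
}

final class EventTypeDemo {
    public static void main(String[] args) {
        System.out.println(EventTypeSketch.terminalTypes());
        // A failed run that recovered through its fallback.
        System.out.println(EventTypeSketch.firstTerminal(
                Arrays.asList(EventTypeSketch.FAILURE, EventTypeSketch.FALLBACK_SUCCESS)));
    }
}
```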
markCommandExecution
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java
/**
 * This decorates "Hystrix" functionality around the run() Observable.
 *
 * @return R
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };

    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };

    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };

    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
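- In the markEmits action (run on each onNext), markCommandExecution is called if the command is scalar.
- In the markOnCompleted action (run on completion), markCommandExecution is called if the command is not scalar. In both cases it is preceded by markEvent(HystrixEventType.SUCCESS, commandKey).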
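The split between the two callbacks can be illustrated with a small simulation. This is a sketch under stated assumptions, not Hystrix or RxJava code: ExecutionFlowSketch is a hypothetical class whose onNext and onCompleted methods mirror only the commandIsScalar() branches of markEmits and markOnCompleted, and it records a string where the real code would call markCommandExecution. It also assumes that a scalar command emits exactly one value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: records when the markCommandExecution notification would fire.
final class ExecutionFlowSketch {
    private final boolean scalar;
    private final List<String> calls = new ArrayList<>();

    ExecutionFlowSketch(boolean scalar) {
        this.scalar = scalar;
    }

    // Mirrors markEmits: runs once per emitted value.
    void onNext(Object value) {
        if (scalar) {
            calls.add("markCommandExecution after value " + value);
        }
    }

    // Mirrors markOnCompleted: runs once when the stream completes.
    void onCompleted() {
        if (!scalar) {
            calls.add("markCommandExecution on completion");
        }
    }

    List<String> calls() {
        return calls;
    }
}

final class ExecutionFlowDemo {
    public static void main(String[] args) {
        // Scalar command: notified when its single value is emitted.
        ExecutionFlowSketch scalar = new ExecutionFlowSketch(true);
        scalar.onNext("a");
        scalar.onCompleted();
        System.out.println(scalar.calls());

        // Non-scalar command: several values, notified once at completion.
        ExecutionFlowSketch stream = new ExecutionFlowSketch(false);
        stream.onNext("a");
        stream.onNext("b");
        stream.onCompleted();
        System.out.println(stream.calls());
    }
}
```

In both runs the notification is recorded exactly once, which is the point of guarding the two callbacks with opposite conditions.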
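Summary
Both HystrixCommand and HystrixObservableCommand invoke HystrixEventNotifier to publish events as they execute. Developers can supply a custom implementation to collect metrics or to drive monitoring and alerting.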