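The previous two articles covered the basic concepts of Hystrix and walked through a few demos showing how to use it. This article goes one level deeper and reads some of the source code to see how Hystrix actually works. Following the flow chart in the official documentation, we examine its main steps at the source level.
Create a HystrixCommand (or HystrixObservableCommand)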
public class GetUserAccountCommand extends HystrixCommand<UserAccount> {
    private final HttpCookie httpCookie;
    private final UserCookie userCookie;

    public GetUserAccountCommand(HttpCookie cookie) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("User")));
        this.httpCookie = cookie;
        /* parse or throw an IllegalArgumentException */
        this.userCookie = UserCookie.parseCookie(httpCookie);
    }

    @Override
    protected UserAccount run() {
        return new UserAccount(86975, "John James", 2, true, false, true);
    }

    @Override
    protected String getCacheKey() {
        return httpCookie.getValue();
    }

    @Override
    protected UserAccount getFallback() {
        return new UserAccount(userCookie.userId, userCookie.name, userCookie.accountType, true, true, true);
    }
}
Above, we built our own command by extending HystrixCommand and overriding run(), getFallback(), and getCacheKey().
Execute the Command
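A command can be executed in four ways (the first two are available only on HystrixCommand):
- execute(): blocking; returns a single response (or throws an exception).
- queue(): non-blocking; returns a Future associated with the single response.
- observe(): returns an Observable and is eager: the command executes immediately, whether or not anyone has subscribed. Late subscribers do not miss events, because the events are buffered in a ReplaySubject and replayed to them.
- toObservable(): also returns an Observable, but is lazy: the command executes only once the Observable is subscribed to.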
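The difference between the eager and the lazy variant can be illustrated without Hystrix or RxJava. The sketch below is only an analogy under stated assumptions: `EagerVsLazy`, `eager` and `lazy` are hypothetical names, and a `CompletableFuture` stands in for the Observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration; none of these names exist in Hystrix.
class EagerVsLazy {
    // Eager, in the spirit of observe(): the work starts right away and the
    // result is kept, so callers that arrive later still see it.
    static <R> CompletableFuture<R> eager(Supplier<R> work) {
        return CompletableFuture.supplyAsync(work);
    }

    // Lazy, in the spirit of toObservable(): nothing runs until the returned
    // supplier is invoked (the analogue of subscribing).
    static <R> Supplier<CompletableFuture<R>> lazy(Supplier<R> work) {
        return () -> CompletableFuture.supplyAsync(work);
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();

        Supplier<CompletableFuture<Integer>> deferred = EagerVsLazy.<Integer>lazy(runs::incrementAndGet);
        System.out.println("runs after lazy():  " + runs.get());

        CompletableFuture<Integer> started = EagerVsLazy.<Integer>eager(runs::incrementAndGet);
        started.join();
        System.out.println("runs after eager(): " + runs.get());

        deferred.get().join();
        System.out.println("runs after asking the lazy one: " + runs.get());
    }
}
```

Unlike this analogy, a real Hystrix command instance can be executed only once, as the source further down shows.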
UserAccount user = new GetUserAccountCommand(new HttpCookie("mockKey", "mockValueFromHttpRequest")).execute();
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

public Future<R> queue() {
    /*
     * The Future returned by Observable.toBlocking().toFuture() does not implement the
     * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
     * thus, to comply with the contract of Future, we must wrap around it.
     */
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    final Future<R> f = new Future<R>() {
        // excerpt: the remaining Future methods (cancel, isCancelled, isDone) are not shown here

        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
    };
    return f;
}
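In the code above we run the command through GetUserAccountCommand.execute(). Stepping through the source in a debugger shows that execute() simply calls the queue() method shown above to obtain a Future and then blocks on future.get() for the response. Looking further into queue(), we can ignore the wrapping Future for now (it exists mainly to support interruption); the line that matters is final Future<R> delegate = toObservable().toBlocking().toFuture();
In fact, both execute() and queue() end up calling toObservable(). In other words, even the synchronous HystrixCommand is built on top of an Observable, although a HystrixCommand only ever emits a single value.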
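The "blocking call is the asynchronous call plus get()" relationship can be sketched with the JDK alone. In this sketch `MiniCommand` is a hypothetical stand-in, and a `CompletableFuture` plays the part that the Observable chain plays in Hystrix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a command; not part of Hystrix.
class MiniCommand<R> {
    private final Supplier<R> work;

    MiniCommand(Supplier<R> work) {
        this.work = work;
    }

    // Non-blocking: starts the work and returns a Future for the single response.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Blocking: the same path as queue(), followed by get().
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure rather than the wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
    }

    public static void main(String[] args) {
        MiniCommand<String> cmd = new MiniCommand<>(() -> "John James");
        System.out.println(cmd.execute());
    }
}
```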
Is the Response Cached
AbstractCommand
return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        /* this is a stateful object so can only be used once */
        if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
            IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
            //TODO make a new error type for this
            throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
        }
        commandStartTimestamp = System.currentTimeMillis();
        if (properties.requestLogEnabled().get()) {
            // log this command execution regardless of what happened
            if (currentRequestLog != null) {
                currentRequestLog.addExecutedCommand(_cmd);
            }
        }
        final boolean requestCacheEnabled = isRequestCachingEnabled();
        final String cacheKey = getCacheKey();
        /* try from cache first */
        if (requestCacheEnabled) {
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
            if (fromCache != null) {
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            }
        }
        Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);
        Observable<R> afterCache;
        // put in cache
        if (requestCacheEnabled && cacheKey != null) {
            // wrap it for caching
            HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
            if (fromCache != null) {
                // another thread beat us so we'll use the cached value instead
                toCache.unsubscribe();
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            } else {
                // we just created an ObservableCommand so we cast and return it
                afterCache = toCache.toObservable();
            }
        } else {
            afterCache = hystrixObservable;
        }
        return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
    }
});
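The code above contains the check final boolean requestCacheEnabled = isRequestCachingEnabled();. If our command overrides getCacheKey() to return a non-null key and requestCacheEnabled is true (this property defaults to true and can be changed by passing a Setter to the HystrixCommand constructor), the request cache is consulted first. On a cache hit, run() is not executed and a cached Observable is returned directly. (As noted in the previous article, two commands can share a cached result only when they run inside the same request context.)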
protected boolean isRequestCachingEnabled() {
    return properties.requestCacheEnabled().get() && getCacheKey() != null;
}
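The "check the cache, then putIfAbsent, and let the first writer win" pattern from the excerpt can be sketched with the JDK alone. `MiniRequestCache` and `getOrExecute` are hypothetical names, and a `CompletableFuture` stands in for the cached Observable; this is a simplified model, not the Hystrix request cache.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical request-scoped cache; a simplified stand-in, not Hystrix code.
class MiniRequestCache<R> {
    private final ConcurrentHashMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();

    R getOrExecute(String cacheKey, Supplier<R> run) {
        if (cacheKey == null) {
            return run.get();                 // a null key means caching is off
        }
        CompletableFuture<R> cached = cache.get(cacheKey);
        if (cached != null) {
            return cached.join();             // cache hit: run is never called
        }
        CompletableFuture<R> mine = new CompletableFuture<>();
        CompletableFuture<R> winner = cache.putIfAbsent(cacheKey, mine);
        if (winner != null) {
            return winner.join();             // another thread beat us to it
        }
        try {
            mine.complete(run.get());         // we own the entry, so we execute
        } catch (RuntimeException e) {
            mine.completeExceptionally(e);
        }
        return mine.join();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        MiniRequestCache<String> cache = new MiniRequestCache<>();
        cache.getOrExecute("mockValueFromHttpRequest", () -> "user-" + calls.incrementAndGet());
        cache.getOrExecute("mockValueFromHttpRequest", () -> "user-" + calls.incrementAndGet());
        System.out.println("executions: " + calls.get());
    }
}
```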
Is the Circuit Open?
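Once the cache logic is done, Hystrix checks whether the circuit breaker is open. If it is, the fallback is invoked directly; otherwise the flow continues. The circuit breaker deserves special attention: the first article explained that Hystrix keeps failures in external dependencies from cascading through our system, and the circuit breaker is the mechanism that does this, so it is well worth understanding how it works.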
Circuit Breaker
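As the figure above shows, a HystrixCommand (or HystrixObservableCommand) interacts with HystrixCircuitBreaker while it executes: before execution, the breaker's state decides the rest of the flow, and each success, failure, or timeout is reported back to the breaker, which uses that data to change its state. The breaker's flow is as follows:
- If the number of requests through the breaker reaches the threshold HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
- and the current error percentage reaches the threshold HystrixCommandProperties.circuitBreakerErrorThresholdPercentage(),
- then the breaker's state changes from CLOSED to OPEN.
- While the breaker is OPEN, every request through it is short-circuited (the fallback is returned directly).
- After a sleep window (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds(), 5s by default), a single request is let through and the breaker moves to HALF-OPEN. If that request fails, the breaker returns to OPEN and waits for another sleep window; if it succeeds, the breaker moves to CLOSED and the cycle starts again from the first step.
Now let's follow this process in the source code.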
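The first three steps amount to a two-part condition, sketched below. `TripDecision` and `shouldTrip` are hypothetical names, the integer percentage calculation is an assumption made for illustration, and the numbers used in `main` are example values only.

```java
// Hypothetical helper illustrating the CLOSED -> OPEN condition; not Hystrix code.
class TripDecision {
    static boolean shouldTrip(long totalRequests, long errorCount,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            return false; // too little traffic in the window to judge
        }
        // Assumption: a plain integer percentage is good enough for this sketch.
        long errorPercentage = errorCount * 100 / totalRequests;
        return errorPercentage >= errorThresholdPercentage;
    }

    public static void main(String[] args) {
        // Example thresholds: at least 20 requests and at least 50% errors.
        System.out.println(shouldTrip(10, 10, 20, 50)); // volume too low
        System.out.println(shouldTrip(20, 9, 20, 50));  // error rate too low
        System.out.println(shouldTrip(20, 10, 20, 50)); // both thresholds reached
    }
}
```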
AbstractCommand
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
    executionHook.onStart(_cmd);
    /* determine if we're allowed to execute */
    if (circuitBreaker.attemptExecution()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}
After the request cache logic and before run(), every command calls circuitBreaker.attemptExecution(). The Javadoc points out that this method is not idempotent: it may modify internal state.
/**
 * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal
 * state.
 */
boolean attemptExecution();
HystrixCircuitBreaker
@Override
public boolean attemptExecution() {
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        if (isAfterSleepWindow()) {
            //only the first request after sleep window should execute
            //if the executing command succeeds, the status will transition to CLOSED
            //if the executing command fails, the status will transition to OPEN
            //if the executing command gets unsubscribed, the status will transition to OPEN
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}

private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        circuitOpened.set(-1L);
    }
}

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}
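The code above matches the state transitions described earlier. attemptExecution() returns false when the breaker is OPEN and true when it is CLOSED. Once the sleep window has elapsed, a single request is let through and the state moves from OPEN to HALF_OPEN; this transition is an atomic compare-and-set (CAS) on status, so only one thread can win it. When the admitted request finishes, markSuccess() or markNonSuccess() is called depending on the outcome, and a CAS moves the state from HALF_OPEN to CLOSED or back to OPEN. On failure, circuitOpened.set(System.currentTimeMillis()) records the time of this failure, which restarts the sleep window; on success, circuitOpened.set(-1L) lets all subsequent requests through again.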
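To make the transitions easy to trace, here is a minimal state machine modelled on the excerpt. It is a sketch under assumptions: `MiniCircuitBreaker` and `trip` are hypothetical names, the current time is passed in explicitly so the behaviour is deterministic, and the force-open/force-closed properties and the metrics stream are left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the breaker's state machine; not Hystrix code.
class MiniCircuitBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final long sleepWindowMs;
    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1L);

    MiniCircuitBreaker(long sleepWindowMs) {
        this.sleepWindowMs = sleepWindowMs;
    }

    // CLOSED -> OPEN. Assumption: the caller decides when the thresholds are reached.
    void trip(long now) {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(now);
        }
    }

    // Non-idempotent: the first call after the sleep window flips OPEN to HALF_OPEN.
    boolean attemptExecution(long now) {
        long openedAt = circuitOpened.get();
        if (openedAt == -1L) {
            return true;
        }
        if (now > openedAt + sleepWindowMs) {
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;
    }

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1L);
        }
    }

    void markNonSuccess(long now) {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(now); // restart the sleep window
        }
    }

    Status status() {
        return status.get();
    }

    public static void main(String[] args) {
        MiniCircuitBreaker breaker = new MiniCircuitBreaker(5000);
        breaker.trip(1000);
        System.out.println(breaker.attemptExecution(2000) + " " + breaker.status());
        System.out.println(breaker.attemptExecution(6001) + " " + breaker.status());
        breaker.markSuccess();
        System.out.println(breaker.attemptExecution(6002) + " " + breaker.status());
    }
}
```

Calling attemptExecution twice after the sleep window returns true only the first time, which is exactly the non-idempotent behaviour the Javadoc warns about.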
Is the Thread Pool/Queue/Semaphore Full?
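As mentioned earlier, Hystrix isolates external services by giving each one its own thread pool. The obvious benefit is that when one external service goes down, the rest of our application is not affected. But if we depend on many services, do we end up with too many thread pools? That is a fair concern; the Hystrix project has published load-test results that are worth consulting. If the dependency is a very fast, in-memory operation, the context-switching overhead introduced by a thread pool may be too high. For that case Hystrix also offers semaphore isolation, which caps the number of concurrent requests and runs the command directly on the calling thread. Whether a thread pool or a semaphore is used, once it is full the fallback is executed. The strategy can be changed dynamically through the execution.isolation.strategy property.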
AbstractCommand
protected TryableSemaphore getExecutionSemaphore() {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
        if (executionSemaphoreOverride == null) {
            TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // we didn't find one cache so setup
                executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                // assign whatever got set (this or another thread)
                return executionSemaphorePerCircuit.get(commandKey.name());
            } else {
                return _s;
            }
        } else {
            return executionSemaphoreOverride;
        }
    } else {
        // return NoOp implementation since we're not using SEMAPHORE isolation
        return TryableSemaphoreNoOp.DEFAULT;
    }
}
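As shown, when the configured strategy is SEMAPHORE, a semaphore with properties.executionIsolationSemaphoreMaxConcurrentRequests() permits is created. Its tryAcquire() method returns true or false immediately according to the configured number of permits; it never blocks.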
static class TryableSemaphoreActual implements TryableSemaphore {
    protected final HystrixProperty<Integer> numberOfPermits;
    private final AtomicInteger count = new AtomicInteger(0);

    public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
        this.numberOfPermits = numberOfPermits;
    }

    @Override
    public boolean tryAcquire() {
        int currentCount = count.incrementAndGet();
        if (currentCount > numberOfPermits.get()) {
            count.decrementAndGet();
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void release() {
        count.decrementAndGet();
    }

    @Override
    public int getNumberOfPermitsUsed() {
        return count.get();
    }
}
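Because the permit count is read on every tryAcquire() call, the limit can change while the semaphore is in use. The standalone sketch below reproduces that counting logic with the JDK only; `DynamicTryableSemaphore` is a hypothetical name and an `IntSupplier` stands in for the Hystrix property.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical standalone version of the counting logic; not Hystrix code.
class DynamicTryableSemaphore {
    private final IntSupplier permits;                 // re-read on every attempt
    private final AtomicInteger count = new AtomicInteger(0);

    DynamicTryableSemaphore(IntSupplier permits) {
        this.permits = permits;
    }

    // Never blocks: either takes a permit or reports failure straight away.
    boolean tryAcquire() {
        int current = count.incrementAndGet();
        if (current > permits.getAsInt()) {
            count.decrementAndGet();                   // undo the optimistic increment
            return false;
        }
        return true;
    }

    void release() {
        count.decrementAndGet();
    }

    int used() {
        return count.get();
    }

    public static void main(String[] args) {
        AtomicInteger limit = new AtomicInteger(2);
        DynamicTryableSemaphore semaphore = new DynamicTryableSemaphore(limit::get);
        System.out.println(semaphore.tryAcquire());
        System.out.println(semaphore.tryAcquire());
        System.out.println(semaphore.tryAcquire());    // over the limit of 2
        limit.set(3);                                  // raise the limit at runtime
        System.out.println(semaphore.tryAcquire());
    }
}
```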
If the strategy is not SEMAPHORE, TryableSemaphoreNoOp.DEFAULT is returned. It is not really a semaphore: as the source below shows, tryAcquire() always returns true.
static class TryableSemaphoreNoOp implements TryableSemaphore {
    public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

    @Override
    public boolean tryAcquire() {
        return true;
    }

    @Override
    public void release() {
    }

    @Override
    public int getNumberOfPermitsUsed() {
        return 0;
    }
}
HystrixObservableCommand.construct() or HystrixCommand.run()
AbstractCommand
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.empty();
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}
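The code above first checks whether the strategy is ExecutionIsolationStrategy.THREAD. If it is not, we are in semaphore mode and fall through to the final branch, which calls getUserExecutionObservable directly on the current thread. In THREAD mode, the returned Observable ends with subscribeOn(threadPool.getScheduler(...)), so the subscription, and with it the user code, runs on a thread from the command's thread pool.
Next, let's look at the getUserExecutionObservable method.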
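The practical difference between the two branches is which thread ends up running the user code. The sketch below shows this with a plain `ExecutorService`; `IsolationSketch` and `executingThread` are hypothetical names, and submitting to the pool is only a rough stand-in for what subscribeOn does with the thread pool's scheduler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of THREAD vs SEMAPHORE isolation; not Hystrix code.
class IsolationSketch {
    // Returns the name of the thread that actually ran the "user code".
    static String executingThread(boolean threadIsolation, ExecutorService pool) {
        Callable<String> userCode = () -> Thread.currentThread().getName();
        try {
            if (threadIsolation) {
                // THREAD: hand the work to the dependency's own pool and wait for it
                return pool.submit(userCode).get();
            }
            // SEMAPHORE: run on whichever thread called us
            return userCode.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "user-service-pool"));
        try {
            System.out.println("THREAD:    " + executingThread(true, pool));
            System.out.println("SEMAPHORE: " + executingThread(false, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```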
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;
    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }
    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}
This method calls getExecutionObservable(), which resolves to a different implementation depending on whether you extended HystrixCommand or HystrixObservableCommand.
HystrixCommand
@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
Note that doOnSubscribe registers a callback that records the thread performing the subscription, so that it can be interrupted later if necessary.
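The idea of remembering the executing thread so that another thread can interrupt it later can be shown in isolation. This is a sketch with hypothetical names (`InterruptibleExecution`, `interruptExecution`, `demo`); the long sleep merely simulates a slow run(), and the latch exists only to make the demo deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of "save the thread, interrupt it later"; not Hystrix code.
class InterruptibleExecution {
    private final AtomicReference<Thread> executionThread = new AtomicReference<>();
    private final CountDownLatch started = new CountDownLatch(1);
    private volatile String outcome = "not run";

    // Plays the role of the subscription callback followed by a slow run().
    void run() {
        executionThread.set(Thread.currentThread());
        started.countDown();
        try {
            Thread.sleep(60_000);
            outcome = "completed";
        } catch (InterruptedException e) {
            outcome = "interrupted";
        }
    }

    // Plays the role of the timeout or cancel path running on another thread.
    void interruptExecution() {
        Thread t = executionThread.get();
        if (t != null) {
            t.interrupt();
        }
    }

    static String demo() {
        InterruptibleExecution execution = new InterruptibleExecution();
        Thread worker = new Thread(execution::run, "worker");
        worker.setDaemon(true);
        worker.start();
        try {
            execution.started.await();       // wait until the thread has been recorded
            execution.interruptExecution();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return execution.outcome;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```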
HystrixObservableCommand
@Override
final protected Observable<R> getExecutionObservable() {
    return construct();
}
So getExecutionObservable() ultimately calls the run() or construct() method that you implemented.
Calculate Circuit Health
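The Hystrix flow chart shows that steps 5 and 6 report their results to the metrics whether or not they succeed. The circuit breaker then computes its current state from this data, as already described under Is the Circuit Open? above.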
Get the Fallback
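The Hystrix flow chart shows that the fallback is executed when the circuit is OPEN in step 4, when the semaphore or thread pool rejects the request in step 5, and when execution fails or times out in step 6. We won't go through that source in detail here.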
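Since the source is not walked through here, the three triggers can at least be summarised in a small sketch. `FallbackSketch` and its parameters are hypothetical; the two booleans stand in for the circuit breaker and the semaphore or thread pool, and a timeout is treated as just another failure.

```java
import java.util.function.Supplier;

// Hypothetical summary of when the fallback is used; not Hystrix code.
class FallbackSketch {
    static <R> R execute(boolean circuitOpen, boolean rejected,
                         Supplier<R> run, Supplier<R> fallback) {
        if (circuitOpen) {
            return fallback.get();   // step 4: short-circuited
        }
        if (rejected) {
            return fallback.get();   // step 5: semaphore or thread pool is full
        }
        try {
            return run.get();        // step 6: user code
        } catch (RuntimeException e) {
            return fallback.get();   // step 6 failed (a timeout would end up here too)
        }
    }

    public static void main(String[] args) {
        String result = FallbackSketch.<String>execute(false, false,
                () -> { throw new IllegalStateException("remote call failed"); },
                () -> "fallback-user");
        System.out.println(result);
    }
}
```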
Return the Successful Response
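If the whole flow succeeds, the caller gets back an Observable. You can obtain the result synchronously, or subscribe to the Observable and receive the final result asynchronously. The complete flow for obtaining the Observable is as follows: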