

Hystrix full flow chart


public class GetUserAccountCommand extends HystrixCommand<UserAccount> {

    private final HttpCookie httpCookie;
    private final UserCookie userCookie;

    public GetUserAccountCommand(HttpCookie cookie) {
        // HystrixCommand has no no-arg constructor, so a group key is required
        super(HystrixCommandGroupKey.Factory.asKey("User"));
        this.httpCookie = cookie;
        /* parse or throw an IllegalArgumentException */
        this.userCookie = UserCookie.parseCookie(httpCookie);
    }

    @Override
    protected UserAccount run() {
        return new UserAccount(86975, "John James", 2, true, false, true);
    }

    @Override
    protected String getCacheKey() {
        return httpCookie.getValue();
    }

    @Override
    protected UserAccount getFallback() {
        return new UserAccount(userCookie.userId, userCookie.name, userCookie.accountType, true, true, true);
    }
}


Execute the Command


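  • execute(): blocking; returns a single response (or throws an exception).
  • queue(): non-blocking; returns a Future associated with a single response.
  • observe(): returns an Observable and is eager: the command starts executing immediately, whether or not anyone has subscribed. Late subscribers still receive the events, because they are buffered in a ReplaySubject.
  • toObservable(): also returns an Observable, but is lazy: the command executes only once the Observable is subscribed to.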
UserAccount user = new GetUserAccountCommand(new HttpCookie("mockKey", "mockValueFromHttpRequest")).execute();
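
The eager/lazy distinction between observe() and toObservable() can be illustrated without RxJava. The following is only a rough analogy built on CompletableFuture, assuming a hypothetical InvocationStyles class; it is not how Hystrix implements either method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a command; none of these names come from Hystrix. */
final class InvocationStyles {
    /** Counts how many times the underlying work has actually run. */
    final AtomicInteger runCount = new AtomicInteger();
    private final Supplier<String> work;

    InvocationStyles(Supplier<String> task) {
        this.work = () -> {
            runCount.incrementAndGet();
            return task.get();
        };
    }

    /** Eager, like observe(): the work is started right away, and late callers still get the result. */
    CompletableFuture<String> eager() {
        return CompletableFuture.supplyAsync(work);
    }

    /** Lazy, like toObservable(): nothing runs until the caller invokes get() on the returned Supplier. */
    Supplier<CompletableFuture<String>> lazy() {
        return () -> CompletableFuture.supplyAsync(work);
    }
}
```

Calling lazy() alone leaves runCount at zero, whereas eager() starts the work even if nobody ever reads the returned future.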

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

public Future<R> queue() {
    /*
     * The Future returned by Observable.toBlocking().toFuture() does not implement the
     * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
     * thus, to comply with the contract of Future, we must wrap around it.
     */
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    final Future<R> f = new Future<R>() {
        // cancel(), isCancelled() and isDone() are omitted from this excerpt

        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
    };

    return f;
}

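The code above runs the command through GetUserAccountCommand.execute(). Stepping through the source in a debugger shows that execute() simply calls queue() to obtain a Future and then blocks on Future.get() for the response. Inside queue(), the wrapping Future can be ignored for now (it exists mainly to support interruption). The key line is final Future<R> delegate = toObservable().toBlocking().toFuture();, which shows that queue() in turn calls the toObservable() method described above to obtain an Observable and converts it to a Future with toBlocking().toFuture().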
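
The same layering (a blocking call built on top of a non-blocking one) can be sketched with plain java.util.concurrent types. BlockingFacade is a hypothetical name, and the exception unwrapping here is a simplification rather than what Hystrix's decomposeException does.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical mini command; it only mirrors the execute() -> queue() -> get() shape. */
final class BlockingFacade<R> {
    private final ExecutorService pool;
    private final Callable<R> work;

    BlockingFacade(ExecutorService pool, Callable<R> work) {
        this.pool = pool;
        this.work = work;
    }

    /** Non-blocking: hands back a Future tied to a single response. */
    Future<R> queue() {
        return pool.submit(work);
    }

    /** Blocking: waits on the Future and rethrows the underlying failure unwrapped. */
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            throw new IllegalStateException(e);
        }
    }
}
```

Keeping execute() as a thin wrapper means there is one execution path to reason about, which is the design the Hystrix excerpt above follows.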


Is the Response Cached


return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        /* this is a stateful object so can only be used once */
        if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
            IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
            //TODO make a new error type for this
            throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
        }

        commandStartTimestamp = System.currentTimeMillis();

        if (properties.requestLogEnabled().get()) {
            // log this command execution regardless of what happened
            if (currentRequestLog != null) {
                currentRequestLog.addExecutedCommand(_cmd);
            }
        }

        final boolean requestCacheEnabled = isRequestCachingEnabled();
        final String cacheKey = getCacheKey();

        /* try from cache first */
        if (requestCacheEnabled) {
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
            if (fromCache != null) {
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            }
        }

        Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);

        Observable<R> afterCache;

        // put in cache
        if (requestCacheEnabled && cacheKey != null) {
            // wrap it for caching
            HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
            if (fromCache != null) {
                // another thread beat us so we'll use the cached value instead
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            } else {
                // we just created an ObservableCommand so we cast and return it
                afterCache = toCache.toObservable();
            }
        } else {
            afterCache = hystrixObservable;
        }

        return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
    }
});

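The code above contains the check final boolean requestCacheEnabled = isRequestCachingEnabled();. If the command overrides getCacheKey() to return a non-null key and the requestCacheEnabled property is on (it defaults to true and can be changed by passing a Setter to the HystrixCommand constructor), a cache hit returns the cached Observable directly and run() is never executed. As noted in the previous article, two commands can share a cached response only when they run in the same request context.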

protected boolean isRequestCachingEnabled() {
    return properties.requestCacheEnabled().get() && getCacheKey() != null;
}
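
The "check the cache, otherwise register with putIfAbsent and let the loser of the race reuse the winner's entry" pattern can be sketched on its own. RequestScopedCache is a hypothetical class that assumes one instance per request context; it stores CompletableFuture values instead of Hystrix's cached Observables.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical request-scoped cache; create one instance per request context. */
final class RequestScopedCache<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> entries = new ConcurrentHashMap<>();

    /**
     * Runs the loader at most once per key. A null key bypasses the cache,
     * just as a null getCacheKey() disables request caching.
     */
    R getOrExecute(String cacheKey, Supplier<R> loader) {
        if (cacheKey == null) {
            return loader.get();
        }
        CompletableFuture<R> mine = new CompletableFuture<>();
        CompletableFuture<R> existing = entries.putIfAbsent(cacheKey, mine);
        if (existing != null) {
            // Another caller registered first, so reuse its (possibly still pending) result.
            return existing.join();
        }
        try {
            mine.complete(loader.get());
        } catch (RuntimeException e) {
            // Failures are cached too; join() surfaces them wrapped in a CompletionException.
            mine.completeExceptionally(e);
        }
        return mine.join();
    }
}
```

Registering the placeholder before doing the work is what lets a concurrent second caller wait for the first result instead of executing the command again.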

Is the Circuit Open?


Circuit Breaker



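  • If the number of requests through this circuit breaker reaches the threshold HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
  • and the error percentage reaches the threshold HystrixCommandProperties.circuitBreakerErrorThresholdPercentage(),
  • then the circuit breaker moves from CLOSED to OPEN.
  • While it is OPEN, every request on this circuit breaker is short-circuited (the fallback is returned directly).
  • After a sleep window (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds(), 5 s by default), a single request is let through and the state becomes HALF-OPEN. If that request fails, the state returns to OPEN and another sleep window begins; if it succeeds, the state becomes CLOSED and the cycle restarts from the first step.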
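
The trip condition in the first three bullets boils down to two comparisons. The helper below is a hypothetical sketch: the parameter names echo the Hystrix properties, but the class is not part of Hystrix, and the integer percentage arithmetic is an assumption of this example.

```java
/** Hypothetical helper illustrating when a CLOSED breaker would move to OPEN. */
final class TripDecision {
    private TripDecision() {
    }

    static boolean shouldOpen(long totalRequests, long failedRequests,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            return false; // too little traffic in the window to judge the error rate
        }
        long errorPercentage = failedRequests * 100 / totalRequests;
        return errorPercentage >= errorThresholdPercentage;
    }
}
```

With the documented defaults of 20 requests and 50 percent, TripDecision.shouldOpen(19, 19, 20, 50) stays closed despite every call failing, because the volume threshold has not been reached.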



private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    if (circuitBreaker.attemptExecution()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}

After the request-cache logic and before run(), every command calls circuitBreaker.attemptExecution(). The official Javadoc notes that this method is not idempotent: it may modify internal state.

/**
 * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal
 * state.
 */
boolean attemptExecution();


@Override
public boolean attemptExecution() {
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        if (isAfterSleepWindow()) {
            //only the first request after sleep window should execute
            //if the executing command succeeds, the status will transition to CLOSED
            //if the executing command fails, the status will transition to OPEN
            //if the executing command gets unsubscribed, the status will transition to OPEN
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}

private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        circuitOpened.set(-1L);
    }
}

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}
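
To see the CLOSED, OPEN and HALF_OPEN transitions in isolation, here is a stripped-down state machine in the same spirit. MiniBreaker, its trip() method and the injectable clock are assumptions made for this sketch; the real breaker is driven by the metrics stream and properties rather than an explicit trip() call.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/** Hypothetical, simplified breaker; the clock is injected so the sleep window is testable. */
final class MiniBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong openedAt = new AtomicLong(-1);
    private final long sleepWindowMillis;
    private final LongSupplier clock;

    MiniBreaker(long sleepWindowMillis, LongSupplier clock) {
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    /** CLOSED -> OPEN; stands in for the metrics-driven trip. */
    void trip() {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            openedAt.set(clock.getAsLong());
        }
    }

    /** Not idempotent: the first caller after the sleep window flips OPEN to HALF_OPEN. */
    boolean attemptExecution() {
        if (openedAt.get() == -1) {
            return true;
        }
        if (clock.getAsLong() > openedAt.get() + sleepWindowMillis) {
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;
    }

    /** HALF_OPEN -> CLOSED when the probe request succeeds. */
    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            openedAt.set(-1);
        }
    }

    /** HALF_OPEN -> OPEN when the probe request fails; restarts the sleep window. */
    void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            openedAt.set(clock.getAsLong());
        }
    }

    Status status() {
        return status.get();
    }
}
```

The compare-and-set in attemptExecution() is what guarantees that only one probe request gets through per sleep window, even when many threads arrive at once.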


Is the Thread Pool/Queue/Semaphore Full?



protected TryableSemaphore getExecutionSemaphore() {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
        if (executionSemaphoreOverride == null) {
            TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // we didn't find one cache so setup
                executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                // assign whatever got set (this or another thread)
                return executionSemaphorePerCircuit.get(commandKey.name());
            } else {
                return _s;
            }
        } else {
            return executionSemaphoreOverride;
        }
    } else {
        // return NoOp implementation since we're not using SEMAPHORE isolation
        return TryableSemaphoreNoOp.DEFAULT;
    }
}


static class TryableSemaphoreActual implements TryableSemaphore {
    protected final HystrixProperty<Integer> numberOfPermits;
    private final AtomicInteger count = new AtomicInteger(0);

    public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
        this.numberOfPermits = numberOfPermits;
    }

    @Override
    public boolean tryAcquire() {
        int currentCount = count.incrementAndGet();
        if (currentCount > numberOfPermits.get()) {
            // over the limit: undo the increment so the permit is not leaked
            count.decrementAndGet();
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void release() {
        count.decrementAndGet();
    }

    @Override
    public int getNumberOfPermitsUsed() {
        return count.get();
    }
}

static class TryableSemaphoreNoOp implements TryableSemaphore {

    public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

    @Override
    public boolean tryAcquire() {
        return true;
    }

    @Override
    public void release() {
        // nothing to release
    }

    @Override
    public int getNumberOfPermitsUsed() {
        return 0;
    }
}
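
A non-blocking counting semaphore of this kind is easy to reproduce outside Hystrix. The sketch below uses a fixed permit count instead of a HystrixProperty, and callOrFallback is a hypothetical convenience method added only to show the acquire/release pairing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical non-blocking semaphore; callers are rejected instead of being made to wait. */
final class CountingTrySemaphore {
    private final int maxPermits;
    private final AtomicInteger inUse = new AtomicInteger();

    CountingTrySemaphore(int maxPermits) {
        this.maxPermits = maxPermits;
    }

    boolean tryAcquire() {
        if (inUse.incrementAndGet() > maxPermits) {
            inUse.decrementAndGet(); // undo the reservation so the count stays accurate
            return false;
        }
        return true;
    }

    void release() {
        inUse.decrementAndGet();
    }

    int permitsUsed() {
        return inUse.get();
    }

    /** Runs the work if a permit is free, otherwise returns the fallback immediately. */
    <T> T callOrFallback(Supplier<T> work, Supplier<T> fallback) {
        if (!tryAcquire()) {
            return fallback.get();
        }
        try {
            return work.get();
        } finally {
            release(); // always give the permit back, even when the work throws
        }
    }
}
```

Because tryAcquire() never blocks, an overloaded dependency causes fast rejections rather than a growing pile of waiting caller threads.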


HystrixObservableCommand.construct() or HystrixCommand.run()


private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /*
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.empty();
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}

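The code above first checks whether the isolation strategy is ExecutionIsolationStrategy.THREAD. If it is not, the command is in SEMAPHORE mode and takes the final branch, which calls getUserExecutionObservable directly without switching threads. In THREAD mode, the returned Observable ends with subscribeOn(threadPool.getScheduler(...)), where the Func0 argument decides whether the thread should be interrupted. This is how calls to external services are isolated: the method that ultimately produces the events (run() or construct()) executes on a separate thread pool.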
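
Thread-pool isolation can be approximated with a bounded ThreadPoolExecutor. This is a minimal sketch under assumptions of my own: IsolatedCall, boundedPool and the queue capacity are hypothetical, and a blocking Future.get with a timeout stands in for Hystrix's RxJava scheduler and timeout operator.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper: runs work on a dedicated bounded pool so callers never run it themselves. */
final class IsolatedCall {
    private IsolatedCall() {
    }

    /** A fixed-size pool with a small bounded queue; extra submissions are rejected, not parked forever. */
    static ExecutorService boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
    }

    static <T> T call(ExecutorService pool, Callable<T> work, long timeoutMillis, Supplier<T> fallback) {
        Future<T> future;
        try {
            future = pool.submit(work);
        } catch (RejectedExecutionException e) {
            return fallback.get(); // pool and queue are full
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker, similar in intent to interruptOnTimeout
            return fallback.get();
        } catch (ExecutionException e) {
            return fallback.get(); // the work itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        }
    }
}
```

The caller's thread only ever waits for a bounded time, so a slow dependency can exhaust its own pool but not the threads of the application that calls it.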


private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

// HystrixCommand: wraps the synchronous run() in an Observable
@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}

// HystrixObservableCommand: returns the user-supplied Observable as is
@Override
final protected Observable<R> getExecutionObservable() {
    return construct();
}


Calculate Circuit Health

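As the Hystrix flow chart shows, steps 5 and 6 report their outcome to the metrics whether they succeed or fail, and the circuit breaker derives its current state from that data, as described under "Is the Circuit Open?" above.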

Get the Fallback

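As the Hystrix flow chart shows, the fallback runs when the circuit is OPEN (step 4), when the semaphore or thread pool rejects the request (step 5), or when execution fails or times out (step 6). The fallback source code is not covered in detail here.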

Return the Successful Response



