上一篇文章主要讲了Hystrix是什么,用来做什么,解决了什么问题,以及设计模式等,这篇文章主要来讲下Hystrix如何使用。下面我们从最简单的Hello World入手,来讲解下Hystrix的用法。
Hello World!
我们可以继承HystrixCommand(HystrixObservableCommand)来完成一个最简单的Hello World,分别说明。
HystrixCommand
HystrixCommand提供了同步和异步两种执行模式。
同步模式
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
// a real example would do work like a network call here
return "Hello " + name + "!";
}
@Test
public void testSynchronous() {
assertEquals("Hello World!", new CommandHelloWorld("World").execute());
assertEquals("Hello Bob!", new CommandHelloWorld("Bob").execute());
}
}
异步模式
@Test
public void testAsynchronous1() throws Exception {
assertEquals("Hello World!", new CommandHelloWorld("World").queue().get());
assertEquals("Hello Bob!", new CommandHelloWorld("Bob").queue().get());
}
@Test
public void testAsynchronous2() throws Exception {
Future<String> fWorld = new CommandHelloWorld("World").queue();
Future<String> fBob = new CommandHelloWorld("Bob").queue();
assertEquals("Hello World!", fWorld.get());
assertEquals("Hello Bob!", fBob.get());
}
上述通过执行HystrixCommand.execute()实现了Synchronous Execution,通过HystrixCommand.queue()返回一个future实现Asynchronous Execution,下面两种写法是等价的。
String s1 = new CommandHelloWorld("World").execute();
String s2 = new CommandHelloWorld("World").queue().get();
HystrixObservableCommand
This command should be used for a purely non-blocking call pattern. The caller of this command will be subscribed to the Observable<R> returned by the run() method.
按照官方注释,这个命令应该只用在非阻塞模式下面
public class CommandHelloWorld extends HystrixObservableCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed()) {
// a real example would do work like a network call here
observer.onNext("Hello");
observer.onNext(name + "!");
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribeOn(Schedulers.io());
}
}
不同于HystrixCommand我们要实现run方法,继承HystrixObservableCommand需要实现construct方法,这个方法返回一个Observable,当我们调用observe()或者toObservable()方法,上述construct方法会被执行。(注意toObservable()是一个延迟命令,真正只有在上述Observable被subscribe时候,construct才会被执行)另外上述代码其实用到了rxjava,一种函数响应式编程模型,大量用在android客户端开发,事实上,Hystrix的源码大量用到了rxjava,有兴趣的可以看一下。这里我们要关注这个construct方法里面运行了两次onNext方法,相当于给订阅者发射了两次事件,如果上述只调用一次observer.onNext(“Hello”)方法,那么下面的操作是等价的。
HystrixCommand.execute()=HystrixObservableCommand.observe().toBlocking().toFuture().get()
HystrixCommand.queue()=HystrixObservableCommand.observe().toBlocking().toFuture()
如果有两次observer.onNext方法,则会抛出Sequence contains too many elements异常
两者区别
- 前者的命令逻辑写在run();后者的命令逻辑写在construct()
- 前者的run()是由新创建的线程执行;后者的construct()是由调用程序线程执行
- 前者一个实例只能向调用程序发送(emit)单条数据,比如上面例子中run()只能返回一个String结果;后者一个实例可以顺序发送多条数据,比如demo中顺序调用多个onNext(),便实现了向调用程序发送多条数据,甚至还能发送一个范围的数据集。
响应式(Reactive)
你可以通过observe()和toObservable()返回一个可以观察的Observable然后,一旦你subscribe订阅了这个Observable后,就会收到Observable的事件。
- observe():返回一个 “hot” Observable,会立即执行HystrixCommand的run方法(或者HystrixObservableCommand的construct方法)
- toObservable():返回一个“code” Observable,直到你subscribe订阅了这个Observable才会执行run()或者construct()
@Test
public void testObservable() throws Exception {
Observable<String> fWorld = new CommandHelloWorld("World").observe();
Observable<String> fBob = new CommandHelloWorld("Bob").observe();
// blocking
assertEquals("Hello World!", fWorld.toBlockingObservable().single());
assertEquals("Hello Bob!", fBob.toBlockingObservable().single());
// non-blocking
// - this is a verbose anonymous inner-class approach and doesn't do assertions
fWorld.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
// nothing needed here
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println("onNext: " + v);
}
});
// non-blocking
// - also verbose anonymous inner-class
// - ignore errors and onCompleted signal
fBob.subscribe(new Action1<String>() {
@Override
public void call(String v) {
System.out.println("onNext: " + v);
}
});
}
如果是java8使用lambda表达式,会更简洁(作为一个写代码能少就少的码农,这里吐槽下国内的项目一般都用的是java6和java7,感觉lambda写起来很爽很屌)
fWorld.subscribe((v) -> {
System.out.println("onNext: " + v);
})
// - or while also including error handling
fWorld.subscribe((v) -> {
System.out.println("onNext: " + v);
}, (exception) -> {
exception.printStackTrace();
})
Command Name/Group/Thread-pool
Command Name
默认情况CommandName=getClass().getSimpleName(),当然你可以通过HystrixCommand或者HystrixObservableCommand的构造方法显示的指定CommandName
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
this.name = name;
}
我们在创建自己的HystrixCommand(HystrixObservableCommand),需要在构造方法中调用父类的构造方法,其中可以传入一个Setter对象,这其实就是Hystrix给我提供的一个builder模式构造对象,里面可以设置很多需要的属性(HystrixCommandProperties也可以在这里面设置,当然你不设置Hystrix对与这些属性也会有默认设置)。这里我们通过HystrixCommandKey.Factory.asKey(“HelloWorld”))方法设置commandName=’HelloWorld’
Command Group
和上面的Command Name一样,我们可以通过下面方法设置Goup Name
HystrixCommandGroupKey.Factory.asKey(“ExampleGroup”)
Hystrix使用了group key把一些命令放在同一个组里面(比如我们系统依赖了外部的Service A和Service B,我们可以创建两个Command Group A、Command Group B,其中A服务又有m1,m2方法,那么Command Group A又包括了Command Name m1 和Command Name m2),然后以组为维度上传监控数据,提供监控大盘,报警。
另外,默认情况下,同一个Command Group下的Command使用相同的Thread-pool,除非某些Command单独指定了线程池。
Command Thread-pool
Thread-pool key代表了用来执行用户业务逻辑的一个线程池HystrixThreadPool(注意,hystrix为了failfast,实现的线程池没有用队列缓存,如果没有可用线程,直接抛出异常)。如果你创建的HystrixCommand没有显示的指定thread-pool key,那么默认情况下就会使用HystrixCommandGroupKey。
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
你可以下使用HystrixThreadPoolKey.Factory.asKey(“HelloWorldPool”)来指定某个command特有的线程池,我们之所以不用一个新的Command Group而使用
HystrixThreadPoolKey是有原因的:我们有若干个Commands属于一个group,但是某些情况下,有部分command发生异常需要被隔离例如:
two commands used to access User metadata
group name is “userGroup”
command A fetch userName
command B fetch userAge
command A依赖的服务挂了不应该影响commadn B的服务。
逻辑上Command A和B都依赖于外部的user service所以让他们属于同一个组userGroup,但是为了避免Command A挂了影响Command B,我们可以给A指定一个单独的HystrixThreadPoolKey。
Fallback降级
如果你的系统依赖的外部服务出现故障,Hystrix给我们提供了优雅的降级方案,你只需要实现getFallBack方法,这样当调用失败,超时,reject或者short-circuits时候会回调你的fallback方法可以返回一个默认值给调用方。当然以下情况可能fallback方法可能没啥作用,可以注意下:
- 如果你是调用一个写命令,这种情况多数都是返回void,这种情况一般是希望报错尽快能通知到调用者(比如抛异常,默认不实现抛出UnsupportedOperationException),这个时候实现这个fallback方法可能意义不大。
- 批量作业或者离线计算,这个时候也一般希望错误能尽快传给调用者
public class CommandHelloFailure extends HystrixCommand<String> {
private final String name;
public CommandHelloFailure(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
throw new RuntimeException("this command always fails");
}
@Override
protected String getFallback() {
return "Hello Failure " + name + "!";
}
@Test
public void testSynchronous() {
assertEquals("Hello Failure World!", new CommandHelloFailure("World").execute());
assertEquals("Hello Failure Bob!", new CommandHelloFailure("Bob").execute());
}
}
另外不是所有的异常都会回调fallback方法,下面有个表格
Failure Type | Exception class | Exception.cause | subject to fallback |
---|---|---|---|
FAILURE | HystrixRuntimeException | underlying exception (user-controlled) | YES |
TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException | YES |
SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException | YES |
THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException | YES |
SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException | YES |
BAD_REQUEST | HystrixRuntimeException | underlying exception (user-controlled) | NO |
HystrixBadRequestException主要用来处理非法参数和一些非系统错误问题,所以不会触发fallback方法
HystrixObservableCommand就需要实现resumeWithFallback方法,如果你用HystrixObservableCommand emit发射了多个事件,那么你可能更希望知道是哪个事件发生了错误,下面有个demo可以参考一下。
@Override
protected Observable<Integer> construct() {
return Observable.just(1, 2, 3)
.concatWith(Observable.<Integer> error(new RuntimeException("forced error")))
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer t1) {
lastSeen = t1;
}
})
.subscribeOn(Schedulers.computation());
}
@Override
protected Observable<Integer> resumeWithFallback() {
if (lastSeen < 4) {
return Observable.range(lastSeen + 1, 4 - lastSeen);
} else {
return Observable.empty();
}
}
上述也是用到了rxjava,这里简单提一下,不然可能很多人看不懂。
- Observable.just(1, 2, 3):这个Observable会依次发送1,2,3三个事件
- Observable.concatWith(Exception):这个相当于在(1,2,3)后面又发送了一个Exception事件
- Observable.doOnNext:每次在调用订阅者Observer的消费(onNext)方法之前都会调用这个doOnNext方法
- Observable.subscribeOn:指定产生订阅事件发生在Schedulers.computation()线程池里面
- Observable.range(start,count):发射从start开始依次递增加一 count个事件
知道了上述意思后,我们就不难搞懂上述代码是每次都把执行的事件通过一个中间变量记录下来,这样就知道Observable发射的多个事件到底是哪个报错。
Request Cache
可以实现getCacheKey()方法来达到缓存请求的目的,这样在同一个上下文档中,如果有相同的请求,就可以使用缓存中的值。注意这里我强调了同一个上下文,只有同一个上下文当中缓存才有效
public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
private final int value;
protected CommandUsingRequestCache(int value) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.value = value;
}
@Override
protected Boolean run() {
return value == 0 || value % 2 == 0;
}
@Override
protected String getCacheKey() {
return String.valueOf(value);
}
}
@Test
public void testWithCacheHits() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);
assertTrue(command2a.execute());
// 第一次执行cacheKey=2,缓存还没有,所以false
assertFalse(command2a.isResponseFromCache());
assertTrue(command2b.execute());
// 第二次执行cacheKey=2,缓存已经有了,所以true
assertTrue(command2b.isResponseFromCache());
} finally {
context.shutdown();
}
// start a new request context
context = HystrixRequestContext.initializeContext();
try {
CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
assertTrue(command3b.execute());
// 因为是开启了一个新的context,所以尽管cacheKey=2,但是还是没有用到缓存
assertFalse(command3b.isResponseFromCache());
} finally {
context.shutdown();
}
}
Request Context Setup
如果你想用到上述的reqeust cache(还有Hystrix其他的request collapsing, request log)功能,我们可以看到你需要创建一个context,如果每次都在代码里面new 一个太繁琐了。现在我们一般应用都是部署在一个web环境下,我们可以使用Servlet Filter来实现这个上下文
public class HystrixRequestContextServletFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
chain.doFilter(request, response);
} finally {
context.shutdown();
}
}
}
<filter>
<display-name>HystrixRequestContextServletFilter</display-name>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>HystrixRequestContextServletFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>