官网 https://github.com/Netflix/Hystrix 源码
https://github.com/Netflix/Hystrix/wiki/ wiki
Hystrix 是什么 在分布式环境下,难免出现许多服务之间调用失败的场景, Hystrix 是一个通过添加延迟容忍和容错逻辑帮助您控制这些分布式服务之间交互的库。 Hystrix 通过隔离服务之间的访问点来实现应用之间连锁故障(因下游服务失败,导致上游服务不可用),出现故障提供备选方案来提高应用整体的可用性。
举例:抢购活动中,由于库存服务超时或者崩掉,可以通过提示抢购失败,请稍后重试等预防抢购应用不可用
Hystrix 用来做什么 Hystrix 可以做一些事情:
通过第三方客户端库,为控制延迟和依赖之间调用失败提供保护。
在复杂的分布式系统中阻止连锁故障
快速故障恢复
在可能的情况下,做到服务隔离和优雅降级
尽可能的实时监控、报警、运维控制
Hystrix 解决什么问题 复杂的分布式应用有许多依赖调用,在某一时刻它们之间必然会出现失败,这就是导致应用整体故障的风险。
例如,对于一个依赖于 30 个服务的应用程序,每个服务的正常运行时间为 99.99%,下面是您可以预期的结果。
99.99 的 30 次方 = 99.7% (意味着有 0.3% 的失败)
10 亿 * 0.3% = 300 万(10 亿次请求会有 300 万次请求失败)
即使每个月在很好的正常运行的时间,也会有 2 个小时的停机。通常情况会被这个更糟糕。
当所有请求都很正常的情况下,请求流就会像下图一样:
当很多后端系统有一个不稳定时,就会阻塞整个请求,如下图:
在大量请求下,一个后端不理想的依赖都会成为阻塞整个应用请求的潜在风险
应用中任何一个通过网络或者客户端发出的请求都有可能成为网络请求失败的潜在风险,比请求失败更糟糕的是,应用调用依赖之间的延迟增加,更甚应用中的队列、线程池、其他系统资源造成的连锁故障
当通过第三方客户端进行网络访问,就是一个不清楚实现细节以及随时可能发生改变的黑盒执行,对于每个客户端,网络和资源配置都是不一样的,所以很难监控和更改。
更糟糕的是,依赖调用之间,在不被应用显示调用的情况下,执行昂贵和容易出错的网络调用
网络连接失败或者降级、服务与服务之间调用失败或者延迟,新库或服务部署改变方式或者性能特性
所有这些失败和延迟的情况都需要被隔离或者管理,这样就不会因为单个依赖的失败摧毁整个应用和系统。
Hystrix 设计原则
防止单个应用耗尽服务器(比如 tomcat)所有可用线程
丢弃重负载、快速失败代替排队
为任何情况失败提供保护
使用隔离技术 (如隔板、泳道和断路器模式) 来限制任何一个依赖的影响
通过接近实时的度量、监视和警报来优化快速发现故障
Hystrix 在大部分方面依靠低延迟配置和动态配置优化恢复时间, 允许使用快速返回结果进行实时操作调整
保护整个依赖客户端执行中的失败,不仅仅是 网络流量中
Hystrix 如何实现目标
在 HystrixCommand 或 HystrixObservableCommand 对象中封装所有对外部系统 (或 “依赖项”) 的调用,该对象通常在单独的线程中执行 (这是命令模式的示例)
Hystrix 有一个默认值,如果调用时长超过这个默认值,认定为超时调用,大部分依赖可以通过配置自定义这个值,因此他们略高于依赖项的 99.5% 的性能
为每个依赖维护一个小的线程池 (或信号量); 如果它已满,将立即拒绝为该依赖指定的请求,而不是排队。
衡量成功、失败 (客户抛出的异常)、超时和线程拒绝
触发断路器,在一段时间内停止所有对某个服务的请求,如果服务的错误百分比超过阈值,则手动或自动停止
当一个请求失败、被拒绝、超时或短路时,执行备选逻辑
近实时的监控度量和配置的变化
当您使用 Hystrix 来包装每个基础依赖项时,图中所示的架构会发生变化,如下图所示。每个依赖项都是相互隔离的,当延迟发生时,它可能会被限制在资源中,并且在熔断器中覆盖,当任何类型的故障发生在依赖项中时,它决定了要做出什么响应
hello world 查看源代码
1 2 3 4 5 <dependency > <groupId > com.netflix.hystrix</groupId > <artifactId > hystrix-core</artifactId > <version > 1.5.18</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class CommandHelloWorld01 extends HystrixCommand <String> { private final String name; public CommandHelloWorld01 (String name) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .name = name; } @Override protected String run () { return "Hello " + name + "!" ; } public static class UnitTest { @Test public void testSynchronous () { assertEquals("Hello World!" , new CommandHelloWorld01 ("World" ).execute()); assertEquals("Hello Bob!" , new CommandHelloWorld01 ("Bob" ).execute()); } @Test public void testAsynchronous1 () throws Exception { assertEquals("Hello World!" , new CommandHelloWorld01 ("World" ).queue().get()); assertEquals("Hello Bob!" , new CommandHelloWorld01 ("Bob" ).queue().get()); } @Test public void testAsynchronous2 () throws Exception { Future<String> fWorld = new CommandHelloWorld01 ("World" ).queue(); Future<String> fBob = new CommandHelloWorld01 ("Bob" ).queue(); assertEquals("Hello World!" , fWorld.get()); assertEquals("Hello Bob!" , fBob.get()); } @Test public void testObservable () throws Exception { Observable<String> fWorld = new CommandHelloWorld01 ("World" ).observe(); Observable<String> fBob = new CommandHelloWorld01 ("Bob" ).observe(); assertEquals("Hello World!" , fWorld.toBlocking().single()); assertEquals("Hello Bob!" , fBob.toBlocking().single()); fWorld.subscribe(new Observer <String>() { @Override public void onCompleted () { } @Override public void onError (Throwable e) { e.printStackTrace(); } @Override public void onNext (String v) { System.out.println("onNext: " + v); } }); fBob.subscribe(new Action1 <String>() { @Override public void call (String v) { System.out.println("onNext: " + v); } }); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class CommandHelloWorld extends HystrixObservableCommand <String> { private final String name; public CommandHelloWorld (String name) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .name = name; } @Override protected Observable<String> construct () { return Observable.create(new Observable .OnSubscribe<String>() { @Override public void call (Subscriber<? super String> observer) { try { if (!observer.isUnsubscribed()) { observer.onNext(name + "!" ); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } public static void main (String[] args) throws ExecutionException, InterruptedException { CommandHelloWorld observableCommand = new CommandHelloWorld ("World" ); Observable<String> observe = observableCommand.observe(); System.out.println(observe.toBlocking().toFuture().get()); observe.subscribe(new Observer <String>() { @Override public void onCompleted () { } @Override public void onError (Throwable e) { e.printStackTrace(); } @Override public void onNext (String v) { System.out.println("onNext: " + v); } }); observe.subscribe(new Action1 <String>() { @Override public void call (String v) { System.out.println("onNext: " + v); } }); } }
同步执行 通过 execute()方法同步调用 HystrixCommand的实现
1 String s = new CommandHelloWorld01 ("World" ).execute();
HystrixObservableCommand没有execute方法调用,如果清楚通过一个命令产生的 Observable必定仅仅产生一个单一的值,则可以对 Observable应用 RXjava 的操作 .toBlocking().toFuture().get()模拟 execute方法。
1 2 3 CommandHelloWorld observableCommand = new CommandHelloWorld ("World" ); Observable<String> observe = observableCommand.observe(); System.out.println(observe.toBlocking().toFuture().get());
异步执行 我们可以通过使用 queue() 方法异步执行 HystrixCommand ,示例如下:
1 Future<String> fWorld = new CommandHelloWorld01 ("World" ).queue();
我们可以通过 Future 获取到命令的结果集
HystrixObservableCommand没有简单的queuefor a等价物HystrixObservableCommand,但是如果您知道Observable这样的命令生成的 必须始终只生成一个值,您可以queue通过将 RxJava 运算符.toBlocking().toFuture()应用于Observable.
响应式执行 可以通过一下任意方法监听 HystrixCommand 的结果:
observe() : 执行这个命令会返回一个热 Observable 立刻执行 hystrix 的命令 ,因为这个 Observable 通过 ReplaySubject 过滤,咱们不会有丢失订阅之前的任何东西的危险。
toObservable(): 执行这个命令会返回一个 “冷 “ Observable,直到订阅 Observable 才会开始执行命令和发送结果 。
1 Observable<String> fWorld = new CommandHelloWorld01 ("World" ).observe();
执行完上面的代码,我们可以通过订阅 Observable 获取到它的值
1 2 3 4 5 6 7 8 fWorld.subscribe(new Action1 <String>() { @Override public void call (String v) { System.out.println("onNext: " + v); } });
Fallback (降级) 查看源代码
我们可以通过增加一个 fallback (回退) 方法在 hystrix 命令实现优雅降级,如果主命令失败,hystrix 可以获取一个默认值或者值集合。我们可能想为更多的可能失败的 hrstrix 命令实现一个回退方法,但是会有以下几种例外:
如果设计的 hrstrix 命令是执行一个写操作而不是返回一个结果(这个写操作在 HystrixCommand通常返回 void,在 HystrixObservableCommand 返回一个空的可观察者),此场景下执行回退方法没有什么意义,如果写操作失败,服务方应该是希望告知调用方,让调用方再进一步处理。
如果 Hystrix 命令正在填充一个缓存,或者生成一个报告,或者做任何离线计算。如果发生错误,通常应该将错误告知调用方,让调用方做进一步处理,而不应该发送一个默认的响应。
如果命令执行异常,无论 Hystrix 命令是否有回退方法,hystrix 命令状态、断路器 / 度量器都会更新为此条命令失败。
HystrixCommand 在一个普通的HystrixCommand 中通过重写 getFallback() 方法实现一个回退方法,比如 run() 方法有异常, hystrix 会为所有类型的异常执行回退方法,比如 超时、线程池或信号量拒绝,以及断路器短路。 包含回退方法的示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class HystrixHelloWorldFallback extends HystrixCommand <String> { private final String name; public HystrixHelloWorldFallback (String name) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .name = name; } @Override protected String run () { throw new RuntimeException ("模拟异常!" ); } @Override protected String getFallback () { return "Hello Failure " + name + "!" ; } public static void main (String[] args) { HystrixHelloWorldFallback hystrixHelloWorldFallback = new HystrixHelloWorldFallback ("word" ); System.out.println(hystrixHelloWorldFallback.execute()); } }
以上代码中 run()方法永远会出现异常,调用者永远接收到的是 getFallback()方法返回的值,不会接收到异常
HystrixObservableCommand 对于 HystrixObservableCommand,需要通过重写 resumeWithFallback 方法,如果执行失败了,它就会从主观察者接管失败返回第二可观察者。这里需要注意的是,已经发送的一个或多个数据项之后,可观察者可能会失败,所以回退方法不应该假设它会发送观察者看到的唯一值。
在内部,Hystrix 使用 RxJava 的 onerrorerrorenext 操作符,在发生错误时,在主观察者和回退之间无缝切换。
错误传播 HystrixCommand 从 run()方法抛出的除 HystrixBadRequestException异常外的所有异常,会统计异常次数、触发回退方法和短路逻辑。
可以将想抛出的异常包装到 HystrixBadRequestException通过 getCause()方法查询它 ,HystrixBadRequestException适用于报告非法参数或非系统故障的场景,这些错误不应该与失败的指标相违背,也不应该触发回退逻辑。
HystrixObservableCommand 在 a 的情况下HystrixObservableCommand,不可恢复的错误会通过onError来自结果的通知返回Observable,并且回退是通过回退到 Hystrix 通过resumeWithFallback您实现的方法获得的第二个 Observable 来完成的。
执行异常类型列表
失败类型
异常类
异常原因
是否执行回退
失败
HystrixRuntimeException
潜在异常(用户控制)
执行
超时
HystrixRuntimeException
j.u.c.TimeoutException
执行
短路
HystrixRuntimeException
j.l.RuntimeException
执行
线程池异常
HystrixRuntimeException
j.u.c.RejectedExecutionException
执行
信号量拒绝
HystrixRuntimeException
j.l.RuntimeException
执行
失败请求
HystrixBadRequestException
underlying exception (user-controlled)
不执行
实例 HystrixCommand
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class HystrixHelloWorldFallback extends HystrixCommand <String> { private final String name; public HystrixHelloWorldFallback (String name) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .name = name; } @Override protected String run () { if ("hello" .equals(name)) { throw new RuntimeException ("模拟异常!" ); } else { throw new HystrixBadRequestException ("忽略异常!" ); } } @Override protected String getFallback () { return "Hello Failure " + name + "!" ; } public static void main (String[] args) { HystrixHelloWorldFallback hystrixHelloWorldFallback = new HystrixHelloWorldFallback ("hello" ); System.out.println(hystrixHelloWorldFallback.execute()); hystrixHelloWorldFallback = new HystrixHelloWorldFallback ("world" ); System.out.println(hystrixHelloWorldFallback.execute()); } }
发现hello 走了降级方法,world没有。
命令名称 默认情况下,命令名称来源于类名。
1 getClass().getSimpleName();
如果要显示定义名称的话,可以通过 HystrixCommand 或者 HystrixObservableCommand 的构造函数:
1 2 3 4 5 public HystrixHelloWorldCommand (String name) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Command Group: Hello World" )) .andCommandKey(HystrixCommandKey.Factory.asKey("Command Name: Hello World" ))); this .name = name; }
为了给每个命令集合保存 Setter 配置,可以缓存 Setter,示例如下:
1 2 3 4 5 6 7 private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )) .andCommandKey(HystrixCommandKey.Factory.asKey("Cache Setter: Hello World" ));public HystrixHelloWorldCommand (String name) { super (cachedSetter); this .name = name; }
HystrixCommandKey 是一个接口,可以作为枚举或者普通类实现。其实它有一个辅助工厂类,示例如下
1 HystrixCommandKey.Factory.asKey("Command Name: Hello World" )
HystrixCommandKey源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public interface HystrixCommandKey extends HystrixKey { public static class Factory {private static final InternMap<String, HystrixCommandKey.Factory.HystrixCommandKeyDefault> intern = new InternMap (new ValueConstructor <String, HystrixCommandKey.Factory.HystrixCommandKeyDefault>() { public HystrixCommandKey.Factory.HystrixCommandKeyDefault create (String key) { return new HystrixCommandKey .Factory.HystrixCommandKeyDefault(key); } }); private Factory () { } public static HystrixCommandKey asKey (String name) { return (HystrixCommandKey)intern.interned(name); } static int getCommandCount () { return intern.size(); } private static class HystrixCommandKeyDefault extends HystrixKeyDefault implements HystrixCommandKey { public HystrixCommandKeyDefault (String name) { super (name); } } } }
命令分组 Hystrix 使用命令分组将一起的命令进行管理,比如报告、警报、仪表盘或组 / 库。默认情况下,Hystrix 使用 HystrixCommandGroupKey来定义命令线程池,除非单独定义线程池。
HystrixCommandGroupKey是一个接口,可以作为枚举或者普通类实现。其实它有一个辅助工厂类,示例如下:
1 HystrixCommandGroupKey.Factory.asKey("Command Group: Hello World" )
HystrixCommandGroupKey源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public interface HystrixCommandGroupKey extends HystrixKey { public static class Factory { private static final InternMap<String, HystrixCommandGroupKey.Factory.HystrixCommandGroupDefault> intern = new InternMap (new ValueConstructor <String, HystrixCommandGroupKey.Factory.HystrixCommandGroupDefault>() { public HystrixCommandGroupKey.Factory.HystrixCommandGroupDefault create (String key) { return new HystrixCommandGroupKey .Factory.HystrixCommandGroupDefault(key); } }); private Factory () { } public static HystrixCommandGroupKey asKey (String name) { return (HystrixCommandGroupKey)intern.interned(name); } static int getGroupCount () { return intern.size(); } private static class HystrixCommandGroupDefault extends HystrixKeyDefault implements HystrixCommandGroupKey { public HystrixCommandGroupDefault (String name) { super (name); } } } }
命令线程池 线程池主要体现是用于监测、指标发布、缓存和其他此类用途的 HystrixThreadPool。 一个 HystrixCommand 与一个单独注入到它的 HystrixThreadPoolKey 所检索到的 HystrixThreadPool 相关联, 或者默认为使用 HystrixCommandGroupKey 的创建一个 。
如果要显示定义名称的话,可以通过 HystrixCommand或者 HystrixObservableCommand的构造函数:
1 2 3 4 5 6 public HystrixHelloWorldCommand (String name) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Command Group: Hello World" )) .andCommandKey(HystrixCommandKey.Factory.asKey("Command Name: Hello World" )) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("Command ThreadPool: Hello World" ))); this .name = name; }
HystrixThreadPoolKey 是一个接口,可以作为枚举或者普通类实现。其实它有一个辅助工厂类,示例如下:
1 HystrixThreadPoolKey.Factory.asKey("Command ThreadPool: Hello World" )
HystrixThreadPoolKey源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public interface HystrixThreadPoolKey extends HystrixKey { class Factory { private Factory () { } private static final InternMap<String, HystrixThreadPoolKey> intern = new InternMap <String, HystrixThreadPoolKey>( new InternMap .ValueConstructor<String, HystrixThreadPoolKey>() { @Override public HystrixThreadPoolKey create (String key) { return new HystrixThreadPoolKeyDefault (key); } }); public static HystrixThreadPoolKey asKey (String name) { return intern.interned(name); } private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey { public HystrixThreadPoolKeyDefault (String name) { super (name); } } static int getThreadPoolCount () { return intern.size(); } } }
可能使用 HystrixThreadPoolKey 而不仅仅是不同的 HystrixCommandGroupKey 的原因是多个命令可能属于相同的所有权或逻辑功能的 “组”,但是某些命令可能需要彼此隔离。
简单示例如下:
访问视频元数据的两个命令
两个命令拥有同一个组名称 “VideoMetadata”
命令 A 与资源 #1 互斥
命令 B 与资源 #2 互斥
如果命令 A 的线程池潜在并且饱和,它就不应该阻止命令 B 访问资源,因为他们互相命中不同的后端资源。因此,我们在逻辑上希望这些命令组合在一起,但希望它们以不同的方式隔离,并使用 HystrixThreadPoolKey
来给它们每个线程池提供一个不同的线程池。
请求缓存 查看源代码
您可以通过在or对象上实现getCacheKey()方法来启用请求缓存,如下所示:HystrixCommand、HystrixObservableCommand
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class CommandUsingRequestCache extends HystrixCommand <Boolean> { private final int value; protected CommandUsingRequestCache (int value) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .value = value; } @Override protected Boolean run () { return value == 0 || value % 2 == 0 ; } @Override protected String getCacheKey () { return String.valueOf(value); } }
由于这取决于请求上下文,我们必须初始化HystrixRequestContext
在一个简单的单元测试中,您可以这样做:
1 2 3 4 5 6 7 8 9 10 11 12 @Test public void testWithoutCacheHits () { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertTrue(new CommandUsingRequestCache (2 ).execute()); assertFalse(new CommandUsingRequestCache (1 ).execute()); assertTrue(new CommandUsingRequestCache (0 ).execute()); assertTrue(new CommandUsingRequestCache (58672 ).execute()); } finally { context.shutdown(); } }
ServletFilter通常,此上下文将通过包装用户请求或其他一些生命周期挂钩来初始化和关闭。
以下示例显示了命令如何在请求上下文中从缓存中检索其值(以及如何查询对象以了解其值是否来自缓存):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Test public void testWithCacheHits () { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command2a = new CommandUsingRequestCache (2 ); CommandUsingRequestCache command2b = new CommandUsingRequestCache (2 ); assertTrue(command2a.execute()); assertFalse(command2a.isResponseFromCache()); assertTrue(command2b.execute()); assertTrue(command2b.isResponseFromCache()); } finally { context.shutdown(); } context = HystrixRequestContext.initializeContext(); try { CommandUsingRequestCache command3b = new CommandUsingRequestCache (2 ); assertTrue(command3b.execute()); assertFalse(command3b.isResponseFromCache()); } finally { context.shutdown(); } }
请求折叠 查看源代码
请求折叠允许将多个请求批处理到单个HystrixCommand实例执行中。
折叠器可以使用批次大小和自批次创建以来经过的时间作为执行批次的触发器。
Hystrix 支持 2 种请求折叠样式:请求范围和全局范围。这是在折叠器构造时配置的,默认为请求范围。
一个请求范围的折叠器收集一个批次 per HystrixRequestContext,而一个全局范围的折叠器跨多个HystrixRequestContexts 收集一个批次。因此,如果您的下游依赖项无法HystrixRequestContext在单个命令调用中处理多个 s,则请求范围的折叠是正确的选择。
HystrixRequestContext在 Netflix,我们专门使用请求范围的折叠器,因为所有当前系统都是建立在每个命令中使用一个单一的假设之上的。由于批处理仅针对每个请求,因此当命令与同一请求中的不同参数并行发生时,折叠是有效的。
以下是如何实现请求范围的简单示例HystrixCollapser:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class CommandCollapserGetValueForKey extends HystrixCollapser <List<String>, String, Integer> { private final Integer key; public CommandCollapserGetValueForKey (Integer key) { this .key = key; } @Override public Integer getRequestArgument () { return key; } @Override protected HystrixCommand<List<String>> createCommand (final Collection<CollapsedRequest<String, Integer>> requests) { return new BatchCommand (requests); } @Override protected void mapResponseToRequests (List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) { int count = 0 ; for (CollapsedRequest<String, Integer> request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand <List<String>> { private final Collection<CollapsedRequest<String, Integer>> requests; private BatchCommand (Collection<CollapsedRequest<String, Integer>> requests) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey" ))); this .requests = requests; } @Override protected List<String> run () { ArrayList<String> response = new ArrayList <String>(); for (CollapsedRequest<String, Integer> request : requests) { response.add("ValueForKey: " + request.getArgument()); } return response; } } }
以下单元测试展示了如何使用折叠器将四个执行自动批处理CommandCollapserGetValueForKey为单个HystrixCommand执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Test public void testCollapser () throws Exception { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { Future<String> f1 = new CommandCollapserGetValueForKey (1 ).queue(); Future<String> f2 = new CommandCollapserGetValueForKey (2 ).queue(); Future<String> f3 = new CommandCollapserGetValueForKey (3 ).queue(); Future<String> f4 = new CommandCollapserGetValueForKey (4 ).queue(); assertEquals("ValueForKey: 1" , f1.get()); assertEquals("ValueForKey: 2" , f2.get()); assertEquals("ValueForKey: 3" , f3.get()); assertEquals("ValueForKey: 4" , f4.get()); assertEquals(1 , HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand <?>[1 ])[0 ]; assertEquals("GetValueForKey" , command.getCommandKey().name()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } finally { context.shutdown(); } }
请求上下文设置 要使用请求范围的功能(请求缓存、请求折叠、请求日志),您必须管理HystrixRequestContext生命周期(或实施替代方案HystrixConcurrencyStrategy)。
这意味着您必须在请求之前执行以下操作:
1 HystrixRequestContext context = HystrixRequestContext.initializeContext();
然后在请求结束时:
在标准的 Java Web 应用程序中,您可以使用 Servlet 过滤器通过实现类似于以下的过滤器来初始化此生命周期:
1 2 3 4 5 6 7 8 9 10 11 12 public class HystrixRequestContextServletFilter implements Filter { public void doFilter (ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(request, response); } finally { context.shutdown(); } } }
您可以通过向以下内容添加一个部分来为所有传入流量启用过滤器web.xml:
1 2 3 4 5 6 7 8 9 <filter > <display-name > HystrixRequestContextServletFilter</display-name > <filter-name > HystrixRequestContextServletFilter</filter-name > <filter-class > com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class > </filter > <filter-mapping > <filter-name > HystrixRequestContextServletFilter</filter-name > <url-pattern > /*</url-pattern > </filter-mapping >
常见模式 以下部分是 和 的常见用途和使用HystrixCommand
模式HystrixObservableCommand
。
快速失败 查看源代码
最基本的执行是只做一件事情并且没有回退行为。如果发生任何类型的故障,它将抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class CommandThatFailsFast extends HystrixCommand <String> { private final boolean throwException; public CommandThatFailsFast (boolean throwException) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .throwException = throwException; } @Override protected String run () { if (throwException) { throw new RuntimeException ("failure from CommandThatFailsFast" ); } else { return "success" ; } } }
这些单元测试显示了它的行为方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test public void testSuccess () { assertEquals("success" , new CommandThatFailsFast (false ).execute()); }@Test public void testFailure () { try { new CommandThatFailsFast (true ).execute(); fail("we should have thrown an exception" ); } catch (HystrixRuntimeException e) { assertEquals("failure from CommandThatFailsFast" , e.getCause().getMessage()); e.printStackTrace(); } }
HystrixObservableCommand相等的
a 的等效 Fail-Fast 解决方案HystrixObservableCommand将涉及覆盖该resumeWithFallback方法,如下所示:
1 2 3 4 5 6 7 8 @Override protected Observable<String> resumeWithFallback () { if (throwException) { return Observable.error(new Throwable ("failure from CommandThatFailsFast" )); } else { return Observable.just("success" ); } }
静默失败 查看源代码
静默失败相当于返回空响应或删除功能。可以通过返回null、空 Map、空 List 或其他此类响应来完成。
您可以通过在实例上实现一个getFallback()方法来做到这一点:HystrixCommand
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CommandThatFailsSilently extends HystrixCommand <String> { private final boolean throwException; public CommandThatFailsSilently (boolean throwException) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .throwException = throwException; } @Override protected String run () { if (throwException) { throw new RuntimeException ("failure from CommandThatFailsFast" ); } else { return "success" ; } } @Override protected String getFallback () { return null ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test public void testSuccess () { assertEquals("success" , new CommandThatFailsSilently (false ).execute()); }@Test public void testFailure () { try { assertEquals(null , new CommandThatFailsSilently (true ).execute()); } catch (HystrixRuntimeException e) { fail("we should not get an exception as we fail silently with a fallback" ); } }
另一个返回空列表的实现如下所示:
1 2 3 4 @Override protected List<String> getFallback () { return Collections.emptyList(); }
HystrixObservableCommand相等的
a 的等效 Fail-Silently 解决方案HystrixObservableCommand将涉及覆盖该resumeWithFallback()方法,如下所示:
1 2 3 4 @Override protected Observable<String> resumeWithFallback () { return Observable.empty(); }
后备:静态 回退可以返回静态嵌入在代码中的默认值。这不会像“失败静默”通常那样导致功能或服务被删除,而是会导致默认行为发生。
例如,如果命令根据用户凭据返回真/假,但命令执行失败,它可以默认为真:
1 2 3 4 @Override protected Boolean getFallback () { return true ; }
后备:存根 查看源代码
当您的命令返回包含多个字段的复合对象时,您通常使用存根回退,其中一些字段可以从其他请求状态确定,而其他字段设置为默认值。
您可能会发现适合在这些存根值中使用的状态的示例如下:
饼干
请求参数和标头
在当前服务请求失败之前对先前服务请求的响应
您的后备可以从请求范围内静态检索存根值,但通常建议在命令实例化时注入它们以供需要时使用,例如以下示例以处理countryCodeFromGeoLookup字段的方式演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class CommandWithStubbedFallback extends HystrixCommand <UserAccount> { private final int customerId; private final String countryCodeFromGeoLookup; protected CommandWithStubbedFallback (int customerId, String countryCodeFromGeoLookup) { super (HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )); this .customerId = customerId; this .countryCodeFromGeoLookup = countryCodeFromGeoLookup; } @Override protected UserAccount run () { throw new RuntimeException ("forcing failure for example" ); } @Override protected UserAccount getFallback () { return new UserAccount (customerId, "Unknown Name" , countryCodeFromGeoLookup, true , true , false ); } public static class UserAccount { private final int customerId; private final String name; private final String countryCode; private final boolean isFeatureXPermitted; private final boolean isFeatureYPermitted; private final boolean isFeatureZPermitted; UserAccount(int customerId, String name, String countryCode, boolean isFeatureXPermitted, boolean isFeatureYPermitted, boolean isFeatureZPermitted) { this .customerId = customerId; this .name = name; this .countryCode = countryCode; this .isFeatureXPermitted = isFeatureXPermitted; this .isFeatureYPermitted = isFeatureYPermitted; this .isFeatureZPermitted = isFeatureZPermitted; } } }
以下单元测试演示了它的行为:
1 2 3 4 5 6 7 8 9 10 11 12 @Test public void test () { CommandWithStubbedFallback command = new CommandWithStubbedFallback (1234 , "ca" ); UserAccount account = command.execute(); assertTrue(command.isFailedExecution()); assertTrue(command.isResponseFromFallback()); assertEquals(1234 , account.customerId); assertEquals("ca" , account.countryCode); assertEquals(true , account.isFeatureXPermitted); assertEquals(true , account.isFeatureYPermitted); assertEquals(false , account.isFeatureZPermitted); }
HystrixObservableCommand相等的
a 的等效 Stubbed 解决方案HystrixObservableCommand将涉及重写resumeWithFallback方法以返回Observable发出存根响应的 an。与上一个示例等效的版本如下所示:
1 2 3 4 5 @Override protected Observable<Boolean> resumeWithFallback () { return Observable.just( new UserAccount (customerId, "Unknown Name" , countryCodeFromGeoLookup, true , true , false ) ); }
但是,如果您期望从您Observable的Observable. 这是一个简单的例子来展示你如何完成这个 - 它跟踪从 main 发出的最后一个 item,Observable以便 fallback 知道从哪里拾取以继续序列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override protected Observable<Integer> construct () { return Observable.just(1 , 2 , 3 ) .concatWith(Observable.<Integer> error(new RuntimeException ("forced error" ))) .doOnNext(new Action1 <Integer>() { @Override public void call (Integer t1) { lastSeen = t1; } }) .subscribeOn(Schedulers.computation()); }@Override protected Observable<Integer> resumeWithFallback () { if (lastSeen < 4 ) { return Observable.range(lastSeen + 1 , 4 - lastSeen); } else { return Observable.empty(); } }
回退:通过网络缓存 查看源代码
有时,如果后端服务失败,可以从缓存服务(如 memcached)中检索过时的数据版本。
由于回退将通过网络,这是另一个可能的故障点,因此它也需要用HystrixCommand or包裹HystrixObservableCommand。
在单独的线程池上执行回退命令很重要,否则如果主命令成为潜在命令并填充线程池,如果两个命令共享同一个池,这将阻止回退运行。
以下代码显示了如何在其方法中CommandWithFallbackViaNetwork执行。FallbackViaNetwork getFallback()
请注意,如果回退失败,它还有 一个回退,它执行“失败静默”的返回方法null。
要将命令配置为在与从 派生FallbackViaNetwork的默认线程池不同的线程池上运行,它会注入到构造函数中。RemoteServiceX HystrixCommandGroupKey HystrixThreadPoolKey.Factory.asKey(“RemoteServiceXFallback”)
这意味着CommandWithFallbackViaNetwork将在名为的线程池上运行,并将在名为RemoteServiceX的FallbackViaNetwork线程池上运行RemoteServiceXFallback。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class CommandWithFallbackViaNetwork extends HystrixCommand <String> { private final int id; protected CommandWithFallbackViaNetwork (int id) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX" )) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand" ))); this .id = id; } @Override protected String run () { throw new RuntimeException ("force failure for example" ); } @Override protected String getFallback () { return new FallbackViaNetwork (id).execute(); } private static class FallbackViaNetwork extends HystrixCommand <String> { private final int id; public FallbackViaNetwork (int id) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX" )) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand" )) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback" ))); this .id = id; } @Override protected String run () { MemCacheClient.getValue(id); } @Override protected String getFallback () { return null ; } } }
主要 + 次要与后备 查看源代码
一些系统具有双模式行为——主要和次要,或主要和故障转移。
有时,辅助或故障转移被视为故障状态,仅用于回退;在这些场景中,它与上述“通过网络缓存”的模式相同。
但是,如果切换到辅助系统很常见,例如推出新代码的正常部分(有时这是有状态系统处理代码推送的一部分),那么每次使用辅助系统时,主系统都会处于故障状态,跳闸断路器和触发警报。
这不是期望的行为,如果没有其他原因,只是为了避免“狼来了”的疲劳,当真正的问题发生时,会导致警报被忽略。
因此,在这种情况下,策略是将主要和次要之间的切换视为正常、健康的模式,并在它们面前放置一个外观。
主要和次要HystrixCommand实现是线程隔离的,因为它们正在处理网络流量和业务逻辑。它们可能每个都具有非常不同的性能特征(通常辅助系统是静态缓存),因此每个单独命令的另一个好处是它们可以单独调整。
您不会公开这两个命令,而是将它们隐藏在另一个HystrixCommand信号量隔离的命令后面,该命令实现了关于是调用主要命令还是次要命令的条件逻辑。如果主要和次要都失败,则控制切换到外观命令本身的后备。
外观HystrixCommand可以使用信号量隔离,因为它所做的所有工作都经过另外两个HystrixCommand已经线程隔离的 s。只要run()外观的方法不执行任何其他网络调用、重试逻辑或其他“容易出错”的事情,就没有必要再有另一层线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 public class CommandFacadeWithPrimarySecondary extends HystrixCommand <String> { private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary" , true ); private final int id; public CommandFacadeWithPrimarySecondary (int id) { super (Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX" )) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand" )) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this .id = id; } @Override protected String run () { if (usePrimary.get()) { return new PrimaryCommand (id).execute(); } else { return new SecondaryCommand (id).execute(); } } @Override protected String getFallback () { return "static-fallback-" + id; } @Override protected String getCacheKey () { return String.valueOf(id); } private static class PrimaryCommand extends HystrixCommand <String> { private final int id; private PrimaryCommand (int id) { super (Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX" )) .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand" )) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand" )) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600 ))); this .id = id; } @Override protected String run () { return "responseFromPrimary-" + id; } } private static class SecondaryCommand extends HystrixCommand <String> { private final int id; private SecondaryCommand (int id) { super (Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX" )) .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand" )) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand" )) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100 ))); this .id = id; } @Override protected String run () { return "responseFromSecondary-" + id; } } public static class UnitTest { @Test public void testPrimary () { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary" , true ); assertEquals("responseFromPrimary-20" , new CommandFacadeWithPrimarySecondary (20 ).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } @Test public void testSecondary () { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary" , false ); assertEquals("responseFromSecondary-20" , new CommandFacadeWithPrimarySecondary (20 ).execute()); } finally { context.shutdown(); ConfigurationManager.getConfigInstance().clear(); } } } }
客户端不执行网络访问 查看源代码
当你包装不执行网络访问的行为时,但延迟是一个问题或线程开销是不可接受的,你可以将executionIsolationStrategy属性设置为,Hystrix 将使用信号量隔离。ExecutionIsolationStrategy.SEMAPHORE
下面展示了如何通过代码将此属性设置为命令的默认值(您也可以在运行时通过动态属性覆盖它)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class CommandUsingSemaphoreIsolation extends HystrixCommand <String> { private final int id; public CommandUsingSemaphoreIsolation (int id) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this .id = id; } @Override protected String run () { return "ValueFromHashMap_" + id; } }
Get-Set-Get 与请求缓存失效 查看源代码
如果您正在实现 Get-Set-Get 用例,其中 Get 接收到需要请求缓存的足够流量,但有时 Set 发生在另一个应该使同一请求中的缓存无效的命令上,您可以通过调用HystrixRequestCache.clear().
这是一个示例实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class CommandUsingRequestCacheInvalidation { private static volatile String prefixStoredOnRemoteDataStore = "ValueBeforeSet_" ; public static class GetterCommand extends HystrixCommand <String> { private static final HystrixCommandKey GETTER_KEY = HystrixCommandKey.Factory.asKey("GetterCommand" ); private final int id; public GetterCommand (int id) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetSetGet" )) .andCommandKey(GETTER_KEY)); this .id = id; } @Override protected String run () { return prefixStoredOnRemoteDataStore + id; } @Override protected String getCacheKey () { return String.valueOf(id); } public static void flushCache (int id) { HystrixRequestCache.getInstance(GETTER_KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id)); } } public static class SetterCommand extends HystrixCommand <Void> { private final int id; private final String prefix; public SetterCommand (int id, String prefix) { super (HystrixCommandGroupKey.Factory.asKey("GetSetGet" )); this .id = id; this .prefix = prefix; } @Override protected Void run () { prefixStoredOnRemoteDataStore = prefix; GetterCommand.flushCache(id); return null ; } } }
确认行为的单元测试是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Test public void getGetSetGet () { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { assertEquals("ValueBeforeSet_1" , new GetterCommand (1 ).execute()); GetterCommand commandAgainstCache = new GetterCommand (1 ); assertEquals("ValueBeforeSet_1" , commandAgainstCache.execute()); assertTrue(commandAgainstCache.isResponseFromCache()); new SetterCommand (1 , "ValueAfterSet_" ).execute(); GetterCommand commandAfterSet = new GetterCommand (1 ); assertFalse(commandAfterSet.isResponseFromCache()); assertEquals("ValueAfterSet_1" , commandAfterSet.execute()); } finally { context.shutdown(); } } }
将库迁移到 Hystrix 当您迁移现有客户端库以使用 Hystrix 时,您应该将每个“服务方法”替换为HystrixCommand.
然后,服务方法应该将调用转发到HystrixCommand并且其中没有任何额外的业务逻辑。
因此,在迁移之前,服务库可能如下所示:
迁移后,库的用户将能够HystrixCommand通过委托给 s 的服务外观直接或间接访问HystrixCommands。