【SpringCloud】Hystrix源码解析

e246a1dda09849a5a89535a62441565d.png

hystrix是一个微服务容错组件,提供了资源隔离、服务降级、服务熔断的功能。这一章重点分析hystrix的实现原理

1、服务降级

CAP原则是分布式系统的一个理论基础,它的三个关键属性分别是一致性、可用性和容错性。当服务实例所在服务器承受过大的压力或者受到网络因素影响没法及时响应请求时,整个任务将处于阻塞状态,这样的系统容错性不高,稍有不慎就会陷入瘫痪,hystrix为此提供了一种容错机制:当服务实例没法及时响应请求,可以采用服务降级的方式快速失败,维持系统的稳定性

服务降级和@HystrixCommand注解绑定,查看它的源码

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {...String fallbackMethod() default "";}

源码提供的信息很少,想要分析注解的功能,还得找到处理注解信息的类:HystrixCommandAspect

@Aspect
public class HystrixCommandAspect {...// 环绕通知@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {Method method = AopUtils.getMethodFromTarget(joinPoint);Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");} else {MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));MetaHolder metaHolder = metaHolderFactory.create(joinPoint);HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();try {Object result;if (!metaHolder.isObservable()) {// 代理执行方法result = CommandExecutor.execute(invokable, executionType, metaHolder);} else {result = this.executeObservable(invokable, executionType, metaHolder);}return result;} catch (HystrixBadRequestException var9) {throw var9.getCause();} catch (HystrixRuntimeException var10) {throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);}}}
}

从命名上我们能看出这是一个切面,说明服务降级是通过aop代理实现的,跟踪CommandExecutor的execute方法

调用链:
-> CommandExecutor.execute
-> castToExecutable(invokable, executionType).execute()
-> HystrixCommand.execute
-> this.queue().get()
public Future<R> queue() {// 获取Future对象final Future<R> delegate = this.toObservable().toBlocking().toFuture();Future<R> f = new Future<R>() {...public R get() throws InterruptedException, ExecutionException {return delegate.get();}public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return delegate.get(timeout, unit);}};...
}

HystrixCommand类的queue方法返回了一个Future对象,在线程任务中常用Future对象来获取任务执行的结果。这里的Future对象是通过this.toObservable().toBlocking().toFuture()创建的,点击查看toObservable方法,它返回一个Observable对象

public Observable<R> toObservable() {...final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {public Observable<R> call() {return 
((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED) ? Observable.never() : // 传入指令执行任务AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);}};...return Observable.defer(new Func0<Observable<R>>() {public Observable<R> call() {...// 有订阅者订阅了才创建Observable对象Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);Observable afterCache;if (requestCacheEnabled && cacheKey != null) {HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);if (fromCache != null) {toCache.unsubscribe();AbstractCommand.this.isResponseFromCache = true;return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);}afterCache = toCache.toObservable();} else {afterCache = hystrixObservable;}return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);...}});        
}

Observable对象的创建任务委托了给了AbstractCommand.this.applyHystrixSemantics方法

private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {this.executionHook.onStart(_cmd);// 是否允许请求,判断熔断状态if (this.circuitBreaker.allowRequest()) {final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);Action0 singleSemaphoreRelease = new Action0() {public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();}}};Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {public void call(Throwable t) {AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);}};if (executionSemaphore.tryAcquire()) {try {this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());// 执行任务return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} catch (RuntimeException var7) {return Observable.error(var7);}} else {return this.handleSemaphoreRejectionViaFallback();}} else {// 处于熔断状态,执行备用任务return this.handleShortCircuitViaFallback();}
}

this.circuitBreaker.allowReques返回true表示没有熔断,走executeCommandAndObserve方法

private Observable<R> executeCommandAndObserve(AbstractCommand<R> _cmd) {...Observable execution;if ((Boolean)this.properties.executionTimeoutEnabled().get()) {// 添加了超时监控execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));} else {execution = this.executeCommandWithSpecifiedIsolation(_cmd);}...// handleFallback:不同异常状况下使用不同的处理方法Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {public Observable<R> call(Throwable t) {Exception e = AbstractCommand.this.getExceptionFromThrowable(t);AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);if (e instanceof RejectedExecutionException) {return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);} else if (t instanceof HystrixTimeoutException) {// 抛出超时异常时,做超时处理return AbstractCommand.this.handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return AbstractCommand.this.handleBadRequestByEmittingError(e);} else if (e instanceof HystrixBadRequestException) {AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);return Observable.error(e);} else {return AbstractCommand.this.handleFailureViaFallback(e);}}};...return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)// 调用handleFallback处理异常.onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
}
private static class HystrixObservableTimeoutOperator<R> implements Observable.Operator<R, R> {final AbstractCommand<R> originalCommand;public HystrixObservableTimeoutOperator(AbstractCommand<R> originalCommand) {this.originalCommand = originalCommand;}public Subscriber<? super R> call(final Subscriber<? super R> child) {final CompositeSubscription s = new CompositeSubscription();child.add(s);final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(this.originalCommand.concurrencyStrategy, new Runnable() {public void run() {// 3.抛出超时异常child.onError(new HystrixTimeoutException());}});HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {// 1.判断是否超时public void tick() {if (HystrixObservableTimeoutOperator.this.originalCommand.isCommandTimedOut.compareAndSet(AbstractCommand.TimedOutStatus.NOT_EXECUTED, AbstractCommand.TimedOutStatus.TIMED_OUT)) {HystrixObservableTimeoutOperator.this.originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, HystrixObservableTimeoutOperator.this.originalCommand.commandKey);s.unsubscribe();// 2.执行超时任务timeoutRunnable.run();}}};}}

executeCommandAndObserve方法添加超时监控,如果任务执行超出限制时间会抛出超时异常,由handleTimeoutViaFallback方法处理异常

private Observable<R> handleTimeoutViaFallback() {// 1.根据异常类型处理异常return this.getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
}private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, HystrixEventType eventType, final HystrixRuntimeException.FailureType failureType, final String message, final Exception originalException) {...// 获取回调观察者fallbackExecutionChain = this.getFallbackObservable();...
}    protected final Observable<R> getFallbackObservable() {return Observable.defer(new Func0<Observable<R>>() {public Observable<R> call() {try {// 执行备用方法return Observable.just(HystrixCommand.this.getFallback());} catch (Throwable var2) {return Observable.error(var2);}}});
}

到这里终于看到了getFallback方法,它会调用注解中fallback指向的方法,快速失败返回响应结果

protected Object getFallback() {// 获取注解中的备用方法信息final CommandAction commandAction = this.getFallbackAction();if (commandAction != null) {try {return this.process(new AbstractHystrixCommand<Object>.Action() {Object execute() {MetaHolder metaHolder = commandAction.getMetaHolder();Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);}});} catch (Throwable var3) {LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));}} else {return super.getFallback();}
}

回到AbstractCommand.this.applyHystrixSemantics方法,当this.circuitBreaker.allowReques返回true是请求正常往下走,当它返回false时表示服务进入熔断状态,会走else分支,同样会进入getFallback方法

调用链
-> AbstractCommand.handleShortCircuitViaFallback
-> getFallbackOrThrowException
-> this.getFallbackObservable
-> GenericCommand.getFallback

2、服务熔断

服务熔断是hystrix提供的一种保护机制,当一段时间内服务响应的异常的次数过多,hystrix会让服务降级快速返回失败信息,避免累积压力造成服务崩溃。
联系上文找到circuitBreaker.allowRequest方法,该方法判断是否允许请求往下走

public boolean allowRequest() {// 是否强制打开熔断if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {return false;// 是否强制关闭熔断} else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {this.isOpen();return true;} else {return !this.isOpen() || this.allowSingleTest();}
}public boolean isOpen() {if (this.circuitOpen.get()) {return true;} else {HystrixCommandMetrics.HealthCounts health = this.metrics.getHealthCounts();// 请求次数是否超过单位时间内请求数阈值if (health.getTotalRequests() < (long)(Integer)this.properties.circuitBreakerRequestVolumeThreshold().get()) {return false;// 请求异常次数占比} else if (health.getErrorPercentage() < (Integer)this.properties.circuitBreakerErrorThresholdPercentage().get()) {return false;} else if (this.circuitOpen.compareAndSet(false, true)) {this.circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());return true;} else {return true;}}
}

isOpen方法内有针对请求的各种量化计算,当请求异常情况过多,就会触发熔断,走服务降级

3、总结

hystrix组件会根据请求状态判断是否执行请求,当请求超时或者存在其他异常会走备用方法,当异常次数过多会进入熔断状态快速失败,避免服务累积过多压力

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/37455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端工程化09-webpack静态的模块化打包工具(未完结)

9.1、开发模式的进化历史 webpacks是一个非常非常的强大的一个工具&#xff0c;相应的这个东西的学习也是有一定的难度的&#xff0c;里边的东西非常的多&#xff0c;里面涉及到的 概念的话也是非常非常的多的。 这个东西既然非常重要&#xff0c;那么在我们前端到底处于怎样…

HCIA4.26-5.10

OSPF ——开放式最短路径优先协议 无类别链路状态IGP动态路由协议 距离矢量协议 运行距离矢量协议的路由器会周期性的泛洪自己的路由表&#xff0c;通过路由之间的交互&#xff0c;每台路由器都从相邻的路由器学习到路由条目&#xff0c;随后加载进自己的路由表中。对于网络…

Python代码分析和修复工具库之coala使用详解

概要 代码质量在软件开发中至关重要,保持代码的可读性、一致性和易维护性是每个开发者的目标。coala 是一个开源的代码分析和修复工具,旨在帮助开发者自动化代码质量检查,支持多种编程语言,包括 Python、C++、JavaScript 等。通过使用 coala,开发者可以方便地集成代码检查…

AI时代的软件工程:挑战与改变

人工智能&#xff08;AI&#xff09;正以惊人的速度改变着我们的生活和工作方式。作为与AI关系最为密切的领域之一&#xff0c;软件工程正经历着深刻的转变。 1 软件工程的演变 软件工程的起源 软件工程&#xff08;Software Engineering&#xff09;是关于如何系统化、规范化地…

input调用手机摄像头实现拍照功能vue

项目需要一个拍照功能&#xff0c;实现功能如下图所示:若使用浏览器则可以直接上传图片&#xff0c;若使用手机则调用手机摄像头拍照。 1.代码结构 <!--input标签--> <input ref"photoRef"type"file"accept"image/*"capture"envir…

基于多源数据的密码攻防领域知识图谱构建

源自&#xff1a; 信息安全与通信保密杂志社 作者&#xff1a;曹增辉 , 郭渊博 , 黄慧敏 摘 要 提高网络空间安全的密码攻防能力&#xff0c;需要形成可表示、可共享、可分析的领域知识模式和知识库。利用自顶向下的构建方法&#xff0c;并通过本体构建方法梳理密码攻防领域…

IPSec:互联网协议安全机制的深度解析与应用

目录 一、IPSec概述 二、IPSec的组成 三、IPSec的工作原理 四、IPSec的用途 IPSec&#xff08;Internet Protocol Security&#xff09;作为现代网络通信中不可或缺的安全基础设施&#xff0c;旨在为基于IP&#xff08;Internet Protocol&#xff09;的数据传输提供端到端的…

【Linux】虚拟机安装openEuler 24.03 X86_64 教程

目录 一、概述 1.1 openEuler 覆盖全场景的创新平台 1.2 系统框架 1.3 平台框架 二、安装详细步骤 一、概述 1.1 openEuler 覆盖全场景的创新平台 openEuler 已支持 x86、Arm、SW64、RISC-V、LoongArch 多处理器架构&#xff0c;逐步扩展 PowerPC 等更多芯片架构支持&…

iptables 防火墙(一)

iptables 防火墙&#xff08;一&#xff09; 一、Linux 防火墙基础防火墙分类 二、iptables 的表、链结构规则表规则链数据包过滤的匹配流程 三、编写防火墙规则iptables 的安装iptables的基本语法规则的匹配条件通用匹配隐含匹配显式匹配 四、总结 在网络安全的世界里&#xf…

XRP对接文档

XRP对接文档 技术预研 参考文档 官方文档: https://xrpl.org/list-xrp-in-your-exchange.html 官方文档: https://xrpl.org/list-xrp-as-an-exchange.html#flow-of-funds 交易所对接XRP(内容齐全, 很推荐) https://blog.csdn.net/weixin_40396076/article/details/10020207…

基于51单片机的篮球计时器Proteus仿真

文章目录 一、篮球计时器1.题目要求2.思路3.仿真图3.1 未仿真时3.2 仿真开始3.3 A队进分3.4 B队进分3.5 比赛结束 4.仿真程序4.1 主函数4.2 时间显示4.3 比分显示4.4 按键扫描 二、总结 一、篮球计时器 1.题目要求 以51单片机为核心&#xff0c;设计并制作篮球计时器 基本功…

python实现符文加、解密

在历史悠久的加密技术中&#xff0c;恺撒密码以其简单却有效的原理闻名。通过固定的字母位移&#xff0c;明文可以被转换成密文&#xff0c;而解密则是逆向操作。这种技术不仅适用于英文字母&#xff0c;还可以扩展到其他语言的字符体系&#xff0c;如日语的平假名或汉语的拼音…

医院管理系统带万字文档医院预约挂号管理系统基于spingboot和vue的前后端分离java项目java课程设计java毕业设计

文章目录 仓库管理系统一、项目演示二、项目介绍三、万字项目文档四、部分功能截图五、部分代码展示六、底部获取项目源码带万字文档&#xff08;9.9&#xffe5;带走&#xff09; 仓库管理系统 一、项目演示 医院管理系统 二、项目介绍 基于springbootvue的前后端分离医院管…

跨阻放大器

#创作灵感# 最近涉及到微电流的监测项目&#xff0c;而里面的核心就是跨阻放大器&#xff0c;所以这里做一个简单的介绍&#xff0c;后续等项目完成了&#xff0c;再做一个实例的介绍。 #正文# 跨阻放大器&#xff08;Transimpedance Amplifier, TIA&#xff09;是一种将输入电…

NCBI Virus 帮助文档

What is NCBI Virus?&#xff08;什么是NCBI病毒&#xff09; 主要功能&#xff1a; Compare your sequence to those in the NCBI Virus database using NCBI BLAST algorithm. 使用NCBI BLAST算法将您的序列与NCBI病毒数据库中的序列进行比较。Search, view and download …

威联通 NAS 磁盘扩容 更换大容量磁盘具体操作以以TS-532X为例

第一步 检查磁盘状态 打开存储与快照总管&#xff0c;选左侧磁盘查看磁盘状态&#xff0c;应该是就绪状态。 三块磁盘都是就绪状态。 上面截图是更换过程中的截图 具体操作 然后点击存储/快照 &#xff0c;选管理 选逐一更换磁盘&#xff0c;这里raid组需要注意&#xff0…

【LeetCode】 740. 删除并获得点数

这真是一道好题&#xff01;这道题不仅考察了抽象思维&#xff0c;还考察了分析能力、化繁为简的能力&#xff0c;同时还有对基本功的考察。想顺利地做出这道题还挺不容易&#xff01;我倒在了第一步与第二步&#xff1a;抽象思维和化繁为简。题目的要求稍微复杂一些&#xff0…

数据恢复篇:如何在电脑上恢复已删除和丢失的音乐文件

尽管流媒体网络非常流行&#xff0c;但许多人仍然选择将音乐下载并保存在 PC 本地。这会使文件面临丢失或意外删除的风险。 幸运的是&#xff0c;您可以使用数据恢复软件恢复已删除的音乐和其他文件类型。这篇文章讨论了这些解决方案以及如何使用奇客数据恢复检索丢失的音乐文…

02.Linux下安装FFmpeg

目录 一、下载FFmpeg的编译源码 二、编译源码 三、ffmpeg工具结构解析 1、bin目录 2、include库 3、lib库 四、注意事项 五、可能出现的一些问题 1、某些工具未安装/版本过久 2、缺少pkg-config工具 3、缺少ffmplay FFmpeg 是一个开源的跨平台音视频处理工具集&…

科普文:八大排序算法(JAVA实现)+ 自制动画 (袁厨的算法小屋)

我将我仓库里的排序算法给大家汇总整理了一下&#xff0c;写的非常非常细&#xff0c;还对每个算法制作了动画&#xff0c;一定能够对大家有所帮助&#xff0c;欢迎大家阅读。另外我也对 leetcode 上面可以用排序算法秒杀的算法题进行了总结&#xff0c;会在后面的文章中进行发…