Soul 网关源码学习(17) - Soul 网关插件 Hystrix, Sentinel

soul 网关中关于熔断和限流的插件的可选实现有 HystrixPlugin, SentinelPlugin, Reslience4jPlugin, 这三个插件属于 soul 网关插件链中转发请求之前插件。

HystrixPlugin 源码分析

什么是 Hystrix

Hystrix 是 Netflix 团队开源的一个框架,它提供了熔断、隔离、Fallback、cache、监控等功能,能够保证在一个分布式系统中,一个或者多个依赖服务出问题的情况下,保证整体服务可用。Netflix Hystrix 是 SOA /
微服务架构中提供服务隔离、熔断、降级机制的工具 / 框架。Netflix Hystrix 是断路器的一种实现,用于高微服务架构的可用性,是防止服务出现雪崩的利器。如果想详细了解 Hystrix 的工作原理,请查看 Netflix Hystrix 断路器简介与工作原理

源码分析

HystrixPlugin 通过继承 AbstractSoulPlugin 完成插件的实现,HystrixPlugin 的插件逻辑在方法 doExecute 中实现,在 selector 信息和 rule 信息在插件的通用逻辑 AbstractSoulPluginexecute 方法中获取,doExecute 方法的实现为:

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class);
if (StringUtils.isBlank(hystrixHandle.getGroupKey())) {
hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule());
}
if (StringUtils.isBlank(hystrixHandle.getCommandKey())) {
hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod());
}
Command command = fetchCommand(hystrixHandle, exchange, chain);
return Mono.create(s -> {
Subscription sub = command.fetchObservable().subscribe(s::success,
s::error, s::success);
s.onCancel(sub::unsubscribe);
if (command.isCircuitBreakerOpen()) {
log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
}
}).doOnError(throwable -> {
log.error("hystrix execute exception:", throwable);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
chain.execute(exchange);
}).then();
}

private Command fetchCommand(final HystrixHandle hystrixHandle, final ServerWebExchange exchange, final SoulPluginChain chain) {
if (hystrixHandle.getExecutionIsolationStrategy() == HystrixIsolationModeEnum.SEMAPHORE.getCode()) {
return new HystrixCommand(HystrixBuilder.build(hystrixHandle),
exchange, chain, hystrixHandle.getCallBackUri());
}
return new HystrixCommandOnThread(HystrixBuilder.buildForHystrixCommand(hystrixHandle),
exchange, chain, hystrixHandle.getCallBackUri());
}

从源码中我们可以看到 HystrixPlugin 的处理方法可以分为 3 块:获取 HystrixHandle -> 构建 Command 信息 -> 构建返回 Mono 信息

  • 获取 HystrixHandle

HystrixHandle 从插件的规则信息中获取,定义了插件在完全匹配之后如何运行,在 soul-admin 中维护。HystrixHandle 中维护了构建 com.netflix.hystrix.HystrixCommand 的相关参数,相关的配置对象还有 org.dromara.soul.common.dto.convert.HystrixThreadPoolConfig

  • 构建 Command 信息

    构建 Command 信息的方式入口在 fetchCommand 中,通过源码可以看出在构建的过程中通过 hystrixHandleexecutionIsolationStrategy 字段决定插件运行时以哪种模式隔离 HystrixCommand, 默认使用信号量隔离

    soul 网关中,org.dromara.soul.plugin.hystrix.command.Command 接口定义了 HystrixCommand 的行为,它有两个实现类:

    • org.dromara.soul.plugin.hystrix.command.HystrixCommand

      soul 网关中通过 Hystrix 信号量隔离的模式执行 HystrixCommand 的实现,因为使用信号量在执行请求的时候使用的同步调用的方式,所以 org.dromara.soul.plugin.hystrix.command.HystrixCommand 继承了 com.netflix.hystrix.HystrixObservableCommand<Void>, 通过子类实现 Observable<Void> construct() 使构建的 HystrixCommand 支持非阻塞式的执行调用。

    • org.dromara.soul.plugin.hystrix.command.HystrixCommandOnThread

      soul 网关中通过 Hystrix 线程池隔离模式执行 HystrixCommand 的实现,它的主要逻辑在 run 方法中。

  • 构建返回 Mono 信息

    使用 Mono.create() 构建返回值,这里使用到了 reactor 的语法,在 subscribe 处理逻辑中将 HystrixCommand 产生的 Observable 对象的订阅信号绑定到当前信号处理中,使用 then 方法返回 Mono.empty()

SentinelPlugin 源码分析

什么是 Sentinel

Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。如果想详细了解 Sentinel 工作原理,请查看 Sentinel Github 主页

源码分析

SentinelPlugin 通过继承 AbstractSoulPlugin 完成插件的实现,SentinelPlugin 的插件逻辑在方法 doExecute 中实现,在 selector 信息和 rule 信息在插件的通用逻辑 AbstractSoulPluginexecute 方法中获取,doExecute 方法的实现为:

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
String resourceName = SentinelRuleHandle.getResourceName(rule);
SentinelHandle sentinelHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), SentinelHandle.class);
return chain.execute(exchange).transform(new SentinelReactorTransformer<>(resourceName)).doOnSuccess(v -> {
if (exchange.getResponse().getStatusCode() != HttpStatus.OK) {
HttpStatus status = exchange.getResponse().getStatusCode();
exchange.getResponse().setStatusCode(null);
throw new SentinelFallbackException(status);
}
}).onErrorResume(throwable -> sentinelFallbackHandler.fallback(exchange, UriUtils.createUri(sentinelHandle.getFallbackUri()), throwable));
}

SentinelPlugindoExecute 方法处理流程分为两块: 获取 SentinelHandle -> 构建返回 Mono 信息

  • 获取 SentinelHandle

doExecute 的方法逻辑中可以看出,处理逻辑只会使用到获取到的 SentinelHandle 对象的实例中的 fallbackUri 属性,SentinelHandle 中的其它属性的使用在 org.dromara.soul.plugin.sentinel.handler.SentinelRuleHandlehandlerRule 方法中,在处理数据同步流程中,接收到 SentinelPlugin 的规则数据后会注册 Sentinel 的运行时规则信息。

  • 构建返回 Mono 信息

从源码中可以看出构建返回信息的逻辑中有几个主要的动作,

  1. 利用 com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorTransformer 将 Sentinel 相关的资源限流执行加入了网关请求执行流程中
  2. 在网关请求执行流程中注册成功的回调事件
  3. 在处理异常的回调方法 onErrorResume 的处理中织入 sentinelFallbackHandler 实例的 fallback 方法。

fallback 方法是接口 org.dromara.soul.plugin.base.fallback.FallbackHandler 中的默认方法,org.dromara.soul.plugin.sentinel.fallback.SentinelFallbackHandler 实现了 org.dromara.soul.plugin.base.fallback.FallbackHandler 接口,fallback 方法会使用到 FallbackHandler 中的另一个方法 generateError, 在实现类 SentinelFallbackHandlergenerateError 定义了如何构建一个包含生成异常的 Mono 对象。

总结

通过今天的源码学习,了解到了 soul 网关中 Hystrix 和 Sentinel 插件实现熔断和限流的工作原理,以及一些 Java 响应式编程的知识,HystrixPlugin 对 Hystrix 的支持的实现值得好好学习,特别是 Command 接口抽象设计上面以及响应式编程的实际应用技巧。

参考

文章作者: David Liu
文章链接: https://davidliu.now.sh/2021/02/03/soul_plugin_source_discoveryIV/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 David Liu's Blog