Soul 网关源码学习(19) - Soul 网关插件 Divide

Divide 插件介绍

Divide 插件是 soul 网关中处理 HTTP 协议请求核心的插件, 是进行 http 正向代理的插件,所有 http 类型的请求,都是由该插件进行负载均衡的调用, 目前 soul 网关中支持的负载均衡算法目前有三种: Hash, Random, RoundRobin。Divide 插件的在 soul 网关中的实现代码在 org.dromara.soul.plugin.divide.DividePlugin

soul 网关进行 http 请求正向代理的含义是: soul 网关接收到请求目标是 http 协议的请求之后, 由 soul 网关向目标发起 http 请求, 得到响应之后返回给客户端。

DividePlugin 源码分析

DividePlugin 通过继承 org.dromara.soul.plugin.base.AbstractSoulPlugin 来实现插件的逻辑, 它的实现逻辑在 org.dromara.soul.plugin.divide.DividePlugin#doExecute 方法,

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {
log.error("divide upstream configuration error: {}", rule.toString());
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(divideUpstream)) {
log.error("divide has no upstream");
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// set the http url
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
// set the http timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
return chain.execute(exchange);
}

doExecute 方法中的运行流程如下:

  • 从 rule 信息中取出 PluginHandle 信息构造出 DivideRuleHandle 对象的实例
  • 根据选择器 selectorId 获取到代理的 upstream 列表
  • 通过 Plugin 中配置的负载均衡算法从 upstream 列表中选择出发送请求的目标 upstream
  • 构造好需要访问服务的 URL 后将目标 HTTP URL, PluginHandle 中的请求超时时间和重试次数放入 ServerWebExchange 中后, 调用插件链中下个继续处理

rule 中的 PluginHandle 信息是在 soul-admin 中配置的, 其中的配置项包括: 负载均衡算法, 重试次数和超时时间。在 doExecute 方法中通过如下代码来获取当前请求匹配选择器的目标 upstream 列表:

final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());

UpstreamCacheManager 管理了运行时 soul-admin 同步到 soul-bootstrap 所有选择器对应的 upstream 列表信息, 通过 ConcurrentHashMap 管理, UpstreamCacheManager 中定义了两个 ConcurrentHashMap
类型的私有静态类常量 UPSTREAM_MAPUPSTREAM_MAP_TEMP, 在 UpstreamCacheManager 的构造函数中, 可以看到 UpstreamCacheManager 通过检测系统运行时变量中 soul.upstream.check 的值是否为 true, 来判断是否开启 upstream
探活(定期检查 upstream 是否可访问)。在 submit 方法, removeByKey 方法和定时调度的 scheduled 方法中都包含了操作 UPSTREAM_MAPUPSTREAM_MAP_TEMP 的代码, 我个人理解操作两个 ConcurrentHashMap 有如下好处:

  1. 执行逻辑操作的目标的职责会更清晰, HTTP 请求路由获取 upstream 都只从 UPSTREAM_MAP_TEMP 中查找
  2. 减少锁冲突的可能

DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip); 实现了从 upstream 列表中获取请求的目标 upstream 的逻辑, LoadBalanceUtils 中使用了
soul-spi 模块中实现的 SPI 功能, 来完成负载均衡算法的选取。

总结

通过今天的学习,对 Soul 网关中代理 HTTP 请求的机制有了深入的了解,同时学习到了 SPI 的机制。

文章作者: David Liu
文章链接: https://davidliu.now.sh/2021/02/05/soul_plugin_source_discoveryVI/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 David Liu's Blog