Soul 网关源码学习(15) - Soul 网关插件 Waf, RateLimiter

WafPlugin 源码分析

WafPlugin 是 Soul 网关请求执行插件链上执行顺序排在 SignPlugin 之后的插件。WafPlugin 继承自 AbstractSoulPlugin, SoulPlugin -> AbstractSoulPlugin -> WafPlugin 使用了模板方法设计模式,
插件执行的主体的逻辑在 execute 方法中, 这个方法在 SoulPlugin 接口中定义, 插件具体的执行逻辑在 AbstractSoulPlugin 抽象方法 doExecute 中由继承 AbstractSoulPlugin 类的子类实现。WafPlugin 的执行逻辑为:

  • 判断内存缓存中是否有插件数据,没有则执行下一个插件
  • 判断插件数据中是否启用的标志位是否为 true,否则执行下一个插件
  • 插件数据中是否有 selector 信息,没有则执行 WafPlugin 的 doExecute 方法
  • selector 信息中是否有与当前请求匹配的 selector,没有则执行 WafPlugin 的 doExecute 方法
  • 与当前请求匹配的 selector 是否存在 rule 信息,没有则执行 WafPlugin 的 doExecute 方法
  • rule 信息中是否有与当前请求匹配的 rule,没有则执行 WafPlugin 的 doExecute 方法
  • 执行 WafPlugin 的 doExecute 方法

WafPlugindoExecute 方法:

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
WafConfig wafConfig = Singleton.INST.get(WafConfig.class);
if (Objects.isNull(selector) && Objects.isNull(rule)) {
if (WafModelEnum.BLACK.getName().equals(wafConfig.getModel())) {
return chain.execute(exchange);
}
exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
Object error = SoulResultWrap.error(403, Constants.REJECT_MSG, null);
return WebFluxResultUtils.result(exchange, error);
}
String handle = rule.getHandle();
WafHandle wafHandle = GsonUtils.getInstance().fromJson(handle, WafHandle.class);
if (Objects.isNull(wafHandle) || StringUtils.isBlank(wafHandle.getPermission())) {
log.error("waf handler can not configuration:{}", handle);
return chain.execute(exchange);
}
if (WafEnum.REJECT.getName().equals(wafHandle.getPermission())) {
exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
Object error = SoulResultWrap.error(Integer.parseInt(wafHandle.getStatusCode()), Constants.REJECT_MSG, null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
}

执行的逻辑如下:

  • 如果传入参数的 selector 和 rule 都是空
    • 官方文档中介绍”如果配置的 model 是 black,即只有匹配的流量才会执行拒绝策略,不匹配的,直接会跳过“, 此时传入参数的 selector 和 rule 都是空, 结果是未匹配任何流量,直接执行下一个插件
    • 官网文档中介绍”当 module 设置为 mixed 模式的时候,所有的流量都会通过 WafPlugin,针对不同的匹配流量,用户可以设置是拒绝,还是通过“, 此时传入参数的 selector 和 rule 都是空, 结果是未匹配任何流量,所以拒绝该请求,响应值为 403
  • 如果 rule 信息不为空,且 rule 里面的 handle (插件处理信息, 主要包含规则相关处理参数) 是空,那么继续执行下一个插件逻辑
  • 如果 rule 的 handle (插件处理信息) 不为空,且配置的拦截策略是 REJECT, 那么拒绝该请求,响应值为拒绝策略中自定义的 statusCode
  • 否则配置的拦截策略是ALLOW,即允许请求通过, 则执行下一个插件逻辑

RateLimiterPlugin 源码分析

Soul 网关中的 RateLimiter 插件采用 Redis 令牌桶算法进行限流, 支持针对接口级别的限流。

限流插件工作原理

RateLimiterPlugin 是 Soul 网关请求执行插件链上执行顺序排在 WafPlugin 之后的插件, RateLimiterPlugin 也是继承自 AbstractSoulPlugin,
RateLimiterPlugin 被调用的时候也是先执行 AbstractSoulPlugin 中的 execute 方法,找到与当前请求匹配的 selector 和 rule 之后,再执行 RateLimiterPlugindoExecute 方法。RateLimiterdoExecute 方法的逻辑如下:

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final String handle = rule.getHandle();
final RateLimiterHandle limiterHandle = GsonUtils.getInstance().fromJson(handle, RateLimiterHandle.class);
return redisRateLimiter.isAllowed(rule.getId(), limiterHandle.getReplenishRate(), limiterHandle.getBurstCapacity())
.flatMap(response -> {
if (!response.isAllowed()) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
Object error = SoulResultWrap.error(SoulResultEnum.TOO_MANY_REQUESTS.getCode(), SoulResultEnum.TOO_MANY_REQUESTS.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
});
}

RateLimiterHandle 这个对象的实例里面就是我们配置的令牌生成速率和令牌桶的最大容量,主要实现 Redis 令牌桶算法的逻辑在 RedisRateLimiter 这个类中,主要的逻辑在 isAllowed 方法中:

   @SuppressWarnings("unchecked")
public Mono<RateLimiterResponse> isAllowed(final String id, final double replenishRate, final double burstCapacity) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
List<String> keys = getKeys(id);
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(this.script, keys, scriptArgs);
return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
RateLimiterResponse rateLimiterResponse = new RateLimiterResponse(allowed, tokensLeft);
log.info("RateLimiter response:{}", rateLimiterResponse.toString());
return rateLimiterResponse;
}).doOnError(throwable -> log.error("Error determining if user allowed from redis:{}", throwable.getMessage()));
}

isAllowed 方法执行的逻辑如下:

  • 先判断实例变量 initialized 是否是为 true,来判断该对象的实例是否已经初始化, 如果是未初始化, 则抛出运行时异常 IllegalStateException
  • 执行 lua 脚本, 根据 lua 脚本的执行结果构造 RateLimiterResponse 对象实例,该对象主要的两个属性是 allowed 、tokensRemaining

Redis 中使用 lua 脚本的好处:

  1. 减少网络开销, 可以将多个请求通过脚本的形式一次发送,减少网络时延
  2. 保证了脚本中的批量 Redis 操作的原子性, Redis Server 会将整个脚本中的所有 Redis 操作作为一个整体单元执行,中间不会被其他命令插入

以此来完成 RateLimiter 限流功能。

总结

通过今天的学习,可以学习 soul 网关插件怎么样在运行时通过 Selector,Rule,PluginHandle 完成插件的功能,这样我们如果要为 Soul 开发插件,也能通过这种方式完成想要的功能。

文章作者: David Liu
文章链接: https://davidliu.now.sh/2021/01/30/soul_plugin_source_discoveryII/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 David Liu's Blog