Soul Gateway Source Code Study (8) - HTTP Long-Polling Data Synchronization (1)

How HTTP Long Polling Works

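HTTP polling rests on one assumption: the HTTP client expects the resources it cares about on the HTTP server to change frequently. Polling is a data synchronization mechanism driven from the client side of an HTTP exchange: the client sends a request to the server to obtain the server's latest data. Depending on how it works, polling falls into two kinds: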

  • Short polling

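The basic idea of HTTP short polling is that the HTTP client sends a request to the HTTP server at a fixed interval (usually a short one), and the server responds immediately whether or not the data has changed. In essence this is still the ordinary cycle of the client sending a request and the server answering it; by sending requests continuously, the client approximates receiving server-side changes in real time. How promptly the client "notices" a change depends on the interval between two consecutive requests: the shorter the interval, the smaller the risk of inconsistent data.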
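The short-polling loop described above can be sketched in a few lines of plain Java. This is only an illustration and not code from soul: the class name ShortPoller, the Supplier standing in for one HTTP GET, and the change callback are all assumptions made for the example.

```java
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical short-polling client: asks for a snapshot on every tick. */
final class ShortPoller {
    private final Supplier<String> fetch;     // stands in for one HTTP GET (assumption)
    private final Consumer<String> onChange;  // invoked only when the snapshot differs
    private String lastSeen;
    private int requests;

    ShortPoller(Supplier<String> fetch, Consumer<String> onChange) {
        this.fetch = fetch;
        this.onChange = onChange;
    }

    /** Performs one poll and returns true if the data changed since the last poll. */
    synchronized boolean pollOnce() {
        requests++;                            // every poll costs one request, changed or not
        String current = fetch.get();
        if (Objects.equals(current, lastSeen)) {
            return false;                      // same data again: the request was wasted
        }
        lastSeen = current;
        onChange.accept(current);
        return true;
    }

    /** Repeats pollOnce with a fixed delay between the end of one poll and the next. */
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long intervalMillis) {
        return scheduler.scheduleWithFixedDelay(this::pollOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized int requestCount() {
        return requests;
    }
}
```

Counting requests next to change notifications makes the trade-off visible: when the data rarely changes, requestCount() keeps growing while onChange is almost never called.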

The advantage of synchronizing data through short polling is:

  1. It is simple to implement: only the HTTP client needs code that sends requests on a timer, and the server's data endpoint needs no changes at all.

However, the drawbacks of this approach are just as obvious:

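  1. When the resource being synchronized on the HTTP server does not change very often, most short-polling requests return the same result, so the client sends many requests that contribute nothing to synchronization.
  2. Because requests (and, when connections are not reused, the connections themselves) are created over and over, resources are wasted on both the server and the client. If too many HTTP clients short-poll the same HTTP server, the request volume becomes large and consumes an excessive share of the server's compute resources.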
  • Long polling

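HTTP long polling refines short polling: when the server receives a request from the client, it does not respond right away. Instead it holds the request and checks whether the server-side data has been updated. If there is an update, it responds; if not, the request returns only when the data changes or when a time limit (set on the server) is reached. Compared with short polling, long polling clearly cuts down the number of unnecessary HTTP requests between client and server and saves resources accordingly.
Holding requests on the server, however, also ties up resources.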
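The hold-until-change-or-timeout behaviour can be modelled without any HTTP at all. The sketch below is an assumption-laden illustration, not soul's implementation: LongPollHub, hold and publish are hypothetical names, and each parked request is represented by a CompletableFuture (completeOnTimeout requires Java 9 or later).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory model of the server side of long polling. */
final class LongPollHub {
    private final List<CompletableFuture<String>> waiting = new CopyOnWriteArrayList<>();

    /**
     * Parks one "request". The returned future completes with the changed key
     * as soon as publish() is called, or with "" once maxHoldMillis has elapsed.
     */
    CompletableFuture<String> hold(long maxHoldMillis) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        waiting.add(pending);
        // Fallback value for the "nothing changed within the hold time" case.
        pending.completeOnTimeout("", maxHoldMillis, TimeUnit.MILLISECONDS);
        // However the future finishes, it no longer counts as a waiting request.
        pending.whenComplete((value, error) -> waiting.remove(pending));
        return pending;
    }

    /** Answers every parked request with the key that changed. */
    void publish(String changedKey) {
        for (CompletableFuture<String> pending : waiting) {
            pending.complete(changedKey);
        }
    }

    int waitingCount() {
        return waiting.size();
    }
}
```

A caller would invoke hold(...) once per incoming request and write the HTTP response when the future completes. Every parked request occupies an entry in the waiting list, which is the server-side cost mentioned above.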

Using HTTP Long-Polling Data Synchronization in the soul Gateway

On the soul admin side, the endpoints that serve HTTP long-polling data synchronization are defined in org.dromara.soul.admin.controller.ConfigController. They are:

GET /configs/fetch
POST /configs/listener

The /configs/fetch endpoint returns the configuration data for the requested ConfigGroup values. The available ConfigGroup values are:

Value       Description
APP_AUTH    Application authentication data
PLUGIN      Plugin data
SELECTOR    Selector data
RULE        Rule data
META_DATA   Metadata

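When calling the endpoint, pass the ConfigGroup values you want in groupKeys. For example, a request to http://127.0.0.1:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN returns the APP_AUTH data and the PLUGIN data. The endpoint that implements long polling is
POST /configs/listener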
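As a small illustration of the repeated groupKeys parameter, the helper below assembles a fetch URL like the one in the example. The class FetchUrls and the method fetchUrl are hypothetical names for this sketch; only the path and the parameter name come from the text above. URLEncoder.encode(String, Charset) requires Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the query string for GET /configs/fetch. */
final class FetchUrls {
    private FetchUrls() {
    }

    /** Repeats the groupKeys parameter once per requested config group. */
    static String fetchUrl(String adminBase, List<String> groups) {
        String query = groups.stream()
                // Group names are plain enum names, so encoding leaves them unchanged.
                .map(group -> "groupKeys=" + URLEncoder.encode(group, StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return adminBase + "/configs/fetch?" + query;
    }
}
```

Calling FetchUrls.fetchUrl("http://127.0.0.1:9095", List.of("APP_AUTH", "PLUGIN")) reproduces the URL from the example.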

Sequence Diagram of the Long-Polling Calls Between Client and Server

[Figure: HTTP long-polling sequence diagram]

The key class that implements HTTP long polling is org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener. The entry points related to long polling are:

  • org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#doLongPolling, which is invoked from the POST /configs/listener endpoint
  • the abstract methods inherited from org.dromara.soul.admin.listener.AbstractDataChangedListener: afterMetaDataChanged, afterAppAuthChanged, afterPluginChanged, afterRuleChanged, afterSelectorChanged

This class has two inner classes: org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener.LongPollingClient and org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener.DataChangeTask. Both implement the java.lang.Runnable interface, so they can be scheduled by the thread pool inside HttpLongPollingDataChangedListener. The relevant code is:

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
    ......
    private static final ReentrantLock LOCK = new ReentrantLock();

    /**
     * Blocked client.
     */
    private final BlockingQueue<LongPollingClient> clients;

    private final ScheduledExecutorService scheduler;
    ......
    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        this.clients = new ArrayBlockingQueue<>(1024);
        this.scheduler = new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("long-polling", true));
        this.httpSyncProperties = httpSyncProperties;
    }

    @Override
    protected void afterInitialize() {
        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
        // Periodically check the data for changes and update the cache
        scheduler.scheduleWithFixedDelay(() -> {
            log.info("http sync strategy refresh config start.");
            try {
                this.refreshLocalCache();
                log.info("http sync strategy refresh config success.");
            } catch (Exception e) {
                log.error("http sync strategy refresh config error!", e);
            }
        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
        log.info("http sync strategy refresh interval: {}ms", syncInterval);
    }

    private void refreshLocalCache() {
        this.updateAppAuthCache();
        this.updatePluginCache();
        this.updateRuleCache();
        this.updateSelectorCache();
        this.updateMetaDataCache();
    }

    /**
     * If the configuration data changes, the group information for the change is immediately responded.
     * Otherwise, the client's request thread is blocked until any data changes or the specified timeout is reached.
     *
     * @param request the request
     * @param response the response
     */
    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

        // compare group md5
        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
        String clientIp = getRemoteIp(request);

        // response immediately.
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
            return;
        }

        // listen for configuration changed.
        final AsyncContext asyncContext = request.startAsync();

        // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
        asyncContext.setTimeout(0L);

        // block client's thread.
        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
    }

    @Override
    protected void afterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
        scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH));
    }

    @Override
    protected void afterMetaDataChanged(final List<MetaData> changed, final DataEventTypeEnum eventType) {
        scheduler.execute(new DataChangeTask(ConfigGroupEnum.META_DATA));
    }

    @Override
    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
        scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
    }

    @Override
    protected void afterRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
        scheduler.execute(new DataChangeTask(ConfigGroupEnum.RULE));
    }

    @Override
    protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
    }

    private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
        List<ConfigGroupEnum> changedGroup = new ArrayList<>(4);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            // md5,lastModifyTime
            String[] params = StringUtils.split(request.getParameter(group.name()), ',');
            if (params == null || params.length != 2) {
                throw new SoulException("group param invalid:" + request.getParameter(group.name()));
            }
            String clientMd5 = params[0];
            long clientModifyTime = NumberUtils.toLong(params[1]);
            ConfigDataCache serverCache = CACHE.get(group.name());
            // do check.
            if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
                changedGroup.add(group);
            }
        }
        return changedGroup;
    }

    /**
     * check whether the client needs to update the cache.
     * @param serverCache the admin local cache
     * @param clientMd5 the client md5 value
     * @param clientModifyTime the client last modify time
     * @return true: the client needs to be updated, false: not need.
     */
    private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {

        // is the same, doesn't need to be updated
        if (StringUtils.equals(clientMd5, serverCache.getMd5())) {
            return false;
        }

        // if the md5 value is different, it is necessary to compare lastModifyTime.
        long lastModifyTime = serverCache.getLastModifyTime();
        if (lastModifyTime >= clientModifyTime) {
            // the client's config is out of date.
            return true;
        }

        // the lastModifyTime before client, then the local cache needs to be updated.
        // Considering the concurrency problem, admin must lock,
        // otherwise it may cause the request from soul-web to update the cache concurrently, causing excessive db pressure
        boolean locked = false;
        try {
            locked = LOCK.tryLock(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
        }
        if (locked) {
            try {
                ConfigDataCache latest = CACHE.get(serverCache.getGroup());
                if (latest != serverCache) {
                    // the cache of admin was updated. if the md5 value is the same, there's no need to update.
                    return !StringUtils.equals(clientMd5, latest.getMd5());
                }
                // load cache from db.
                this.refreshLocalCache();
                latest = CACHE.get(serverCache.getGroup());
                return !StringUtils.equals(clientMd5, latest.getMd5());
            } finally {
                LOCK.unlock();
            }
        }

        // not locked, the client need to be updated.
        return true;

    }

    /**
     * Send response datagram.
     *
     * @param response the response
     * @param changedGroups the changed groups
     */
    private void generateResponse(final HttpServletResponse response, final List<ConfigGroupEnum> changedGroups) {
        try {
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setContentType(MediaType.APPLICATION_JSON_VALUE);
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println(GsonUtils.getInstance().toJson(SoulAdminResult.success(SoulResultMessage.SUCCESS, changedGroups)));
        } catch (IOException ex) {
            log.error("Sending response failed.", ex);
        }
    }

    /**
     * When a group's data changes, the thread is created to notify the client asynchronously.
     */
    class DataChangeTask implements Runnable {

        /**
         * The Group where the data has changed.
         */
        private final ConfigGroupEnum groupKey;

        /**
         * The Change time.
         */
        private final long changeTime = System.currentTimeMillis();

        /**
         * Instantiates a new Data change task.
         *
         * @param groupKey the group key
         */
        DataChangeTask(final ConfigGroupEnum groupKey) {
            this.groupKey = groupKey;
        }

        @Override
        public void run() {
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
                log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
            }
        }
    }

    class LongPollingClient implements Runnable {

        /**
         * The Async context.
         */
        private final AsyncContext asyncContext;

        /**
         * The Ip.
         */
        private final String ip;

        /**
         * The Timeout time.
         */
        private final long timeoutTime;

        /**
         * The Async timeout future.
         */
        private Future<?> asyncTimeoutFuture;

        /**
         * Instantiates a new Long polling client.
         *
         * @param ac the ac
         * @param ip the ip
         * @param timeoutTime the timeout time
         */
        LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
            this.asyncContext = ac;
            this.ip = ip;
            this.timeoutTime = timeoutTime;
        }

        @Override
        public void run() {
            this.asyncTimeoutFuture = scheduler.schedule(() -> {
                clients.remove(LongPollingClient.this);
                List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
                sendResponse(changedGroups);
            }, timeoutTime, TimeUnit.MILLISECONDS);
            clients.add(this);
        }

        /**
         * Send response.
         *
         * @param changedGroups the changed groups
         */
        void sendResponse(final List<ConfigGroupEnum> changedGroups) {
            // cancel scheduler
            if (null != asyncTimeoutFuture) {
                asyncTimeoutFuture.cancel(false);
            }
            generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
            asyncContext.complete();
        }
    }
}

The doLongPolling method uses AsyncContext. Since Servlet 3.0, ServletRequest provides a startAsync method that puts the request into asynchronous processing. doLongPolling calls the private method compareChangedGroup to check which ConfigGroup values have changed.

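AsyncContext is part of the asynchronous processing support added in Servlet 3.0. Asynchronous processing lets the container release the thread and related resources it allocated to a request early, which eases the load on the system. The response for a request whose container thread has been released is deferred, and can be sent to the client once processing completes (for example, when a long computation has finished or a required resource has become available).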

The flow of the compareChangedGroup method is shown in the following flowchart:

[Figure: flowchart of compareChangedGroup]
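The decision that compareChangedGroup and checkCacheDelayAndUpdate make for each group in the listing above can be condensed into a small pure function. This is a simplified sketch: GroupChangeCheck and Outcome are hypothetical names, the locking and database reload of the third branch are reduced to a single outcome value, and, unlike NumberUtils.toLong in the original, Long.parseLong rejects a non-numeric timestamp.

```java
/** Hypothetical condensed form of the per-group change check. */
final class GroupChangeCheck {

    /** The three situations the admin side distinguishes for one config group. */
    enum Outcome {
        UP_TO_DATE,                 // md5 matches: nothing to send
        CLIENT_STALE,               // md5 differs and the server data is at least as new
        SERVER_CACHE_MAY_BE_STALE   // md5 differs and the client claims newer data
    }

    private GroupChangeCheck() {
    }

    /**
     * Parses the "md5,lastModifyTime" request parameter of one group and
     * compares it with the values held in the admin cache.
     */
    static Outcome check(String param, String serverMd5, long serverModifyTime) {
        String[] parts = param == null ? new String[0] : param.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("group param invalid:" + param);
        }
        String clientMd5 = parts[0];
        long clientModifyTime = Long.parseLong(parts[1]);
        if (clientMd5.equals(serverMd5)) {
            return Outcome.UP_TO_DATE;
        }
        if (serverModifyTime >= clientModifyTime) {
            return Outcome.CLIENT_STALE;
        }
        // In the original code this branch takes the lock, reloads the cache
        // from the database and compares the md5 values once more.
        return Outcome.SERVER_CACHE_MAY_BE_STALE;
    }
}
```

Only CLIENT_STALE, or a differing md5 after the reload in the third case, puts the group on the changed list that is returned to the client.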

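In soul admin, org.dromara.soul.admin.listener.DataChangedListener defines what happens after each kind of configuration data changes: the data is written to an in-memory cache, which is defined in org.dromara.soul.admin.listener.AbstractDataChangedListener#CACHE, and soul admin's DataChange events trigger these cache updates. org.dromara.soul.admin.listener.AbstractDataChangedListener uses the template method design pattern: after updating the cache it calls the abstract methods that each implementation class inherits from it, namely afterMetaDataChanged, afterAppAuthChanged, afterPluginChanged, afterRuleChanged and afterSelectorChanged.
In the HTTP long-polling implementation, each of these methods submits a DataChangeTask to the thread pool defined in HttpLongPollingDataChangedListener. The task pushes the change response to every polling request that is queued and waiting, which ends that round of polling. What the long-polling endpoint finally returns is the list of ConfigGroup values whose content changed, so once the client receives the changed groups it only has to fetch the configuration for those groups to obtain the new data. That is how the data ends up synchronized.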
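The template-method arrangement can be illustrated with a stripped-down pair of classes. This is a sketch under assumptions, not the actual soul classes: CachingListener, NotifyingListener, onChanged and afterChanged are hypothetical names, and the cache holds plain strings instead of ConfigDataCache objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical base class: owns the cache and fixes the order of the steps. */
abstract class CachingListener {
    protected final Map<String, String> cache = new ConcurrentHashMap<>();

    /** Template method: the cache is always updated before the hook runs. */
    final void onChanged(String group, String payload) {
        cache.put(group, payload);
        afterChanged(group);
    }

    /** Hook for subclasses, playing the role of the afterXxxChanged methods. */
    protected abstract void afterChanged(String group);
}

/** Hypothetical subclass: records which group it would notify waiting clients about. */
final class NotifyingListener extends CachingListener {
    final List<String> notified = new ArrayList<>();

    @Override
    protected void afterChanged(String group) {
        // The hook already sees the updated cache entry.
        notified.add(group + "=" + cache.get(group));
    }
}
```

Because onChanged is final, a subclass can decide how to notify clients but cannot skip or reorder the cache update, which is the property the long-polling listener relies on when it only reports the changed group names.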

Summary

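Today I studied the details of how the soul gateway implements data synchronization with the HTTP long-polling mechanism. Some details of how the pieces cooperate are not written up yet; I will continue tomorrow.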

Author: David Liu
Link: https://davidliu.now.sh/2021/01/22/soul_dependency_discoveryIII/
Copyright: Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit David Liu's Blog when reposting.