/** * If the configuration data changes, the group information for the change is immediately responded. * Otherwise, the client's request thread is blocked until any data changes or the specified timeout is reached. * * @param request the request * @param response the response */ publicvoiddoLongPolling(final HttpServletRequest request, final HttpServletResponse response){
@Override protectedvoidafterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType){ scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH)); }
@Override protectedvoidafterMetaDataChanged(final List<MetaData> changed, final DataEventTypeEnum eventType){ scheduler.execute(new DataChangeTask(ConfigGroupEnum.META_DATA)); }
@Override protectedvoidafterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType){ scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN)); }
@Override protectedvoidafterRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType){ scheduler.execute(new DataChangeTask(ConfigGroupEnum.RULE)); }
@Override protectedvoidafterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType){ scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR)); }
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request){ List<ConfigGroupEnum> changedGroup = new ArrayList<>(4); for (ConfigGroupEnum group : ConfigGroupEnum.values()) { // md5,lastModifyTime String[] params = StringUtils.split(request.getParameter(group.name()), ','); if (params == null || params.length != 2) { thrownew SoulException("group param invalid:" + request.getParameter(group.name())); } String clientMd5 = params[0]; long clientModifyTime = NumberUtils.toLong(params[1]); ConfigDataCache serverCache = CACHE.get(group.name()); // do check. if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) { changedGroup.add(group); } } return changedGroup; }
/** * check whether the client needs to update the cache. * @param serverCache the admin local cache * @param clientMd5 the client md5 value * @param clientModifyTime the client last modify time * @return true: the client needs to be updated, false: not need. */ privatebooleancheckCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, finallong clientModifyTime){
// is the same, doesn't need to be updated if (StringUtils.equals(clientMd5, serverCache.getMd5())) { returnfalse; }
// if the md5 value is different, it is necessary to compare lastModifyTime. long lastModifyTime = serverCache.getLastModifyTime(); if (lastModifyTime >= clientModifyTime) { // the client's config is out of date. returntrue; }
// the lastModifyTime before client, then the local cache needs to be updated. // Considering the concurrency problem, admin must lock, // otherwise it may cause the request from soul-web to update the cache concurrently, causing excessive db pressure boolean locked = false; try { locked = LOCK.tryLock(5, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); returntrue; } if (locked) { try { ConfigDataCache latest = CACHE.get(serverCache.getGroup()); if (latest != serverCache) { // the cache of admin was updated. if the md5 value is the same, there's no need to update. return !StringUtils.equals(clientMd5, latest.getMd5()); } // load cache from db. this.refreshLocalCache(); latest = CACHE.get(serverCache.getGroup()); return !StringUtils.equals(clientMd5, latest.getMd5()); } finally { LOCK.unlock(); } }
// not locked, the client need to be updated. returntrue;
/** * When a group's data changes, the thread is created to notify the client asynchronously. */ classDataChangeTaskimplementsRunnable{
/** * The Group where the data has changed. */ privatefinal ConfigGroupEnum groupKey;
/** * The Change time. */ privatefinallong changeTime = System.currentTimeMillis();
/** * Instantiates a new Data change task. * * @param groupKey the group key */ DataChangeTask(final ConfigGroupEnum groupKey) { this.groupKey = groupKey; }
@Override publicvoidrun(){ for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) { LongPollingClient client = iter.next(); iter.remove(); client.sendResponse(Collections.singletonList(groupKey)); log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime); } } } classLongPollingClientimplementsRunnable{
/** * The Async context. */ privatefinal AsyncContext asyncContext;
/** * The Ip. */ privatefinal String ip;
/** * The Timeout time. */ privatefinallong timeoutTime;
/** * The Async timeout future. */ private Future<?> asyncTimeoutFuture;
/**
 * Creates a long-polling client from the given values; each argument is stored as-is.
 *
 * @param ac          the async context
 * @param ip          the client ip
 * @param timeoutTime the timeout time (NOTE(review): unit not visible here - confirm)
 */
LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
    this.timeoutTime = timeoutTime;
    this.ip = ip;
    this.asyncContext = ac;
}