网站首页 文章专栏 soul源码解析(七) - http 长轮询数据同步过程
1). 跟websocket方式一样,也是通过DataSyncConfiguration配置类,以及配置文件中数据同步策略,初始化
@Configuration @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true") @EnableConfigurationProperties(HttpSyncProperties.class) static class HttpLongPollingListener { @Bean @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class) public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) { return new HttpLongPollingDataChangedListener(httpSyncProperties); } }
注意一点是,websocket的配置中加了matchIfMissing 配置,也就是就算没有相关的配置,也会初始化websocket的初始化server
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
2). 初始化的HttpLongPollingDataChangedListener类继承自AbstractDataChangedListener抽象类,该抽象类实现了DataChangedListener接口以及InitializingBean接口
new该类的时候会初始化,一个用于存放连接的ArrayBlockingQueue,大小为1024,以及一个定时线程池
public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) { this.clients = new ArrayBlockingQueue<>(1024); this.scheduler = new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("long-polling", true)); this.httpSyncProperties = httpSyncProperties; }
3). 由于AbstractDataChangedListener抽象类实现了InitializingBean接口,则会在加载的时候执行afterPropertiesSet方法,方法中对数据库存放的数据进行缓存到本地
@Override public final void afterPropertiesSet() { updateAppAuthCache(); updatePluginCache(); updateRuleCache(); updateSelectorCache(); updateMetaDataCache(); afterInitialize(); } // 将appAuth的信息从mysql查出来,并放到内存 protected void updateAppAuthCache() { this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll()); } // 数据转为json格式,并加密 protected void updateCache(final ConfigGroupEnum group, final List data) { String json = GsonUtils.getInstance().toJson(data); ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis()); ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal); LOGGER.info("update config cache[{}], old:{}, updated:{}", group, oldVal, newVal); }
最后调用 afterInitialize() 方法,在该方法中构建一个延迟任务,每隔5分钟刷新下本地缓存
@Override protected void afterInitialize() { // 默认5分钟 long syncInterval = httpSyncProperties.getRefreshInterval().toMillis(); // Periodically check the data for changes and update the cache scheduler.scheduleWithFixedDelay(() -> { LOGGER.info("http sync strategy refresh config start."); try { this.refreshLocalCache(); LOGGER.info("http sync strategy refresh config success."); } catch (Exception e) { LOGGER.error("http sync strategy refresh config error!", e); } }, syncInterval, syncInterval, TimeUnit.MILLISECONDS); LOGGER.info("http sync strategy refresh interval: {}ms", syncInterval); }
1). 根据配置,初始化HttpSyncDataConfiguration配置类
@Configuration @ConditionalOnClass(HttpSyncDataService.class) @ConditionalOnProperty(prefix = "soul.sync.http", name = "url") @Slf4j public class HttpSyncDataConfiguration { 。。。 }
在配置类中,加载一个 HttpSyncDataService bean,该类实现了SyncDataService接口
public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber, final List metaDataSubscribers, final List authDataSubscribers) { factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers); this.httpConfig = httpConfig; this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl())); this.start(httpConfig); }
初始化时,创建了DataRefreshFactory,并初始化了各个数据类型的刷新类,PluginDataRefresh,SelectorDataRefresh等等,这些类用于具体数据缓存实现
public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber, final List metaDataSubscribers, final List authDataSubscribers) { ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataRefresh(pluginDataSubscriber)); ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataRefresh(pluginDataSubscriber)); ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataRefresh(pluginDataSubscriber)); ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AppAuthDataRefresh(authDataSubscribers)); ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataRefresh(metaDataSubscribers)); }
2). 在new HttpSyncDataService 时,最后调用了 start方法,在该方法中创建了一个http连接对象,readTime设置为90s,连接时间为10s
private void start(final HttpConfig httpConfig) { // init RestTemplate OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory(); factory.setConnectTimeout((int) this.connectionTimeout.toMillis()); factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT); this.httpClient = new RestTemplate(factory); // It could be initialized multiple times, so you need to control that. if (RUNNING.compareAndSet(false, true)) { // fetch all group configs. this.fetchGroupConfig(ConfigGroupEnum.values()); int threadSize = serverList.size(); this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), SoulThreadFactory.create("http-long-polling", true)); // start long polling, each server creates a thread to listen for changes. this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server))); } else { log.info("soul http long polling was started, executor=[{}]", executor); } }
之后会去获取所有组配置,根据配置的ip端口信息列表去发起http请求
private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException { for (int index = 0; index < this.serverList.size(); index++) { String server = serverList.get(index); try { this.doFetchGroupConfig(server, groups); break; } catch (SoulException e) { // no available server, throw exception. if (index >= serverList.size() - 1) { throw e; } log.warn("fetch config fail, try another one: {}", serverList.get(index + 1)); } } }
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) { StringBuilder params = new StringBuilder(); for (ConfigGroupEnum groupKey : groups) { params.append("groupKeys").append("=").append(groupKey.name()).append("&"); } String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&"); log.info("request configs: [{}]", url); String json = null; try { json = this.httpClient.getForObject(url, String.class); } catch (RestClientException e) { String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage()); log.warn(message); throw new SoulException(message, e); } // update local cache boolean updated = this.updateCacheWithJson(json); if (updated) { log.info("get latest configs: [{}]", json); return; } // not updated. it is likely that the current config server has not been updated yet. wait a moment. log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server); ThreadUtils.sleep(TimeUnit.SECONDS, 30); }
拼装url以及参数信息
http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA
发起请求,拿到数据后,执行本地缓存
boolean updated = this.updateCacheWithJson(json);
根据初始化的配置组信息,遍历并刷新内存数据
public boolean executor(final JsonObject data) { final boolean[] success = {false}; ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data)); return success[0]; } @Override public Boolean refresh(final JsonObject data) { boolean updated = false; JsonObject jsonObject = convert(data); if (null != jsonObject) { ConfigData result = fromJson(jsonObject); if (this.updateCacheIfNeed(result)) { updated = true; refresh(result.getData()); } } return updated; }
里面的 convent方法,fromJson方法,refresh方法都是具体ENUM_MAP中最早放进去的具体PluginDataRefresh,SelectorDataRefresh等实现类实现的,感觉设计的挺巧妙的
同时,刷新前,会去判断是否需要刷新,需要才刷新
3). 在全量获取数据后,根据admin数量启动相应的HttpLongPollingTask线程,不停的执行doLongPulling方法
while (RUNNING.get()) { for (int time = 1; time <= retryTimes; time++) { try { doLongPolling(server); ...
doLongPulling方法中,首先拼接url以及参数,发送 http://localhost:9095/configs/listener 请求,服务端接受到请求后阻塞住请求,如果配置数据更改,则会立即响应更改的组信息。否则,客户端的请求线程将被阻塞,直到数据发生变化或达到指定的超时时间。
4). http 请求到达 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到 BlocingQueue 中,并且开启调度任务,60s 后执行,这样做的目的是 60s 后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) { // 因为soul-web可能未收到某个配置变更的通知,因此MD5值可能不一致,则立即响应 List changedGroup = compareMD5(request); String clientIp = getRemoteIp(request); if (CollectionUtils.isNotEmpty(changedGroup)) { this.generateResponse(response, changedGroup); return; } // Servlet3.0异步响应http请求 final AsyncContext asyncContext = request.startAsync(); asyncContext.setTimeout(0L); scheduler.execute(new LongPollingClient(asyncContext, clientIp, 60)); }class LongPollingClient implements Runnable { LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) { // 省略...... } @Override public void run() { // 加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求 this.asyncTimeoutFuture = scheduler.schedule(() -> { // clients是阻塞队列,保存了来自soul-web的请求信息 clients.remove(LongPollingClient.this); List changedGroups = HttpLongPollingDataChangedListener.compareMD5((HttpServletRequest) asyncContext.getRequest()); sendResponse(changedGroups); }, timeoutTime, TimeUnit.MILLISECONDS); // clients.add(this); } }
5). 如果这段时间内,管理员变更了配置数据,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group 的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。有人会问,为什么不是直接将变更的数据写出?因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,只告知某个 Group 信息发生了变更。
// soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应class DataChangeTask implements Runnable { DataChangeTask(final ConfigGroupEnum groupKey) { this.groupKey = groupKey; } @Override public void run() { try { for (Iterator iter = clients.iterator(); iter.hasNext(); ) { LongPollingClient client = iter.next(); iter.remove(); client.sendResponse(Collections.singletonList(groupKey)); } } catch (Throwable e) { LOGGER.error("data change error.", e); } } }
6). 网关层接收到 http 响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin 的配置服务
if (groupJson != null) { // fetch group configuration async. ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class); if (ArrayUtils.isNotEmpty(changedGroups)) { log.info("Group config changed: {}", Arrays.toString(changedGroups)); this.doFetchGroupConfig(server, changedGroups); } }
调用 doFetchGroupConfig 方法拉去数据,并同步到本地。
版权声明:本文由星尘阁原创出品,转载请注明出处!