Soul Source Code Analysis (6): The WebSocket Data Synchronization Process
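In the admin module, the DataSyncConfiguration configuration class contains a static nested class, WebsocketListener. It is loaded when soul.sync.websocket.enabled is true and, because matchIfMissing = true, also when the property is not set at all.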
    @Configuration
    @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
    @EnableConfigurationProperties(WebsocketSyncProperties.class)
    static class WebsocketListener {

        /**
         * Config event listener data changed listener.
         *
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
        public DataChangedListener websocketDataChangedListener() {
            return new WebsocketDataChangedListener();
        }

        /**
         * Websocket collector websocket collector.
         *
         * @return the websocket collector
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketCollector.class)
        public WebsocketCollector websocketCollector() {
            return new WebsocketCollector();
        }

        /**
         * Server endpoint exporter server endpoint exporter.
         *
         * @return the server endpoint exporter
         */
        @Bean
        @ConditionalOnMissingBean(ServerEndpointExporter.class)
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
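This class registers three beans: WebsocketDataChangedListener, ServerEndpointExporter, and WebsocketCollector.
1). WebsocketDataChangedListener implements the DataChangedListener interface and performs the corresponding action whenever the custom DataChangedEvent is published.
2). ServerEndpointExporter automatically registers the WebSocket endpoints declared with the @ServerEndpoint annotation.
3). WebsocketCollector is the WebSocket server side. WebSocket follows a client/server model over the ws protocol, so annotating the class with @ServerEndpoint("/websocket") is enough to expose it. The class then implements methods annotated with @OnOpen (connection opened), @OnClose (connection closed), and @OnMessage (message received).
It stores all connections in a set, and when it receives a message whose content is MYSELF, it performs a full data synchronization.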
    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static Session session;

    @OnOpen
    public void onOpen(final Session session) {
        LOGGER.info("websocket on open successful....");
        SESSION_SET.add(session);
    }

    @OnMessage
    public void onMessage(final String message, final Session session) {
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            WebsocketCollector.session = session;
            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
        }
    }

    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            if (DataEventTypeEnum.MYSELF == type) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    LOGGER.error("websocket send result is exception :", e);
                }
                return;
            }
            for (Session session : SESSION_SET) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    LOGGER.error("websocket send result is exception :", e);
                }
            }
        }
    }
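The routing rule in send can be isolated in a few lines. The following is a minimal sketch, not the admin module's code: FakeSession is a hypothetical stand-in for javax.websocket.Session that only records what it was sent, the EventType enum is cut down to three values, and the null check on the requester is an addition of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/** Hypothetical stand-in for javax.websocket.Session; it only records what was sent. */
final class FakeSession {
    final List<String> received = new ArrayList<>();

    void sendText(final String message) {
        received.add(message);
    }
}

/** Simplified version of the routing rule: MYSELF replies to one session, everything else is broadcast. */
final class SendRoutingSketch {
    enum EventType { MYSELF, REFRESH, UPDATE }

    private final Set<FakeSession> sessions = new CopyOnWriteArraySet<>();
    private FakeSession requester;

    void onOpen(final FakeSession session) {
        sessions.add(session);
    }

    void onMessage(final String message, final FakeSession session) {
        if (EventType.MYSELF.name().equals(message)) {
            requester = session; // remember who asked for the full sync
        }
    }

    void send(final String message, final EventType type) {
        if (message == null || message.trim().isEmpty()) {
            return;
        }
        if (type == EventType.MYSELF) {
            if (requester != null) { // guard added in this sketch only
                requester.sendText(message); // reply to the requester only
            }
            return;
        }
        for (FakeSession session : sessions) {
            session.sendText(message); // broadcast to every connected client
        }
    }
}
```

With two sessions connected, a MYSELF payload reaches only the session that asked for it, while any other event type reaches both.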
The method that actually sends the data is syncAll:
    @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        appAuthService.syncData();
        List<PluginData> pluginDataList = pluginService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        List<SelectorData> selectorDataList = selectorService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        List<RuleData> ruleDataList = ruleService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        metaDataService.syncData();
        return true;
    }
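1>. First, the appAuth data is synchronized: all records are read from the database, and a DataChangedEvent with config group APP_AUTH and event type REFRESH is published.
2>. In the same way, the plugin data, selector data, rule data, and metaData are queried, and the corresponding events are published.
3>. The DataChangedEventDispatcher class listens for these events and, depending on the config group, calls each registered listener, which sends the data over the configured transport (http, zk, nacos, websocket, and so on). For websocket, that listener is the WebsocketDataChangedListener initialized above.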
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
4>. The listener builds the payload and sends it to the client through the send method of WebsocketCollector.
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
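Steps 1> to 4> form a small publish, dispatch, and send pipeline. The sketch below reproduces that shape in plain Java without Spring, under stated assumptions: all class names are hypothetical, the groups are limited to PLUGIN, SELECTOR, and RULE, rows are plain strings, and the sample values ("divide", "selector-1", "rule-1") are made up for illustration. The recording listener stands in for a transport-specific listener such as the websocket one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DispatchSketch {
    enum Group { PLUGIN, SELECTOR, RULE }
    enum EventType { MYSELF, REFRESH, CREATE, UPDATE, DELETE }

    /** Simplified event: a config group, an event type, and the affected rows. */
    static final class ChangedEvent {
        final Group group;
        final EventType type;
        final List<String> data;

        ChangedEvent(final Group group, final EventType type, final List<String> data) {
            this.group = group;
            this.type = type;
            this.data = data;
        }
    }

    /** One implementation per sync transport; unhandled groups default to no-ops. */
    interface Listener {
        default void onPluginChanged(List<String> data, EventType type) { }
        default void onSelectorChanged(List<String> data, EventType type) { }
        default void onRuleChanged(List<String> data, EventType type) { }
    }

    /** Records what a real listener would serialize and send. */
    static final class RecordingListener implements Listener {
        final List<String> sent = new ArrayList<>();

        @Override
        public void onPluginChanged(final List<String> data, final EventType type) {
            sent.add("PLUGIN/" + type + "/" + data);
        }

        @Override
        public void onSelectorChanged(final List<String> data, final EventType type) {
            sent.add("SELECTOR/" + type + "/" + data);
        }

        @Override
        public void onRuleChanged(final List<String> data, final EventType type) {
            sent.add("RULE/" + type + "/" + data);
        }
    }

    /** Routes one event to the matching callback on every listener. */
    static void dispatch(final ChangedEvent event, final List<? extends Listener> listeners) {
        for (Listener listener : listeners) {
            switch (event.group) {
                case PLUGIN:
                    listener.onPluginChanged(event.data, event.type);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(event.data, event.type);
                    break;
                case RULE:
                    listener.onRuleChanged(event.data, event.type);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.group);
            }
        }
    }

    /** One event per group, all tagged with the caller's event type. */
    static void syncAll(final EventType type, final List<? extends Listener> listeners) {
        dispatch(new ChangedEvent(Group.PLUGIN, type, Arrays.asList("divide")), listeners);
        dispatch(new ChangedEvent(Group.SELECTOR, type, Arrays.asList("selector-1")), listeners);
        dispatch(new ChangedEvent(Group.RULE, type, Arrays.asList("rule-1")), listeners);
    }
}
```

Calling syncAll with MYSELF and a single RecordingListener leaves three entries in its sent list, one per group, each carrying the MYSELF type that the sender later uses to choose between a single reply and a broadcast.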
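1). On the bootstrap side, the WebsocketSyncDataConfiguration configuration class is loaded when the configuration file contains the soul.sync.websocket.urls property, and it registers a websocketSyncDataService bean.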
    @Configuration
    @ConditionalOnClass(WebsocketSyncDataService.class)
    @ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
    @Slf4j
    public class WebsocketSyncDataConfiguration {

        /**
         * Websocket sync data service.
         *
         * @param websocketConfig the websocket config
         * @param pluginSubscriber the plugin subscriber
         * @param metaSubscribers the meta subscribers
         * @param authSubscribers the auth subscribers
         * @return the sync data service
         */
        @Bean
        public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig,
                                                        final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                        final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                                        final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use websocket sync soul data.......");
            return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new),
                    pluginSubscriber.getIfAvailable(),
                    metaSubscribers.getIfAvailable(Collections::emptyList),
                    authSubscribers.getIfAvailable(Collections::emptyList));
        }

        ...
    }
2). WebsocketSyncDataService implements the SyncDataService interface. Based on the configured urls, it creates one SoulWebsocketClient per URL and connects each of them.
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), pluginDataSubscriber, metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url is error :", e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }
    }
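The constructor combines two ideas: a one-off blocking connect and a periodic health check that reconnects only when the link is down. A minimal sketch of the second idea follows. FakeClient is a hypothetical stand-in for the real client and always reconnects successfully, the 10 and 30 second values are copied from the code above, and the daemon thread factory is this sketch's approximation of SoulThreadFactory.create("websocket-connect", true).

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ReconnectSketch {

    /** Hypothetical stand-in for the websocket client; starts in the closed state. */
    static final class FakeClient {
        private volatile boolean closed = true;
        int reconnectAttempts;

        boolean isClosed() {
            return closed;
        }

        boolean reconnectBlocking() {
            reconnectAttempts++;
            closed = false; // assumption: the server is reachable again
            return true;
        }

        void dropConnection() {
            closed = true;
        }
    }

    /** Body of the scheduled task: do nothing while connected, reconnect when closed. */
    static Runnable reconnectTask(final FakeClient client) {
        return () -> {
            if (client.isClosed()) {
                client.reconnectBlocking();
            }
        };
    }

    /** Schedules the check: first run after 10 s, then every 30 s, on a daemon thread. */
    static ScheduledThreadPoolExecutor schedule(final FakeClient client) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable, "websocket-connect");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleAtFixedRate(reconnectTask(client), 10, 30, TimeUnit.SECONDS);
        return executor;
    }
}
```

Keeping the task body separate from the scheduling makes the reconnect rule easy to exercise without waiting for the timer. One point worth keeping in mind with scheduleAtFixedRate in general: if a run throws an exception that the task does not catch, later runs are suppressed.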
3). SoulWebsocketClient extends WebSocketClient. When its connection is first opened, it sends a message of type MYSELF to the server to request a full data synchronization.
    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        if (!alreadySync) {
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }
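Read literally, the alreadySync flag means that a client instance requests the full data set only the first time its connection opens; if the same instance is opened again later, the request is skipped. This is an inference from the snippet rather than something stated in the source, and the sketch below only models that flag. The class name is hypothetical, and a list stands in for the real send call.

```java
import java.util.ArrayList;
import java.util.List;

final class FirstOpenSyncSketch {
    /** Messages this client would have sent to the server. */
    final List<String> sentToServer = new ArrayList<>();

    private volatile boolean alreadySync;

    /** Called every time the connection is established, including after a reconnect. */
    void onOpen() {
        if (!alreadySync) {
            sentToServer.add("MYSELF"); // stands in for send(DataEventTypeEnum.MYSELF.name())
            alreadySync = true;
        }
    }
}
```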
4). When SoulWebsocketClient receives a message from the server, it stores the data in its own memory.
    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }

    private void handleResult(final String result) {
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }

    // Look up the handler registered for the config group and delegate to it
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }

    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }
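5). The doRefresh method iterates over the data list. For each item it first removes the existing entry from the actual storage structure, PLUGIN_MAP (a ConcurrentMap), then caches the new data and initializes the related plugin data.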
    @Override
    protected void doRefresh(final List<PluginData> dataList) {
        // Remove the existing entries from the map
        pluginDataSubscriber.refreshPluginDataSelf(dataList);
        // Cache the data again
        dataList.forEach(pluginDataSubscriber::onSubscribe);
    }
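The client-side handling described in steps 4) and 5) comes down to a switch on the event type over a concurrent map. Here is a minimal sketch of that idea, assuming a hypothetical Plugin record keyed by name; the plugin names used in the usage note are only sample values, and the real subscriber also initializes plugin data, which is left out.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PluginCacheSketch {
    enum EventType { MYSELF, REFRESH, CREATE, UPDATE, DELETE }

    /** Hypothetical minimal plugin record: a name plus an enabled flag. */
    static final class Plugin {
        final String name;
        final boolean enabled;

        Plugin(final String name, final boolean enabled) {
            this.name = name;
            this.enabled = enabled;
        }
    }

    private final ConcurrentMap<String, Plugin> pluginMap = new ConcurrentHashMap<>();

    /** Applies one received message to the in-memory cache. */
    void handle(final List<Plugin> dataList, final EventType type) {
        if (dataList == null || dataList.isEmpty()) {
            return;
        }
        switch (type) {
            case REFRESH:
            case MYSELF:
                // Refresh: remove the entry for each incoming item, then cache it again.
                dataList.forEach(plugin -> pluginMap.remove(plugin.name));
                dataList.forEach(plugin -> pluginMap.put(plugin.name, plugin));
                break;
            case UPDATE:
            case CREATE:
                dataList.forEach(plugin -> pluginMap.put(plugin.name, plugin));
                break;
            case DELETE:
                dataList.forEach(plugin -> pluginMap.remove(plugin.name));
                break;
            default:
                break;
        }
    }

    Plugin get(final String name) {
        return pluginMap.get(name);
    }

    int size() {
        return pluginMap.size();
    }
}
```

For example, a MYSELF message carrying "divide" and "waf" fills the cache with two entries, a later UPDATE for "waf" overwrites only that entry, and a DELETE for "divide" removes it.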
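1. Full data synchronization
1). The admin side and the bootstrap side initialize the server and the client respectively, based on their configuration files.
2). When the bootstrap side starts, it sends a request for full data synchronization.
3). After receiving the full-sync request, the admin side queries all the data in its MySQL database and sends it.
4). When the bootstrap side receives the full data, it first removes the previously cached entries from its memory and then caches the new data.
2. Incremental data synchronization
5). After a configuration change is made on the admin page, or when synchronization is explicitly triggered from the page, the admin side publishes the corresponding DataChangedEvent.
6). When the DataChangedEventDispatcher on the admin side receives the event, it sends the message over the configured transport (http, zk, nacos, websocket, and so on).
7). The bootstrap side receives the message and, as in the full-sync flow, caches the data in its own memory.
This completes the walkthrough of the data synchronization flow.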
Copyright notice: this article is an original work by 星尘阁; please credit the source when reposting.