Soul Source Code Analysis (6): The WebSocket Data Synchronization Process
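In the admin module, the DataSyncConfiguration configuration class contains a static nested class, WebsocketListener. It is loaded when soul.sync.websocket.enabled is true; because matchIfMissing = true, it is also loaded when the property is not set at all.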
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    /**
     * Config event listener data changed listener.
     *
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }

    /**
     * Websocket collector websocket collector.
     *
     * @return the websocket collector
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketCollector.class)
    public WebsocketCollector websocketCollector() {
        return new WebsocketCollector();
    }

    /**
     * Server endpoint exporter server endpoint exporter.
     *
     * @return the server endpoint exporter
     */
    @Bean
    @ConditionalOnMissingBean(ServerEndpointExporter.class)
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

This class initializes three beans: WebsocketDataChangedListener, ServerEndpointExporter, and WebsocketCollector.
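1). WebsocketDataChangedListener implements the DataChangedListener interface and performs the corresponding action whenever a custom DataChangedEvent is published.
2). ServerEndpointExporter automatically registers the WebSocket endpoints declared with the @ServerEndpoint annotation.
3). WebsocketCollector is the WebSocket server side. WebSocket follows a client/server model (over the ws protocol), so annotating the class with @ServerEndpoint("/websocket") is enough to expose it. Inside the class, @OnOpen handles a newly opened connection, @OnClose a closed connection, and @OnMessage an incoming message.

WebsocketCollector keeps all connections in a set. When a message arrives whose content is MYSELF, it triggers a full data synchronization for the client that sent it: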
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

private static Session session;

@OnOpen
public void onOpen(final Session session) {
    LOGGER.info("websocket on open successful....");
    SESSION_SET.add(session);
}

@OnMessage
public void onMessage(final String message, final Session session) {
    if (message.equals(DataEventTypeEnum.MYSELF.name())) {
        // remember the requester so that send() can reply to it alone
        WebsocketCollector.session = session;
        SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
    }
}

public static void send(final String message, final DataEventTypeEnum type) {
    if (StringUtils.isNotBlank(message)) {
        // MYSELF: reply only to the session that asked for the full data
        if (DataEventTypeEnum.MYSELF == type) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                LOGGER.error("websocket send result is exception :", e);
            }
            return;
        }
        // any other event type: broadcast to every open session
        for (Session session : SESSION_SET) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                LOGGER.error("websocket send result is exception :", e);
            }
        }
    }
}

The data itself is sent by the syncAll method:
@Override
public boolean syncAll(final DataEventTypeEnum type) {
    appAuthService.syncData();
    List<PluginData> pluginDataList = pluginService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
    List<SelectorData> selectorDataList = selectorService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
    List<RuleData> ruleDataList = ruleService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
    metaDataService.syncData();
    return true;
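}

1>. The appAuth data is synchronized first: all records are read from the database, and a DataChangedEvent is published with config group APP_AUTH and event type REFRESH.
2>. In the same way, the plugin, selector, rule, and metadata records are queried and the corresponding events are published.
3>. The DataChangedEventDispatcher class listens for these events and, depending on the config group, hands them to the listener of the configured transport (HTTP, ZooKeeper, Nacos, WebSocket, and so on), which does the actual sending. For WebSocket, that listener is the WebsocketDataChangedListener initialized above.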
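The fan-out in step 3> can be sketched without Spring. This is a minimal model, not Soul's code: Group, ChangeListener, Dispatcher, and DispatcherDemo are hypothetical names, and two listeners are registered only to show that every registered listener sees every event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Soul's ConfigGroupEnum.
enum Group { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

// Hypothetical stand-in for DataChangedListener; one implementation per transport.
interface ChangeListener {
    void onChanged(Group group, String eventType, List<?> data);
}

class Dispatcher {
    private final List<ChangeListener> listeners = new ArrayList<>();

    void register(final ChangeListener listener) {
        listeners.add(listener);
    }

    // Every registered listener receives every event; the listener decides how to ship it.
    void dispatch(final Group group, final String eventType, final List<?> data) {
        for (ChangeListener listener : listeners) {
            listener.onChanged(group, eventType, data);
        }
    }
}

class DispatcherDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Dispatcher dispatcher = new Dispatcher();
        // Assumed transports, recorded as log lines instead of real network calls.
        dispatcher.register((g, t, d) -> log.add("websocket:" + g + ":" + t + ":" + d.size()));
        dispatcher.register((g, t, d) -> log.add("http:" + g + ":" + t + ":" + d.size()));
        dispatcher.dispatch(Group.PLUGIN, "REFRESH", Arrays.asList("divide", "rateLimiter"));
        return log;
    }

    public static void main(final String[] args) {
        run().forEach(System.out::println);
    }
}
```

Soul's own dispatcher does the same thing, with one listener method per config group: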
public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
        switch (event.getGroupKey()) {
            case APP_AUTH:
                listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                break;
            case PLUGIN:
                listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                break;
            case RULE:
                listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                break;
            case SELECTOR:
                listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                break;
            case META_DATA:
                listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
        }
    }
}

4>. The listener builds the payload and sends it to the clients through the send method of WebsocketCollector:
public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
    WebsocketData<PluginData> websocketData =
            new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
    WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
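}

That covers the admin side. On the bootstrap (gateway) side:

1). The WebsocketSyncDataConfiguration configuration class is loaded when the soul.sync.websocket.urls property is present in the configuration file, and it registers a websocketSyncDataService bean.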
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {

    /**
     * Websocket sync data service.
     *
     * @param websocketConfig the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers the meta subscribers
     * @param authSubscribers the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                    final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use websocket sync soul data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }
    ...
}

2). WebsocketSyncDataService implements the SyncDataService interface. Based on the URLs in the configuration file, it creates one SoulWebsocketClient per URL and connects each of them.
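Besides the initial connect, the constructor also schedules a periodic check per client that reconnects when the connection is closed. The pattern in isolation looks roughly like the sketch below. FakeClient and ReconnectSketch are hypothetical names; FakeClient only imitates the isClosed and reconnectBlocking calls used by the real client, and the delays are in milliseconds so the demo finishes quickly (the Soul constructor uses 10 and 30 seconds).

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a WebSocket client; it starts out disconnected.
class FakeClient {
    private volatile boolean closed = true;
    final AtomicInteger reconnects = new AtomicInteger();

    boolean isClosed() {
        return closed;
    }

    boolean reconnectBlocking() {
        reconnects.incrementAndGet();
        closed = false;
        return true;
    }
}

class ReconnectSketch {
    // The periodic check: do nothing while the connection is up, reconnect once it is closed.
    static Runnable reconnectCheck(final FakeClient client) {
        return () -> {
            if (client.isClosed()) {
                boolean ok = client.reconnectBlocking();
                System.out.println(ok ? "reconnected" : "reconnect failed");
            }
        };
    }

    public static void main(final String[] args) throws InterruptedException {
        FakeClient client = new FakeClient();
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Initial delay 10, period 30; milliseconds here, seconds in the Soul constructor.
        executor.scheduleAtFixedRate(reconnectCheck(client), 10, 30, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        executor.shutdownNow();
    }
}
```

One point worth keeping in mind with scheduleAtFixedRate: if a run of the task throws, later runs are suppressed, so the check should catch whatever it can recover from. The constructor in the Soul source: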
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
    String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
    executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
    // one client per configured admin URL
    for (String url : urls) {
        try {
            clients.add(new SoulWebsocketClient(new URI(url), pluginDataSubscriber, metaDataSubscribers, authDataSubscribers));
        } catch (URISyntaxException e) {
            log.error("websocket url is error :", e);
        }
    }
    try {
        for (WebSocketClient client : clients) {
            // initial connect, waiting at most 3 seconds
            boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
            if (success) {
                log.info("websocket connection is successful.....");
            } else {
                log.error("websocket connection is error.....");
            }
            // every 30 seconds (first run after 10 seconds), reconnect if the connection is closed
            executor.scheduleAtFixedRate(() -> {
                try {
                    if (client.isClosed()) {
                        boolean reconnectSuccess = client.reconnectBlocking();
                        if (reconnectSuccess) {
                            log.info("websocket reconnect is successful.....");
                        } else {
                            log.error("websocket reconnection is error.....");
                        }
                    }
                } catch (InterruptedException e) {
                    log.error("websocket connect is error :{}", e.getMessage());
                }
            }, 10, 30, TimeUnit.SECONDS);
        }
        /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
    } catch (InterruptedException e) {
        log.info("websocket connection...exception....", e);
    }
}

3). SoulWebsocketClient extends WebSocketClient. When the connection opens, it sends a message of type MYSELF to the server to request a full data synchronization:
@Override
public void onOpen(final ServerHandshake serverHandshake) {
    if (!alreadySync) {
        send(DataEventTypeEnum.MYSELF.name());
        alreadySync = true;
    }
}

4). When SoulWebsocketClient receives a message from the server, it stores the data in its own memory:
// SoulWebsocketClient
@Override
public void onMessage(final String result) {
    handleResult(result);
}

private void handleResult(final String result) {
    WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
    ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
    String eventType = websocketData.getEventType();
    String json = GsonUtils.getInstance().toJson(websocketData.getData());
    websocketDataHandler.executor(groupEnum, json, eventType);
}

// WebsocketDataHandler: look up the handler registered for the config group and let it process the data
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
    ENUM_MAP.get(type).handle(json, eventType);
}

// AbstractDataHandler: choose the action by event type
@Override
public void handle(final String json, final String eventType) {
    List<T> dataList = convert(json);
    if (CollectionUtils.isNotEmpty(dataList)) {
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
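}

5). The doRefresh method first removes the existing entries from the actual store, PLUGIN_MAP (a ConcurrentMap), and then iterates over the data list to cache each item again and initialize the related plugin data: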
@Override
protected void doRefresh(final List<PluginData> dataList) {
    // remove the existing data from the map
    pluginDataSubscriber.refreshPluginDataSelf(dataList);
    // cache the data again
    dataList.forEach(pluginDataSubscriber::onSubscribe);
}

To summarize:

1. Full data synchronization
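1). The admin side and the bootstrap side initialize the server and the client respectively, each according to its own configuration file.
2). On startup, the bootstrap side sends a request for full data synchronization.
3). On receiving the request, the admin side queries the full data set from its MySQL database and sends it.
4). On receiving the full data, the bootstrap side first clears the data previously held in its memory and then caches the new data.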
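The four steps above can be modelled in memory without any network code. This is only a sketch under stated assumptions: GatewaySession, AdminServer, and FullSyncDemo are hypothetical names, the "database" is a map, and the payload is passed as a Java map instead of a JSON message. It follows the description in step 4), where the gateway drops its old cache before storing the new data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical in-memory model of one gateway (bootstrap) connection.
class GatewaySession {
    final Map<String, String> cache = new ConcurrentHashMap<>();

    // Step 4: drop whatever was cached before, then cache the full data set.
    void receiveFull(final Map<String, String> fullData) {
        cache.clear();
        cache.putAll(fullData);
    }
}

// Hypothetical model of the admin side.
class AdminServer {
    // All open connections; later incremental changes would be broadcast to these.
    private final Set<GatewaySession> sessions = new CopyOnWriteArraySet<>();
    // Stands in for the MySQL tables.
    private final Map<String, String> database = new ConcurrentHashMap<>();

    void save(final String name, final String config) {
        database.put(name, config);
    }

    void onOpen(final GatewaySession session) {
        sessions.add(session);
    }

    // Steps 2 and 3: a MYSELF message triggers a full push to the requesting session only.
    void onMessage(final String message, final GatewaySession requester) {
        if ("MYSELF".equals(message)) {
            requester.receiveFull(database);
        }
    }
}

class FullSyncDemo {
    static List<Integer> run() {
        AdminServer admin = new AdminServer();
        admin.save("divide", "enabled");
        admin.save("rateLimiter", "disabled");

        GatewaySession first = new GatewaySession();
        GatewaySession second = new GatewaySession();
        first.cache.put("stale", "old");
        admin.onOpen(first);
        admin.onOpen(second);

        // Only the first gateway asks for the full data.
        admin.onMessage("MYSELF", first);
        return Arrays.asList(first.cache.size(), second.cache.size());
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

After the run, the first gateway holds the two plugins from the admin side and no longer holds its stale entry, while the second gateway, which never sent MYSELF, is untouched.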
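2. Incremental data synchronization
5). When a configuration is changed on an admin page, or a sync is explicitly triggered from the page, the admin side publishes the corresponding DataChangedEvent.
6). The DataChangedEventDispatcher on the admin side receives the event and sends the message over the configured transport (HTTP, ZooKeeper, Nacos, WebSocket, and so on).
7). The bootstrap side receives the message and, as in the full flow, caches the data in its own memory.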
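How the gateway-side cache evolves under incremental events can be sketched with a plain ConcurrentHashMap. The switch mirrors the handle method shown earlier; everything else is an assumption made for illustration: EventType, PluginCache, and IncrementalDemo are hypothetical names, and plugin data is reduced to a name and a version string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Same event types as in the handler shown earlier.
enum EventType { REFRESH, MYSELF, UPDATE, CREATE, DELETE }

class PluginCache {
    // Hypothetical cache keyed by plugin name; values are opaque config strings.
    private final Map<String, String> plugins = new ConcurrentHashMap<>();

    void handle(final EventType type, final Map<String, String> changes) {
        switch (type) {
            case REFRESH:
            case MYSELF:
                // remove the existing entries first, then cache them again
                plugins.keySet().removeAll(changes.keySet());
                plugins.putAll(changes);
                break;
            case UPDATE:
            case CREATE:
                plugins.putAll(changes);
                break;
            case DELETE:
                plugins.keySet().removeAll(changes.keySet());
                break;
            default:
                break;
        }
    }

    // Sorted copy, so the printed order is stable.
    Map<String, String> snapshot() {
        return new TreeMap<>(plugins);
    }
}

class IncrementalDemo {
    // Small helper: builds a map from alternating key/value arguments.
    static Map<String, String> of(final String... kv) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            map.put(kv[i], kv[i + 1]);
        }
        return map;
    }

    static Map<String, String> run() {
        PluginCache cache = new PluginCache();
        cache.handle(EventType.MYSELF, of("divide", "v1", "waf", "v1")); // full sync on startup
        cache.handle(EventType.UPDATE, of("divide", "v2"));              // changed in the admin console
        cache.handle(EventType.CREATE, of("sign", "v1"));                // newly added
        cache.handle(EventType.DELETE, of("waf", "v1"));                 // removed
        return cache.snapshot();
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints {divide=v2, sign=v1}
    }
}
```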
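That completes the walkthrough of the data synchronization flow.
Copyright notice: this article is an original work by 星尘阁; please credit the source when reposting.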