网站首页 文章专栏 soul源码解析(六) - websocket 数据同步过程
soul源码解析(六) - websocket 数据同步过程

一. websocket在admin模块(service)的初始化

    在admin模块的 DataSyncConfiguration 配置类中,有一个静态内部类:WebsocketListener类,初始化条件为soul.sync.websocket.enable为true

@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    /**
     * Config event listener data changed listener.
     *
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }

    /**
     * Websocket collector websocket collector.
     *
     * @return the websocket collector
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketCollector.class)
    public WebsocketCollector websocketCollector() {
        return new WebsocketCollector();
    }

    /**
     * Server endpoint exporter server endpoint exporter.
     *
     * @return the server endpoint exporter
     */
    @Bean
    @ConditionalOnMissingBean(ServerEndpointExporter.class)
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

    同时初始化了3个bean,WebsocketDataChangedListener,ServerEndpointExporter,WebsocketCollector

1). 其中 WebsocketDataChangedListener 实现了 DataChangedListener 接口,在自定义的DataChangeEvent事件发生改变时,执行相应的动作。

2). ServerEndpointExporter这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint

3). WebsocketCollector为 websocket的服务端,因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的@ServerEndpoint("/websocket")启用即可,然后在里面实现@OnOpen开启连接,@onClose关闭连接,@onMessage接收消息等方法。


使用一个set存储所有的连接,并在有信息发送,且类型为MYSELF时,进行全量的数据同步

private static final Set SESSION_SET = new CopyOnWriteArraySet<>();

private static Session session;

@OnOpen
public void onOpen(final Session session) {
    LOGGER.info("websocket on open successful....");
    SESSION_SET.add(session);
}

@OnMessage
public void onMessage(final String message, final Session session) {
    if (message.equals(DataEventTypeEnum.MYSELF.name())) {
        WebsocketCollector.session = session;
        SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
    }
}

public static void send(final String message, final DataEventTypeEnum type) {
    if (StringUtils.isNotBlank(message)) {
        if (DataEventTypeEnum.MYSELF == type) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                LOGGER.error("websocket send result is exception :", e);
            }
            return;
        }
        for (Session session : SESSION_SET) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                LOGGER.error("websocket send result is exception :", e);
            }
        }
    }
}

具体的发送数据方法为:

@Override
public boolean syncAll(final DataEventTypeEnum type) {
    appAuthService.syncData();
    List pluginDataList = pluginService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
    List selectorDataList = selectorService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
    List ruleDataList = ruleService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
    metaDataService.syncData();
    return true;
}

1>. 先同步appAuth相关的数据,具体行为就是去数据库查出所有的数据,发送一个配置组为APP_AUTH,事件类型为REFRUSH的DataChangeEvent 事件

2>. 同上,查询插件数据,选择器数据,规则数据,以及meteData,发送相应的事件

3>. 在 DataChangeEventDispatcher 类中,监听相应的配置组,根据具体的协议,使用http,zk,nacos,websocket等进行具体的发送,也就是上面我们初始化的 WebsocketDataChangedListener

public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
        switch (event.getGroupKey()) {
            case APP_AUTH:
                listener.onAppAuthChanged((List) event.getSource(), event.getEventType());
                break;
            case PLUGIN:
                listener.onPluginChanged((List) event.getSource(), event.getEventType());
                break;
            case RULE:
                listener.onRuleChanged((List) event.getSource(), event.getEventType());
                break;
            case SELECTOR:
                listener.onSelectorChanged((List) event.getSource(), event.getEventType());
                break;
            case META_DATA:
                listener.onMetaDataChanged((List) event.getSource(), event.getEventType());
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
        }
    }
}

4>. 构建相关的数据,使用WebsocketCollector的send方法进行发送到客户端

public void onPluginChanged(final List pluginDataList, final DataEventTypeEnum eventType) {
    WebsocketData websocketData =
            new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
    WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}


二. websocket在bootstrap模块(client)的初始化

1). 在 WebsocketSyncDataConfiguration 配置类中,当配置文件中有soul.sync.websocket配置时,进行加载。并加载一个websocketSyncDataService bean。

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
    
    /**
     * Websocket sync data service.
     *
     * @param websocketConfig   the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers   the meta subscribers
     * @param authSubscribers   the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider websocketConfig, final ObjectProvider pluginSubscriber,
                                           final ObjectProvider<List> metaSubscribers, final ObjectProvider<List> authSubscribers) {
        log.info("you use websocket sync soul data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }
    
    ...    
}

2). WebsocketSyncDataService实现了SyncDataService接口,根据配置文件的配置,通过new SoulWebsocketClient初始化多个client,并进行连接

public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                final PluginDataSubscriber pluginDataSubscriber,
                                final List metaDataSubscribers,
                                final List authDataSubscribers) {
    String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
    executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
    for (String url : urls) {
        try {
            clients.add(new SoulWebsocketClient(new URI(url), pluginDataSubscriber, metaDataSubscribers, authDataSubscribers));
        } catch (URISyntaxException e) {
            log.error("websocket url is error :", e);
        }
    }
    
    try {
        for (WebSocketClient client : clients) {
            boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
            if (success) {
                log.info("websocket connection is successful.....");
            } else {
                log.error("websocket connection is error.....");
            }
            executor.scheduleAtFixedRate(() -> {
                try {
                    if (client.isClosed()) {
                        boolean reconnectSuccess = client.reconnectBlocking();
                        if (reconnectSuccess) {
                            log.info("websocket reconnect is successful.....");
                        } else {
                            log.error("websocket reconnection is error.....");
                        }
                    }
                } catch (InterruptedException e) {
                    log.error("websocket connect is error :{}", e.getMessage());
                }
            }, 10, 30, TimeUnit.SECONDS);
        }
        /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        
    } catch (InterruptedException e) {
        log.info("websocket connection...exception....", e);
    }
    
}

3).  SoulWebsocketClient继承自WebSocketClient,在初始化时,向服务端发送类型的MYSELF的消息,进行全量数据同步

@Override
public void onOpen(final ServerHandshake serverHandshake) {
    if (!alreadySync) {
        send(DataEventTypeEnum.MYSELF.name());
        alreadySync = true;
    }
}


4).  SoulWebsocketClient 在收到服务端发送的消息后,在自己的内存中进行数据存储

@Override
public void onMessage(final String result) {
    handleResult(result);
}

private void handleResult(final String result) {
    WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
    ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
    String eventType = websocketData.getEventType();
    String json = GsonUtils.getInstance().toJson(websocketData.getData());
    websocketDataHandler.executor(groupEnum, json, eventType);
}

// 根据类型获取相关的key并处理
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
    ENUM_MAP.get(type).handle(json, eventType);
}

@Override
public void handle(final String json, final String eventType) {
    List dataList = convert(json);
    if (CollectionUtils.isNotEmpty(dataList)) {
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}

5). 具体的doRefrush方法,是遍历数据列表的每一条数据,并在实际存储结构PLUGIN_MAP里面(结构为ConcurrentMap),先进行移除,再进行数据的缓存,并初始化相关的插件数据

@Override
protected void doRefresh(final List dataList) {
    // 移除原来map中的数据
    pluginDataSubscriber.refreshPluginDataSelf(dataList);
    // 重新缓存数据
    dataList.forEach(pluginDataSubscriber::onSubscribe);
}



三. 总结

1. 全量数据同步

    1). admin端与bootstrap端分别根据配置文件,进行服务端,客户端的初始化

    2). bootstrap端启动时,发送全量数据同步的请求

    3). admin端接受到全量请求后,查询自己mysql中的全量数据进行发送

    4). bootstrap端收到全量数据,先把自己内存中原来的数据全部清除,再进行缓存


2. 增量数据同步

    5). admin端在页面进行配置后,或者显示的在页面点击同步时,会在admin发送相应的 DatachangeEvent事件

    6). admin端的DataChangeEventDispatcher 收到事件后,根据具体的协议,使用http,zk,nacos,websocket等进行具体的消息发送

    7). bootstrap端收到消息,跟全量流程一样,缓存到自己的内存中


至此,数据同步的流程已经全部梳理完毕。



版权声明:本文由星尘阁原创出品,转载请注明出处!

本文链接:http://www.52xingchen.cn/detail/62




赞助本站,网站的发展离不开你们的支持!
来说两句吧
大侠留个名吧,或者可以使用QQ登录。
: 您已登陆!可以继续留言。
最新评论