Soul 网关源码学习(6) - 从依赖的角度理解 Soul 网关 (2) - WebSocket 数据同步机制

soul admin 依赖组件

soul admin 是一个 spring web 的工程,在项目依赖中引入了 spring-boot-starter-web,前面的功能概述中也提到了 soul 的可插拔插件的特性,在 soul 的 pom 文件中查看 soul 的依赖看到了 soul-common
soul-commonpom 中定义了对 lombok, gson, guava, commons-lang3, spring-boot-starter-json, jackson-databind, slf4j-api 的依赖。

soul admin 中还有几类依赖,关于 zookeeper 相关的客户端依赖,关于 websocket 相关的依赖,nacos 配置中心相关的依赖,这些依赖和 soul admin 和网关 soul bootstrap 之间的数据同步方式有关系,前面说到过 soul 网关提供了多种数据同步的方式将数据从 soul-admin 数据库同步到 soul-bootstrap.
这些方式包括: Http 接口长轮训zookeeper watcher 机制同步, websocket 配置信息推送, nacos 注册中心同步

soul admin WebSocket 配置信息推送

WebSocket 是一种应用层网络通讯协议,可在单个连接上进行全双工通信,位于 OSI 七层模型的应用层, 其实现原理是使用了其传输层的 TCP 连接来实现全双工通讯。在 soul admin 工程中引入了依赖 spring-boot-starter-websocket, 也就是说当 soul admin 同时也作为 WebSocket Server
启动。

WebSocket 服务端

WebSocket 的 Spring Bean 在 yml 文件中的配置 soul.sync.websocket.enabled=true 配置后被注册到 Spring IoC 容器当中

@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

/**
* Config event listener data changed listener.
*
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
// 这里初始化 WebSocket 通道数据变更监听
return new WebsocketDataChangedListener();
}

/**
* Websocket collector websocket collector.
*
* @return the websocket collector
*/
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
// 实例化 WebSocket Server
return new WebsocketCollector();
}

/**
* Server endpoint exporter server endpoint exporter.
*
* @return the server endpoint exporter
*/
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

上面的代码声明了 WebSocket 相关的 Bean 的配置,soul admin 启动之后容器中注册了这些 Bean 之后,soul admin 同时也是 WebSocket Server 的角色,其中 WebSocket Server 相关的定义在类 WebsocketCollector 中:

@Slf4j
@ServerEndpoint("/websocket") // 采用注解 @ServerEndPoint("/websocket") 配置 WebSocket Server 通讯的路径
public class WebsocketCollector {

// 定义 WebSocket 的会话集合, 采用的 COW 类型的集合
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

private static Session session;

/**
* On open.
*
* @param session the session
*/
@OnOpen
public void onOpen(final Session session) {
log.info("websocket on open successful....");
SESSION_SET.add(session);
}

/**
* On message.
*
* @param message the message
* @param session the session
*/
@OnMessage
public void onMessage(final String message, final Session session) {
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
WebsocketCollector.session = session;
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
}
}

/**
* On close.
*
* @param session the session
*/
@OnClose
public void onClose(final Session session) {
SESSION_SET.remove(session);
WebsocketCollector.session = null;
}

/**
* On error.
*
* @param session the session
* @param error the error
*/
@OnError
public void onError(final Session session, final Throwable error) {
SESSION_SET.remove(session);
WebsocketCollector.session = null;
log.error("websocket collection error: ", error);
}

/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
// 如果 DataEventTypeEnum 的类型是 MYSELF, 经过 DEBUG 可以看到在 SoulWebSocketClient 完成握手之后会发送一个
// 这个类型的消息到 Server 以完成配置信息的初始化
if (DataEventTypeEnum.MYSELF == type) {
try {
// 在当前会话中推送消息到客户端
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
}

以上的代码中可以看到在 WebsocketCollector 中的几个方法对应了 WebSocket 通讯的几个事件的处理:

  • OnOpen: 当有 WebSocket 客户端建立到 WebSocket Server 的连接时触发, 可以看到代码中的处理逻辑是将当前客户端到服务端的会话信息保存到 WebCollector 中定义的会话集合当中。
  • OnMessage: 当有客户端有消息发送, 这里会触发方法进行接收数据处理。
  • OnClose: 当有客户端到服务器的连接断开就会触发 onClose, 这里的处理方法是将会话集合中的会话移除,将 WebsocketCollector 私有常量 session 置为空。
  • OnError: 当客户端到服务器端的连接出现通讯异常的时候触发。

WebSocket 配置信息推送的主要方法在 org.dromara.soul.admin.listener.websocket.WebsocketCollector#send,这个方法的调用方在 org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener
org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener 作为 org.dromara.soul.admin.listener.DataChangedListener 在 Spring Event 的机制中会响应 org.dromara.soul. admin.listener.DataChangedEvent 事件,响应事件的代码在 org.dromara.soul.admin.listener.DataChangedEventDispatcher#onApplicationEvent 方法中。

可以看到 DataChangedEvent 的事件定义如下:

public class DataChangedEvent extends ApplicationEvent {

private DataEventTypeEnum eventType;

private ConfigGroupEnum groupKey;

/**
* Instantiates a new Data changed event.
*
* @param groupKey the group key
* @param type the type
* @param source the source
*/
public DataChangedEvent(final ConfigGroupEnum groupKey, final DataEventTypeEnum type, final List<?> source) {
super(source);
this.eventType = type;
this.groupKey = groupKey;
}

/**
* Gets event type.
*
* @return the event type
*/
DataEventTypeEnum getEventType() {
return eventType;
}

@Override
public List<?> getSource() {
return (List<?>) super.getSource();
}

/**
* Gets group key.
*
* @return the group key
*/
public ConfigGroupEnum getGroupKey() {
return this.groupKey;
}

}

在 soul admin 的插件, Selector, Rule 等等信息的操作后台代码中可以看到发送 DataEventType 事件, 结合上述的功能链, 就可以通过 WebSocket 完成由 WebSocket Server 到连接到 WebSocket Server 的客户端之间的配置信息推送。

WebSocket 客户端

Soul WebSocket 客户端相关的配置是通过 Spring Boot Starter : soul-spring-boot-starter-sync-data-websocket 实现的:

// Spring 容器中存在 WebsocketSyncDataService.class,
// 并且Spring 的配置 source 中存在 soul.sync.websocket.urls 才会加载该配置
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {

/**
* Websocket sync data service.
*
* @param websocketConfig the websocket config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
// 创建 websocket 客户端, 可以支持多个服务端连接方式
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}

/**
* Config websocket config.
*
* @return the websocket config
*/
// 读取用户配置的 websocket urls, 支持配置多个 soul admin url
@Bean
@ConfigurationProperties(prefix = "soul.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
@Data
public class WebsocketConfig {

/**
* if have more soul admin url,please config like this.
* 127.0.0.1:8888,127.0.0.1:8889
*/
private String urls;
}

WebSocket 数据同步客户端与 WebSocket 服务端的连接处理在 WebsocketSyncDataService 中处理。

总结

今天从 soul admin 依赖的角度分析了 soul admin 中依赖所赋予 soul admin 的功能,以此带出了 soul 网关高性能的基础 - bootstrap 组件运行时依赖数据存放在内存中,这个过程中的数据同步方式,解析了通过 WebSocket 方式在 soul admin 和 soul
bootstrap 之间同步数据的功能。

文章作者: David Liu
文章链接: https://davidliu.now.sh/2021/01/20/soul_dependency_discoveryI/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 David Liu's Blog