WebSocket组件 (yudao-spring-boot-starter-websocket)
引用文件
本文引用的文件
- JsonWebSocketMessageHandler.java
- WebSocketMessageSender.java
- AbstractWebSocketMessageSender.java
- YudaoWebSocketAutoConfiguration.java
- WebSocketProperties.java
- WebSocketSessionManager.java
- WebSocketSessionManagerImpl.java
- WebSocketMessageListener.java
- JsonWebSocketMessage.java
- LoginUserHandshakeInterceptor.java
- WebSocketFrameworkUtils.java
- LocalWebSocketMessageSender.java
- RedisWebSocketMessageSender.java
- RedisWebSocketMessageConsumer.java
目录
简介
本文件面向 yudao-spring-boot-starter-websocket 的 WebSocket 组件,系统性阐述以下主题:
- 消息处理器 JsonWebSocketMessageHandler 的消息解析与路由分发机制
- 会话管理器 SessionManager 的连接维护、用户绑定、广播推送能力
- 多种消息发送器(Sender)的实现与集成:本地、Redis、Kafka、RabbitMQ、RocketMQ
- 认证机制、权限控制、心跳检测、断线重连等实时通信特性
- 扩展与自定义配置实践
项目结构
该模块位于 yudao-framework 子工程中,采用按“核心能力”分层组织:
- config:自动装配与属性配置
- core:核心处理器、监听器、消息模型、会话管理、安全工具
- sender:消息发送器抽象与多实现(本地、Redis、Kafka、RabbitMQ、RocketMQ)
- util:WebSocket 框架工具类
图表来源
- YudaoWebSocketAutoConfiguration.java
- WebSocketProperties.java
- JsonWebSocketMessageHandler.java
- WebSocketSessionManager.java
- WebSocketSessionManagerImpl.java
- WebSocketMessageListener.java
- JsonWebSocketMessage.java
- WebSocketFrameworkUtils.java
- LoginUserHandshakeInterceptor.java
- AbstractWebSocketMessageSender.java
- LocalWebSocketMessageSender.java
- RedisWebSocketMessageSender.java
- RedisWebSocketMessageConsumer.java
章节来源
核心组件
- 消息处理器:负责文本消息解析、心跳响应、按 type 路由到监听器,并在租户上下文中执行业务逻辑
- 会话管理器:维护 Session 的增删、按用户维度索引、租户隔离、广播查询
- 发送器抽象:统一发送入口,屏蔽不同传输介质差异;具体实现支持本地直发与多 MQ 广播
- 自动装配:基于属性开关与 senderType 动态启用相应功能,注册拦截器、处理器、会话管理器与各发送器 Bean
- 安全与工具:握手时注入登录用户信息,提供会话与用户信息读取工具
章节来源
- JsonWebSocketMessageHandler.java
- WebSocketSessionManager.java
- WebSocketSessionManagerImpl.java
- WebSocketMessageSender.java
- AbstractWebSocketMessageSender.java
- YudaoWebSocketAutoConfiguration.java
- LoginUserHandshakeInterceptor.java
- WebSocketFrameworkUtils.java
架构总览
WebSocket 服务由“自动装配 + 处理器 + 会话管理 + 发送器 + 安全拦截器”构成,支持本地直发与多消息中间件广播。
图表来源
- YudaoWebSocketAutoConfiguration.java
- JsonWebSocketMessageHandler.java
- WebSocketSessionManagerImpl.java
- AbstractWebSocketMessageSender.java
- RedisWebSocketMessageSender.java
组件详解
消息处理器:JsonWebSocketMessageHandler
职责与流程
- 忽略空消息
- 心跳检测:收到 "ping" 返回 "pong"
- 解析为 JsonWebSocketMessage,校验 type
- 依据 type 从监听器集合中查找对应监听器
- 使用监听器泛型参数进行内容反序列化
- 在租户上下文中执行监听器的 onMessage
图表来源
章节来源
会话管理器:WebSocketSessionManager 与默认实现
职责与能力
- 维护 Session 的添加与移除
- 将 Session 按用户类型与用户编号建立索引,支持按用户维度查询
- 提供按 sessionId、userType、userType+userId 查询
- 在多租户场景下,按租户 ID 过滤会话列表,避免跨租户推送
图表来源
章节来源
发送器抽象与实现
统一发送入口
- 支持按 sessionId、userType、userType+userId 三种维度发送
- 统一构造 JsonWebSocketMessage 并序列化为 payload
- 校验 Session 可用性后发送,捕获 IO 异常并记录日志
本地直发
- LocalWebSocketMessageSender:单机场景直连发送,无需外部 MQ
消息中间件广播
- Redis:RedisWebSocketMessageSender 通过 RedisMQTemplate 发布消息,RedisWebSocketMessageConsumer 在各节点消费并回推本地发送
- Kafka:KafkaWebSocketMessageSender 通过 KafkaTemplate 发送,KafkaWebSocketMessageConsumer 消费并回推
- RabbitMQ:RabbitMQWebSocketMessageSender 通过 RabbitTemplate 发送,RabbitMQWebSocketMessageConsumer 消费并回推
- RocketMQ:RocketMQWebSocketMessageSender 通过 RocketMQTemplate 发送,RocketMQWebSocketMessageConsumer 消费并回推
图表来源
- WebSocketMessageSender.java
- AbstractWebSocketMessageSender.java
- WebSocketSessionManagerImpl.java
- RedisWebSocketMessageSender.java
- RedisWebSocketMessageConsumer.java
章节来源
- WebSocketMessageSender.java
- AbstractWebSocketMessageSender.java
- LocalWebSocketMessageSender.java
- RedisWebSocketMessageSender.java
- RedisWebSocketMessageConsumer.java
自动装配与配置
- 条件启用:可通过 yudao.websocket.enable 控制是否启用 WebSocket
- 路径配置:yudao.websocket.path 指定 WebSocket 路径,默认 "/ws"
- 发送器类型:yudao.websocket.sender-type 可选 local、redis、rocketmq、kafka、rabbitmq
- 注册:自动装配注册 HandshakeInterceptor、WebSocketHandler、WebSocketSessionManager、以及对应发送器与消费者 Bean
章节来源
认证与安全
- 握手拦截:LoginUserHandshakeInterceptor 在握手前从安全上下文提取 LoginUser,并注入到 WebSocketSession 的 attributes 中
- 工具方法:WebSocketFrameworkUtils 提供读取登录用户、用户编号、用户类型、租户 ID 的便捷方法
- 会话绑定:SessionManager 在 addSession 时读取 LoginUser 并按 userType+userId 建立索引,便于定向或广播推送
章节来源
依赖关系分析
- 组件耦合
- JsonWebSocketMessageHandler 依赖 WebSocketMessageListener 集合与 SessionManager
- AbstractWebSocketMessageSender 依赖 SessionManager 与消息模型
- 各 Sender 实现依赖对应 MQ 客户端与 SessionManager
- 条件装配
- senderType 决定启用哪一类发送器与消费者 Bean
- Redis 发送器与消费者需先于通用消费者装配,确保订阅通道可用
- 外部依赖
- Spring WebSocket、Spring Security、消息中间件客户端(Kafka、RabbitMQ、RocketMQ、Redis)
图表来源
- YudaoWebSocketAutoConfiguration.java
- JsonWebSocketMessageHandler.java
- AbstractWebSocketMessageSender.java
- RedisWebSocketMessageSender.java
- RedisWebSocketMessageConsumer.java
章节来源
性能与可扩展性
- 本地直发
- 优点:低延迟、部署简单
- 限制:仅适用于单实例,无法跨节点广播
- MQ 广播
- 优点:天然支持多实例广播、解耦与弹性
- 成本:引入网络与序列化开销
- 会话存储
- 使用并发 Map/CopyOnWrite 结构,降低锁竞争
- 租户隔离减少无效遍历
- 发送路径
- 统一封装发送逻辑,避免重复校验与序列化
- 对不可用 Session 进行快速短路,减少 IO 异常传播
[本节为通用建议,不直接分析具体文件]
故障排查指南
- 连接失败
- 检查 yudao.websocket.enable 是否为 true
- 确认 yudao.websocket.path 与前端连接路径一致
- 无消息到达
- 核对消息 type 是否与监听器 getType() 一致
- 检查监听器是否被 Spring 扫描并注册
- 会话未绑定用户
- 确认握手拦截器生效,请求携带 token 并通过鉴权
- 检查 LoginUser 是否正确注入到 Session attributes
- 广播无效
- 若使用 MQ,确认对应发送器与消费者 Bean 已启用
- 检查 MQ 连接、topic/exchange 配置与权限
- 心跳异常
- 前端应发送 "ping",后端返回 "pong"
- 如无响应,排查处理器逻辑与网络链路
章节来源
- YudaoWebSocketAutoConfiguration.java
- JsonWebSocketMessageHandler.java
- LoginUserHandshakeInterceptor.java
结论
该 WebSocket 组件以“处理器 + 会话管理 + 发送器抽象 + 条件装配”为核心,既支持单机直发,又可无缝扩展至多 MQ 广播模式。配合认证拦截与租户隔离,满足多租户、多实例场景下的实时通信需求。通过统一的发送器接口与消息模型,业务侧可专注于消息类型与监听器实现,降低接入成本。
[本节为总结,不直接分析具体文件]
附录:配置与扩展示例
-
基础配置
- 启用 WebSocket:设置 yudao.websocket.enable=true
- 设置连接路径:yudao.websocket.path=/ws
- 选择发送器类型:yudao.websocket.sender-type=local|redis|rocketmq|kafka|rabbitmq
-
Redis 广播
- senderType=redis
- 自动装配 RedisWebSocketMessageSender 与 RedisWebSocketMessageConsumer
-
Kafka 广播
- senderType=kafka
- 配置 yudao.websocket.sender-kafka.topic
- 自动装配 KafkaWebSocketMessageSender 与 KafkaWebSocketMessageConsumer
-
RabbitMQ 广播
- senderType=rabbitmq
- 配置 yudao.websocket.sender-rabbitmq.exchange
- 自动装配 RabbitMQWebSocketMessageSender 与 RabbitMQWebSocketMessageConsumer
-
RocketMQ 广播
- senderType=rocketmq
- 配置 yudao.websocket.sender-rocketmq.topic
- 自动装配 RocketMQWebSocketMessageSender 与 RocketMQWebSocketMessageConsumer
-
扩展步骤
- 新建消息监听器:实现 WebSocketMessageListener
<T>,返回唯一 type - 新建发送器:继承 AbstractWebSocketMessageSender 或实现 WebSocketMessageSender
- 新建消费者:订阅对应 MQ 广播,调用本地发送器回推
- 新建消息监听器:实现 WebSocketMessageListener
章节来源