跳到主要内容

WebSocket组件 (yudao-spring-boot-starter-websocket)

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 组件详解
  6. 依赖关系分析
  7. 性能与可扩展性
  8. 故障排查指南
  9. 结论
  10. 附录:配置与扩展示例

简介

本文件面向 yudao-spring-boot-starter-websocket 的 WebSocket 组件,系统性阐述以下主题:

  • 消息处理器 JsonWebSocketMessageHandler 的消息解析与路由分发机制
  • 会话管理器 SessionManager 的连接维护、用户绑定、广播推送能力
  • 多种消息发送器(Sender)的实现与集成:本地、Redis、Kafka、RabbitMQ、RocketMQ
  • 认证机制、权限控制、心跳检测、断线重连等实时通信特性
  • 扩展与自定义配置实践

项目结构

该模块位于 yudao-framework 子工程中,采用按“核心能力”分层组织:

  • config:自动装配与属性配置
  • core:核心处理器、监听器、消息模型、会话管理、安全工具
  • sender:消息发送器抽象与多实现(本地、Redis、Kafka、RabbitMQ、RocketMQ)
  • util:WebSocket 框架工具类

Mermaid Diagram Code:

graph TB
subgraph "配置"
P["WebSocketProperties<br/>yudao.websocket.*"]
C["YudaoWebSocketAutoConfiguration<br/>自动装配与条件配置"]
end
subgraph "核心"
H["JsonWebSocketMessageHandler<br/>消息解析与路由"]
L["WebSocketMessageListener<br/>消息监听器接口"]
M["JsonWebSocketMessage<br/>消息帧模型"]
S["WebSocketSessionManager<br/>会话管理接口"]
SI["WebSocketSessionManagerImpl<br/>默认实现"]
U["WebSocketFrameworkUtils<br/>会话与用户工具"]
A["LoginUserHandshakeInterceptor<br/>握手拦截器"]
end
subgraph "发送器"
AS["AbstractWebSocketMessageSender<br/>发送器抽象"]
LS["LocalWebSocketMessageSender"]
RS["RedisWebSocketMessageSender"]
RC["RedisWebSocketMessageConsumer"]
end
P --> C
C --> H
C --> S
C --> AS
H --> L
H --> SI
S --> SI
AS --> SI
RS --> RC

图表来源

章节来源

核心组件

  • 消息处理器:负责文本消息解析、心跳响应、按 type 路由到监听器,并在租户上下文中执行业务逻辑
  • 会话管理器:维护 Session 的增删、按用户维度索引、租户隔离、广播查询
  • 发送器抽象:统一发送入口,屏蔽不同传输介质差异;具体实现支持本地直发与多 MQ 广播
  • 自动装配:基于属性开关与 senderType 动态启用相应功能,注册拦截器、处理器、会话管理器与各发送器 Bean
  • 安全与工具:握手时注入登录用户信息,提供会话与用户信息读取工具

章节来源

架构总览

WebSocket 服务由“自动装配 + 处理器 + 会话管理 + 发送器 + 安全拦截器”构成,支持本地直发与多消息中间件广播。

Mermaid Diagram Code:

graph TB
Client["客户端"] --> WS["Spring WebSocket<br/>路径: yudao.websocket.path"]
WS --> HI["LoginUserHandshakeInterceptor<br/>注入登录用户"]
HI --> DH["JsonWebSocketMessageHandler<br/>解析/路由/心跳"]
DH --> SM["WebSocketSessionManagerImpl<br/>会话索引/租户隔离"]
DH --> Ls["WebSocketMessageListener<br/>业务监听器集合"]
subgraph "发送路径"
subgraph "本地直发"
S1["AbstractWebSocketMessageSender<br/>统一发送入口"] --> SM
end
subgraph "消息中间件广播"
S2["Redis/Kafka/RabbitMQ/RocketMQ Sender"] --> MQ["MQ Broker"]
MQ --> Csm["对应 Consumer"] --> S1
end
end

图表来源

组件详解

消息处理器:JsonWebSocketMessageHandler

职责与流程

  • 忽略空消息
  • 心跳检测:收到 "ping" 返回 "pong"
  • 解析为 JsonWebSocketMessage,校验 type
  • 依据 type 从监听器集合中查找对应监听器
  • 使用监听器泛型参数进行内容反序列化
  • 在租户上下文中执行监听器的 onMessage

Mermaid Diagram Code:

flowchart TD
Start(["进入 handleTextMessage"]) --> Empty{"空消息?"}
Empty --> |是| End["结束"]
Empty --> |否| Ping{"是否为 'ping'?"}
Ping --> |是| Pong["发送 'pong'"] --> End
Ping --> |否| Parse["解析为 JsonWebSocketMessage"]
Parse --> ValidType{"type 是否有效?"}
ValidType --> |否| LogErr["记录错误"] --> End
ValidType --> |是| Find["根据 type 查找监听器"]
Find --> Found{"找到监听器?"}
Found --> |否| LogErr2["记录错误"] --> End
Found --> |是| Deserialize["按监听器泛型反序列化内容"]
Deserialize --> Tenant["在租户上下文中执行 onMessage"]
Tenant --> End

图表来源

章节来源

会话管理器:WebSocketSessionManager 与默认实现

职责与能力

  • 维护 Session 的添加与移除
  • 将 Session 按用户类型与用户编号建立索引,支持按用户维度查询
  • 提供按 sessionId、userType、userType+userId 查询
  • 在多租户场景下,按租户 ID 过滤会话列表,避免跨租户推送

Mermaid Diagram Code:

classDiagram
class WebSocketSessionManager {
+addSession(session)
+removeSession(session)
+getSession(id)
+getSessionList(userType)
+getSessionList(userType, userId)
}
class WebSocketSessionManagerImpl {
-idSessions : Map<String, WebSocketSession>
-userSessions : Map[userType, Map[userId, List[WebSocketSession]]]
+addSession(session)
+removeSession(session)
+getSession(id)
+getSessionList(userType)
+getSessionList(userType, userId)
}
WebSocketSessionManager <|.. WebSocketSessionManagerImpl

图表来源

章节来源

发送器抽象与实现

统一发送入口

  • 支持按 sessionId、userType、userType+userId 三种维度发送
  • 统一构造 JsonWebSocketMessage 并序列化为 payload
  • 校验 Session 可用性后发送,捕获 IO 异常并记录日志

本地直发

  • LocalWebSocketMessageSender:单机场景直连发送,无需外部 MQ

消息中间件广播

  • Redis:RedisWebSocketMessageSender 通过 RedisMQTemplate 发布消息,RedisWebSocketMessageConsumer 在各节点消费并回推本地发送
  • Kafka:KafkaWebSocketMessageSender 通过 KafkaTemplate 发送,KafkaWebSocketMessageConsumer 消费并回推
  • RabbitMQ:RabbitMQWebSocketMessageSender 通过 RabbitTemplate 发送,RabbitMQWebSocketMessageConsumer 消费并回推
  • RocketMQ:RocketMQWebSocketMessageSender 通过 RocketMQTemplate 发送,RocketMQWebSocketMessageConsumer 消费并回推

Mermaid Diagram Code:

sequenceDiagram
participant Biz as "业务代码"
participant Sender as "WebSocketMessageSender"
participant Abs as "AbstractWebSocketMessageSender"
participant SM as "WebSocketSessionManager"
participant MQ as "消息中间件"
participant Csm as "对应 Consumer"
Biz->>Sender : send(userType, userId, type, content)
Sender->>Abs : send(..., type, content)
Abs->>SM : 查询会话列表
SM-->>Abs : 返回会话集合
Abs->>Abs : 构造 JsonWebSocketMessage 并序列化
alt 本地直发
Abs->>SM : 遍历会话并 sendMessage
else MQ 广播
Abs->>MQ : 发布广播消息
MQ-->>Csm : 消费广播
Csm->>Abs : 回推本地发送
end

图表来源

章节来源

自动装配与配置

  • 条件启用:可通过 yudao.websocket.enable 控制是否启用 WebSocket
  • 路径配置:yudao.websocket.path 指定 WebSocket 路径,默认 "/ws"
  • 发送器类型:yudao.websocket.sender-type 可选 local、redis、rocketmq、kafka、rabbitmq
  • 注册:自动装配注册 HandshakeInterceptor、WebSocketHandler、WebSocketSessionManager、以及对应发送器与消费者 Bean

章节来源

认证与安全

  • 握手拦截:LoginUserHandshakeInterceptor 在握手前从安全上下文提取 LoginUser,并注入到 WebSocketSession 的 attributes 中
  • 工具方法:WebSocketFrameworkUtils 提供读取登录用户、用户编号、用户类型、租户 ID 的便捷方法
  • 会话绑定:SessionManager 在 addSession 时读取 LoginUser 并按 userType+userId 建立索引,便于定向或广播推送

章节来源

依赖关系分析

  • 组件耦合
    • JsonWebSocketMessageHandler 依赖 WebSocketMessageListener 集合与 SessionManager
    • AbstractWebSocketMessageSender 依赖 SessionManager 与消息模型
    • 各 Sender 实现依赖对应 MQ 客户端与 SessionManager
  • 条件装配
    • senderType 决定启用哪一类发送器与消费者 Bean
    • Redis 发送器与消费者需先于通用消费者装配,确保订阅通道可用
  • 外部依赖
    • Spring WebSocket、Spring Security、消息中间件客户端(Kafka、RabbitMQ、RocketMQ、Redis)

Mermaid Diagram Code:

graph LR
H["JsonWebSocketMessageHandler"] --> Ls["WebSocketMessageListener*"]
H --> SM["WebSocketSessionManagerImpl"]
AS["AbstractWebSocketMessageSender"] --> SM
LS["LocalWebSocketMessageSender"] --> AS
RS["RedisWebSocketMessageSender"] --> AS
RC["RedisWebSocketMessageConsumer"] --> RS

图表来源

章节来源

性能与可扩展性

  • 本地直发
    • 优点:低延迟、部署简单
    • 限制:仅适用于单实例,无法跨节点广播
  • MQ 广播
    • 优点:天然支持多实例广播、解耦与弹性
    • 成本:引入网络与序列化开销
  • 会话存储
    • 使用并发 Map/CopyOnWrite 结构,降低锁竞争
    • 租户隔离减少无效遍历
  • 发送路径
    • 统一封装发送逻辑,避免重复校验与序列化
    • 对不可用 Session 进行快速短路,减少 IO 异常传播

[本节为通用建议,不直接分析具体文件]

故障排查指南

  • 连接失败
    • 检查 yudao.websocket.enable 是否为 true
    • 确认 yudao.websocket.path 与前端连接路径一致
  • 无消息到达
    • 核对消息 type 是否与监听器 getType() 一致
    • 检查监听器是否被 Spring 扫描并注册
  • 会话未绑定用户
    • 确认握手拦截器生效,请求携带 token 并通过鉴权
    • 检查 LoginUser 是否正确注入到 Session attributes
  • 广播无效
    • 若使用 MQ,确认对应发送器与消费者 Bean 已启用
    • 检查 MQ 连接、topic/exchange 配置与权限
  • 心跳异常
    • 前端应发送 "ping",后端返回 "pong"
    • 如无响应,排查处理器逻辑与网络链路

章节来源

结论

该 WebSocket 组件以“处理器 + 会话管理 + 发送器抽象 + 条件装配”为核心,既支持单机直发,又可无缝扩展至多 MQ 广播模式。配合认证拦截与租户隔离,满足多租户、多实例场景下的实时通信需求。通过统一的发送器接口与消息模型,业务侧可专注于消息类型与监听器实现,降低接入成本。

[本节为总结,不直接分析具体文件]

附录:配置与扩展示例

  • 基础配置

    • 启用 WebSocket:设置 yudao.websocket.enable=true
    • 设置连接路径:yudao.websocket.path=/ws
    • 选择发送器类型:yudao.websocket.sender-type=local|redis|rocketmq|kafka|rabbitmq
  • Redis 广播

    • senderType=redis
    • 自动装配 RedisWebSocketMessageSender 与 RedisWebSocketMessageConsumer
  • Kafka 广播

    • senderType=kafka
    • 配置 yudao.websocket.sender-kafka.topic
    • 自动装配 KafkaWebSocketMessageSender 与 KafkaWebSocketMessageConsumer
  • RabbitMQ 广播

    • senderType=rabbitmq
    • 配置 yudao.websocket.sender-rabbitmq.exchange
    • 自动装配 RabbitMQWebSocketMessageSender 与 RabbitMQWebSocketMessageConsumer
  • RocketMQ 广播

    • senderType=rocketmq
    • 配置 yudao.websocket.sender-rocketmq.topic
    • 自动装配 RocketMQWebSocketMessageSender 与 RocketMQWebSocketMessageConsumer
  • 扩展步骤

    • 新建消息监听器:实现 WebSocketMessageListener<T>,返回唯一 type
    • 新建发送器:继承 AbstractWebSocketMessageSender 或实现 WebSocketMessageSender
    • 新建消费者:订阅对应 MQ 广播,调用本地发送器回推

章节来源

用户文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答