跳到主要内容

WebSocket消息推送

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖关系分析
  7. 性能考量
  8. 故障排查指南
  9. 结论
  10. 附录

简介

本文件系统性阐述该仓库中WebSocket消息推送能力的设计与实现,覆盖连接建立、消息发送、会话管理、安全机制、消息格式与协议、集群广播与负载均衡、API与客户端集成、错误处理与性能优化等内容。读者无需深入源码即可理解如何在多节点环境下实现可靠的单播、广播与群组推送。

项目结构

WebSocket能力主要集中在“框架”模块中的自动装配与核心组件,业务侧提供示例监听器与消息体;同时通过配置项控制连接路径与消息发送器类型,支持本地直连与多种中间件广播(Redis、RocketMQ、RabbitMQ、Kafka)。

Mermaid Diagram Code:

graph TB
subgraph "框架核心"
A["WebSocketProperties<br/>配置项"] --> B["YudaoWebSocketAutoConfiguration<br/>自动装配"]
B --> C["JsonWebSocketMessageHandler<br/>消息处理器"]
B --> D["WebSocketSessionManagerImpl<br/>会话管理"]
B --> E["LoginUserHandshakeInterceptor<br/>握手拦截器"]
B --> F["WebSocketSessionHandlerDecorator<br/>会话装饰器"]
B --> G["WebSocketMessageSender 实现们<br/>local/redis/rocketmq/rabbitmq/kafka"]
end
subgraph "业务示例"
H["DemoWebSocketMessageListener<br/>消息监听器"]
I["DemoSendMessage / DemoReceiveMessage<br/>消息体"]
end
subgraph "外部中间件"
J["Redis"]
K["RocketMQ"]
L["RabbitMQ"]
M["Kafka"]
end
H --> C
C --> D
G --> D
G --> J
G --> K
G --> L
G --> M

图表来源

章节来源

核心组件

  • 配置与自动装配:通过配置项控制连接路径与发送器类型,自动注册处理器、拦截器、会话管理器与各发送器实现。
  • 消息处理器:基于JSON帧解析type路由到对应监听器,内置心跳“ping/pong”处理。
  • 会话管理:维护id与用户维度的会话索引,支持并发会话装饰,连接/断开自动登记与移除。
  • 安全与鉴权:握手阶段注入登录用户上下文;授权策略允许WebSocket路径放行。
  • 发送器:抽象统一的发送接口,提供本地直连与多中间件广播实现,满足单机与集群场景。

章节来源

架构总览

下图展示从客户端到业务监听器、再到会话管理与发送器的完整链路,以及多节点广播路径。

Mermaid Diagram Code:

sequenceDiagram
participant Client as "客户端"
participant WS as "WebSocket处理器<br/>JsonWebSocketMessageHandler"
participant Sess as "会话管理<br/>WebSocketSessionManagerImpl"
participant Sender as "消息发送器<br/>WebSocketMessageSender"
participant Bus as "消息总线<br/>Redis/RocketMQ/RabbitMQ/Kafka"
Client->>WS : "文本帧(JSON)"
WS->>WS : "解析type并查找监听器"
WS->>Sess : "按用户/会话获取目标会话"
alt "单播/群发"
Sender->>Sess : "查询会话"
Sender->>Client : "发送TextMessage(JSON)"
else "集群广播"
Sender->>Bus : "发布广播消息"
Bus-->>Sender : "消费广播消息"
Sender->>Sess : "定位会话并发送"
end

图表来源

详细组件分析

连接建立与握手

  • 握手拦截器在握手前从安全框架提取登录用户,注入到WebSocket会话属性,便于后续鉴权与会话管理。
  • 会话装饰器对底层会话进行并发增强,并在连接建立/关闭时自动登记/移除会话。
  • 自动装配注册WebSocket处理器与路径,允许跨域。

Mermaid Diagram Code:

sequenceDiagram
participant Client as "客户端"
participant Inter as "握手拦截器<br/>LoginUserHandshakeInterceptor"
participant Decor as "会话装饰器<br/>WebSocketSessionHandlerDecorator"
participant SessMgr as "会话管理<br/>WebSocketSessionManagerImpl"
Client->>Inter : "请求握手(携带token)"
Inter->>Inter : "提取登录用户并注入属性"
Inter-->>Client : "握手通过"
Client->>Decor : "建立连接"
Decor->>SessMgr : "afterConnectionEstablished<br/>登记会话"
Client-->>Decor : "断开连接"
Decor->>SessMgr : "afterConnectionClosed<br/>移除会话"

图表来源

章节来源

消息发送与路由

  • 消息处理器解析JSON帧,根据type路由到对应监听器,支持空消息与心跳“ping/pong”。
  • 发送器抽象统一了单播、群发、按会话发送三种能力,并在内部将消息序列化为JSON帧。
  • 本地发送器直接遍历目标会话并发送;集群广播通过中间件实现跨节点转发。

Mermaid Diagram Code:

flowchart TD
Start(["收到文本帧"]) --> Empty{"空消息?"}
Empty -- 是 --> End(["忽略"])
Empty -- 否 --> Heartbeat{"心跳ping?"}
Heartbeat -- 是 --> Pong["发送pong"] --> End
Heartbeat -- 否 --> Parse["解析JSON帧"]
Parse --> TypeEmpty{"type为空?"}
TypeEmpty -- 是 --> LogErr["记录错误"] --> End
TypeEmpty -- 否 --> Route["按type查找监听器"]
Route --> Found{"找到监听器?"}
Found -- 否 --> LogErr2["记录错误"] --> End
Found -- 是 --> Tenant["按租户执行监听器"]
Tenant --> End

图表来源

章节来源

会话管理

  • 会话管理器维护两层索引:按id与按用户类型+用户id的并发安全集合,支持按用户类型/用户id/会话id检索。
  • 连接建立时登记,断开时清理,避免悬挂会话影响发送。

Mermaid Diagram Code:

classDiagram
class WebSocketSessionManager {
+addSession(session)
+removeSession(session)
+getSession(id)
+getSessionList(userType)
+getSessionList(userType, userId)
}
class WebSocketSessionManagerImpl {
-idSessions
-userSessions
+addSession(session)
+removeSession(session)
+getSession(id)
+getSessionList(userType)
+getSessionList(userType, userId)
}
WebSocketSessionManager <|.. WebSocketSessionManagerImpl

图表来源

章节来源

安全机制

  • 连接认证:握手拦截器从安全框架获取登录用户并注入会话属性,确保后续鉴权可用。
  • 访问控制:WebSocket路径授权策略允许访问,结合全局安全配置实现细粒度控制。
  • 租户隔离:消息处理时按会话租户ID执行,避免跨租户消息泄露。

Mermaid Diagram Code:

sequenceDiagram
participant Sec as "安全框架"
participant Inter as "握手拦截器"
participant Sess as "WebSocketSession"
participant SecCfg as "授权策略"
Sec->>Inter : "获取登录用户"
Inter->>Sess : "注入登录用户属性"
SecCfg->>SecCfg : "放行WebSocket路径"

图表来源

章节来源

消息格式与协议

  • 消息帧采用JSON结构,包含type与content字段,由处理器解析并路由。
  • 业务监听器通过WebSocketMessageSender发送对象消息,内部自动序列化为JSON字符串。
  • 心跳:服务端识别“ping”,直接回“pong”。

Mermaid Diagram Code:

classDiagram
class JsonWebSocketMessage {
+String type
+String content
}
class WebSocketMessageListener {
+onMessage(session, message)
+getType() String
}
class DemoWebSocketMessageListener {
+onMessage(session, DemoSendMessage)
+getType() String
}
JsonWebSocketMessage --> WebSocketMessageListener : "按type路由"
DemoWebSocketMessageListener ..|> WebSocketMessageListener

图表来源

章节来源

多种推送方式

  • 单播:按用户类型+用户id精准定位会话发送。
  • 广播:按用户类型向该类型所有在线会话广播。
  • 群组推送:可在业务监听器中组合多个用户id进行多次单播或一次广播。
  • 示例:Demo监听器演示了单发与群发两种场景。

Mermaid Diagram Code:

flowchart TD
A["收到业务消息"] --> B{"是否指定接收人?"}
B -- 是 --> C["按用户类型+用户id单播"]
B -- 否 --> D["按用户类型广播"]
C --> E["发送JSON帧"]
D --> E

图表来源

章节来源

集群部署与负载均衡

  • 发送器类型可通过配置切换:local、redis、rocketmq、rabbitmq、kafka。
  • Redis/RocketMQ/RabbitMQ/Kafka广播:发送器将消息转为中间件消息,消费者在各节点拉取并调用本地发送器完成最终投递。
  • 负载均衡:多实例部署时,任一节点均可作为入口,通过中间件实现跨节点广播。

Mermaid Diagram Code:

graph TB
subgraph "节点A"
A1["业务监听器"]
A2["发送器(local/redis/rocketmq/rabbitmq/kafka)"]
end
subgraph "节点B"
B1["业务监听器"]
B2["发送器(local/redis/rocketmq/rabbitmq/kafka)"]
end
subgraph "消息总线"
R["Redis"]
RMQ["RocketMQ"]
RM["RabbitMQ"]
K["Kafka"]
end
A1 --> A2
B1 --> B2
A2 --> R
A2 --> RMQ
A2 --> RM
A2 --> K
R --> B2
RMQ --> B2
RM --> B2
K --> B2

图表来源

章节来源

API接口与客户端集成

  • RPC请求体:提供WebSocket发送请求DTO,包含会话id、用户类型、用户id、消息类型与内容。
  • 客户端集成要点:
    • 连接路径:由配置项决定,默认“/ws”。
    • 认证:握手时携带token参数,交由安全过滤器认证。
    • 心跳:客户端定期发送“ping”,服务端回“pong”。
    • 消息格式:发送JSON帧,包含type与content。

章节来源

依赖关系分析

  • 组件耦合:处理器依赖监听器集合进行路由;发送器依赖会话管理器与中间件模板;自动装配集中注册所有组件。
  • 外部依赖:Redis、RocketMQ、RabbitMQ、Kafka等中间件通过各自的模板与消费者参与广播。
  • 循环依赖:通过构造器注入与条件装配避免循环依赖问题。

Mermaid Diagram Code:

graph LR
Handler["JsonWebSocketMessageHandler"] --> Listener["WebSocketMessageListener"]
Sender["WebSocketMessageSender"] --> SessMgr["WebSocketSessionManager"]
AutoCfg["YudaoWebSocketAutoConfiguration"] --> Handler
AutoCfg --> SessMgr
AutoCfg --> Sender
Sender --> Redis["RedisWebSocketMessageSender"]
Sender --> RMQ["RocketMQWebSocketMessageSender"]
Sender --> Rabbit["RabbitMQWebSocketMessageSender"]
Sender --> Kafka["KafkaWebSocketMessageSender"]

图表来源

章节来源

性能考量

  • 并发会话:装饰器限制发送时间与缓冲区大小,避免阻塞与内存溢出。
  • 发送批量化:在业务侧可合并多次单播为一次广播,减少网络往返。
  • 心跳保活:利用“ping/pong”维持长连接,降低无效连接占用。
  • 中间件选择:Redis适合轻量广播;RocketMQ/RabbitMQ/Kafka具备更强的可靠性与扩展性。

章节来源

故障排查指南

  • 连接失败:检查握手拦截器是否正确注入登录用户,确认WebSocket路径与跨域设置。
  • 无消息到达:确认消息type与监听器一致,检查消息处理器是否解析成功。
  • 会话丢失:核对会话管理器登记/移除逻辑,关注并发装饰器配置。
  • 广播不生效:检查发送器类型与中间件消费者是否启用,确认topic/exchange配置。

章节来源

结论

该WebSocket框架提供了从连接、鉴权、消息路由到多节点广播的完整能力,既适用于单机开发,也能通过中间件实现高可用与水平扩展。通过统一的发送器接口与JSON消息协议,业务侧可以快速实现单播、广播与群组推送,并结合心跳与并发装饰器保障稳定性与性能。

附录

配置项清单

  • 连接路径:yudao.websocket.path,默认“/ws”
  • 发送器类型:yudao.websocket.sender-type,可选local/redis/rocketmq/rabbitmq/kafka
  • RocketMQ主题:yudao.websocket.sender-rocketmq.topic
  • RabbitMQ交换机:yudao.websocket.sender-rabbitmq.exchange
  • Kafka主题:yudao.websocket.sender-kafka.topic

章节来源

用户文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答