跳到主要内容

消息队列组件 (yudao-spring-boot-starter-mq)

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖关系分析
  7. 性能与可靠性
  8. 故障排查指南
  9. 结论
  10. 附录:扩展点与最佳实践

简介

本文件面向 yudao-spring-boot-starter-mq 消息队列组件,聚焦于 Redis 消息队列的生产者与消费者抽象、消息序列化、拦截器扩展、消息可靠性保障(含重试与死信思路)、以及事件总线与消息监听器的集成方式。文档同时给出架构图、流程图与类图,帮助读者快速理解与落地。

项目结构

该组件位于 yudao-framework/yudao-spring-boot-starter-mq 中,采用按功能域划分的包结构:

  • redis 核心能力:生产者模板、消费者自动装配、消息模型与拦截器
  • 自动装配:生产者与消费者的条件化 Bean 注册

Mermaid Diagram Code:

graph TB
subgraph "Redis 消息队列模块"
P["生产者自动装配<br/>YudaoRedisMQProducerAutoConfiguration"]
C["消费者自动装配<br/>YudaoRedisMQConsumerAutoConfiguration"]
T["消息模板<br/>RedisMQTemplate"]
M["消息模型<br/>AbstractRedisMessage"]
I["拦截器接口<br/>RedisMessageInterceptor"]
end
P --> T
C --> T
T --> M
T --> I

图表来源

章节来源

核心组件

  • 生产者模板:封装 Redis 发送逻辑(Pub/Sub 与 Stream),内置拦截器链路,负责消息序列化与发送前后钩子
  • 消费者自动装配:根据监听器类型自动注册 Pub/Sub 与 Stream 的监听容器,并进行版本校验与消费者组管理
  • 消息模型:统一的消息头结构,便于扩展与跨模块传递上下文
  • 拦截器:提供发送/消费前后的扩展点,支持多租户等横切需求

章节来源

架构总览

整体架构围绕“生产者模板 + 自动装配 + 监听器”的模式展开,生产者通过模板统一发送消息,消费者通过自动装配注册监听容器,拦截器贯穿发送与消费两端,形成可插拔的扩展机制。

Mermaid Diagram Code:

graph TB
subgraph "应用层"
APP["业务代码"]
end
subgraph "消息生产侧"
PRODUCER["RedisMQTemplate<br/>send(pubsub/stream)"]
INTERCEPTORS["拦截器链<br/>sendMessageBefore/After"]
end
subgraph "消息传输"
PUBSUB["Redis Pub/Sub"]
STREAM["Redis Stream"]
end
subgraph "消息消费侧"
CONTAINER1["RedisMessageListenerContainer<br/>Pub/Sub 监听"]
CONTAINER2["StreamMessageListenerContainer<br/>Stream 监听"]
LISTENER1["AbstractRedisChannelMessageListener"]
LISTENER2["AbstractRedisStreamMessageListener"]
REDIS_CHECK["Redis 版本校验"]
end
APP --> PRODUCER
PRODUCER --> INTERCEPTORS
PRODUCER --> PUBSUB
PRODUCER --> STREAM
PUBSUB --> CONTAINER1
STREAM --> CONTAINER2
CONTAINER1 --> LISTENER1
CONTAINER2 --> LISTENER2
CONTAINER2 --> REDIS_CHECK

图表来源

详细组件分析

RedisMQTemplate:统一生产者模板

  • 功能要点
    • 提供 Pub/Sub 与 Stream 两种发送路径
    • 统一序列化为 JSON 字符串
    • 发送前后执行拦截器链(顺序/逆序)
  • 关键行为
    • send(pubsub):convertAndSend(channel, json)
    • send(stream):opsForStream().add(newRecord().ofObject(json).withStreamKey(...))

Mermaid Diagram Code:

classDiagram
class RedisMQTemplate {
-RedisTemplate redisTemplate
-RedisMessageInterceptor[] interceptors
+send(message : AbstractRedisChannelMessage)
+send(message : AbstractRedisStreamMessage) : RecordId
+addInterceptor(interceptor)
-sendMessageBefore(message)
-sendMessageAfter(message)
}
class AbstractRedisMessage {
+Map~String,String~ headers
+getHeader(key) : String
+addHeader(key, value)
}
class RedisMessageInterceptor {
<<interface>>
+sendMessageBefore(message)
+sendMessageAfter(message)
+consumeMessageBefore(message)
+consumeMessageAfter(message)
}
RedisMQTemplate --> AbstractRedisMessage : "使用"
RedisMQTemplate --> RedisMessageInterceptor : "维护拦截器链"

图表来源

章节来源

生产者自动装配:YudaoRedisMQProducerAutoConfiguration

  • 职责
    • 注册 RedisMQTemplate Bean
    • 将全局拦截器注入模板
  • 依赖
    • StringRedisTemplate
    • List<RedisMessageInterceptor>

Mermaid Diagram Code:

sequenceDiagram
participant AC as "自动装配类"
participant RT as "StringRedisTemplate"
participant INT as "拦截器列表"
participant T as "RedisMQTemplate"
AC->>RT : 获取 Redis 连接工厂
AC->>INT : 收集拦截器
AC->>T : 构造模板并注入拦截器
AC-->>AC : 返回 RedisMQTemplate Bean

图表来源

章节来源

消费者自动装配:YudaoRedisMQConsumerAutoConfiguration

  • 职责
    • 条件化注册 Pub/Sub 监听容器
    • 条件化注册 Stream 监听容器与消费者组
    • 注册 Redis 待消费消息重发作业(定时任务)
    • 校验 Redis 版本(最低 5.x)
  • 关键点
    • Pub/Sub:RedisMessageListenerContainer + ChannelTopic
    • Stream:StreamMessageListenerContainer + 消费者组 + 手动 ACK
    • 版本校验:通过 Redis INFO redis_version 判断主版本

Mermaid Diagram Code:

sequenceDiagram
participant AC as "自动装配类"
participant L1 as "Channel 监听器"
participant L2 as "Stream 监听器"
participant RC as "Redis 连接"
participant RS as "Redis 版本校验"
AC->>RC : 获取连接工厂
AC->>L1 : 注册 Pub/Sub 监听器
AC->>L2 : 注册 Stream 监听器(消费者组)
AC->>RS : 校验 Redis 版本 >= 5
AC-->>AC : 完成容器注册

图表来源

章节来源

消息模型与拦截器

  • AbstractRedisMessage:统一消息头 Map,便于透传上下文
  • RedisMessageInterceptor:提供发送/消费前后回调,支持多租户等横切逻辑

Mermaid Diagram Code:

classDiagram
class AbstractRedisMessage {
+headers : Map~String,String~
+getHeader(key)
+addHeader(key, value)
}
class RedisMessageInterceptor {
<<interface>>
+sendMessageBefore(message)
+sendMessageAfter(message)
+consumeMessageBefore(message)
+consumeMessageAfter(message)
}

图表来源

章节来源

发送流程(拦截器链)

Mermaid Diagram Code:

flowchart TD
START(["进入 send(...)"]) --> BEFORE["遍历拦截器 sendMessageBefore"]
BEFORE --> SEND["执行具体发送逻辑(Pub/Sub 或 Stream)"]
SEND --> AFTER["逆序遍历拦截器 sendMessageAfter"]
AFTER --> END(["结束"])

图表来源

章节来源

依赖关系分析

  • 组件内聚与耦合
    • RedisMQTemplate 与拦截器解耦,通过接口扩展
    • 自动装配类仅负责 Bean 注册与条件判断,职责单一
    • 消费者自动装配对监听器类型进行条件化注册,避免无用容器
  • 外部依赖
    • Spring Data Redis(Pub/Sub 与 Stream)
    • Redisson(分布式锁/互斥,用于重试作业)
    • Hutool(系统信息与字符串工具)

Mermaid Diagram Code:

graph LR
AC1["YudaoRedisMQProducerAutoConfiguration"] --> TPL["RedisMQTemplate"]
AC2["YudaoRedisMQConsumerAutoConfiguration"] --> TPL
AC2 --> L1["AbstractRedisChannelMessageListener"]
AC2 --> L2["AbstractRedisStreamMessageListener"]
TPL --> MSG["AbstractRedisMessage"]
TPL --> INTF["RedisMessageInterceptor"]

图表来源

章节来源

性能与可靠性

  • 性能特性
    • Pub/Sub:轻量广播,适合事件总线与低延迟场景
    • Stream:具备持久化、消费者组、手动 ACK,适合可靠投递与水平扩展
    • 批量拉取:Stream 容器默认批量大小可按需调整
  • 可靠性保障
    • 手动 ACK:消费者处理失败不丢失消息,结合重试策略提升可用性
    • 版本校验:强制 Redis 5+,确保 Stream 能力可用
    • 待消费消息重发:定时任务扫描 Pending 列表并重发
  • 事务与一致性
    • 当前实现未直接提供“事务消息”语义;可通过“发送 + 成功回调 + 手动 ACK”组合实现最终一致
    • 如需强一致,建议引入外部事务协调或专用事务消息中间件

[本节为通用指导,无需特定文件引用]

故障排查指南

  • Redis 版本过低
    • 现象:启动时报错提示 Redis 版本低于 5.0.0
    • 处理:升级 Redis 至 5.0+,或降级组件版本
    • 参考:消费者自动装配中的版本校验逻辑
  • 消费者组未创建
    • 现象:首次启动无消息消费
    • 处理:确认监听器已注册,消费者组自动创建逻辑生效
  • 手动 ACK 导致堆积
    • 现象:Pending 队列持续增长
    • 处理:检查业务处理耗时与异常分支,优化处理逻辑;启用重试与死信思路
  • Pub/Sub 监听未生效
    • 现象:消息未被消费
    • 处理:确认 ChannelTopic 与监听器通道一致,且监听器被 Spring 管理

章节来源

结论

yudao-spring-boot-starter-mq 以 Redis 为核心,提供了统一的生产者模板与自动装配的消费者容器,配合拦截器扩展机制,能够覆盖事件总线与可靠消息投递两大场景。通过 Pub/Sub 与 Stream 的组合,既满足低延迟广播,也满足高可靠消费与水平扩展。对于事务消息与更复杂的可靠性诉求,可在现有基础上通过重试、死信与外部协调实现。

[本节为总结性内容,无需特定文件引用]

附录:扩展点与最佳实践

  • 自定义消息转换器
    • 当前使用 JSON 序列化;如需二进制/压缩/加密,可在拦截器中对消息头进行约定并在发送/消费两端解析
  • 自定义消息过滤器
    • 通过拦截器在 consumeMessageBefore 中进行过滤,或在监听器内部根据消息头进行判定
  • 监控与告警
    • 建议结合 Stream Pending 队列长度、消费耗时、异常计数等指标建立告警
  • 死信与重试
    • 建议为关键业务设置最大重试次数与死信队列,结合 Pending 检测与定时重发作业实现
  • 多租户与上下文透传
    • 使用消息头透传租户标识、用户标识等上下文,拦截器中统一处理

[本节为通用指导,无需特定文件引用]

用户文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答