跳到主要内容

异步消息通信

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖关系分析
  7. 性能考量
  8. 故障排查指南
  9. 结论
  10. 附录

简介

本文件面向 yudao-cloud 的异步消息通信能力,系统性介绍基于 Redis、RabbitMQ、Kafka 的多种实现方式与最佳实践。重点覆盖:

  • 多实现选择与适用场景
  • 生产者与消费者的实现模式(含序列化/反序列化)
  • 错误处理与可靠性投递
  • 事件驱动架构设计与幂等性保障
  • 配置示例与性能优化建议

项目结构

yudao-cloud 的消息队列能力由独立的 starter 模块提供,核心位于 yudao-spring-boot-starter-mq,按“实现维度”组织:

  • Redis 实现:模板、通道消息、流消息、监听器、自动装配
  • RabbitMQ 实现:自动装配(JSON 序列化)
  • Kafka 实现:自动装配(通用错误处理器、重试与死信)
  • 事件模型:领域事件定义(如 APK 推送回调)

Mermaid Diagram Code:

graph TB
subgraph "消息队列实现"
R["Redis 实现"]
RMQ["RabbitMQ 实现"]
K["Kafka 实现"]
end
subgraph "事件模型"
EVT["领域事件定义"]
end
R --> |"Pub/Sub 广播"| R1["通道消息监听器"]
R --> |"Stream 集群消费"| R2["流消息监听器"]
R --> |"发送模板"| R3["RedisMQTemplate"]
RMQ --> |"Jackson2Json 序列化"| RMQ1["RabbitMQ 自动装配"]
K --> |"通用错误处理/重试/死信"| K1["Kafka 自动装配"]
EVT --> |"跨模块解耦"| R
EVT --> |"跨模块解耦"| RMQ
EVT --> |"跨模块解耦"| K

图表来源

章节来源

核心组件

  • RedisMQTemplate:统一的 Redis 消息发送入口,支持 Pub/Sub 广播与 Stream 集群消费两种路径,并内置拦截器扩展点。
  • 通道消息与流消息:分别对应 Redis 的 Pub/Sub 与 Stream,提供默认键/频道命名策略。
  • 监听器体系:通道监听器与流监听器分别绑定到对应消费模型。
  • RabbitMQ 自动装配:提供 Jackson2JsonMessageConverter,确保消息体 JSON 序列化一致。
  • Kafka 自动装配:提供通用错误处理器,支持重试、跳过反序列化异常、死信恢复器。

章节来源

架构总览

下图展示消息从生产到消费的整体流程,以及三种实现的差异点:

Mermaid Diagram Code:

sequenceDiagram
participant Producer as "生产者"
participant MQ as "消息中间件"
participant Listener as "消费者监听器"
Producer->>MQ : "发送消息序列化为JSON"
alt "Redis Pub/Sub"
MQ-->>Listener : "广播消息Channel"
else "Redis Stream"
MQ-->>Listener : "集群消费Group/StreamKey"
else "RabbitMQ"
MQ-->>Listener : "路由到队列/交换机"
else "Kafka"
MQ-->>Listener : "分区消费重试/死信"
end
Listener->>Listener : "反序列化并执行业务"

图表来源

详细组件分析

Redis 实现

  • 发送模板与拦截器
    • RedisMQTemplate 提供两类发送方法:Pub/Sub 与 Stream,并在发送前后触发拦截器生命周期钩子,便于扩展(如多租户上下文注入)。
    • 拦截器接口提供发送前/后、消费前/后四个钩子,顺序分别为正序与逆序,确保扩展点的对称性。
  • 通道消息与流消息
    • 通道消息默认频道名为类名;流消息默认 StreamKey 为类名,二者均避免在消息体中重复序列化频道/键名。
  • 监听器与容器
    • 通道监听器基于 RedisMessageListenerContainer,按 ChannelTopic 注册。
    • 流监听器基于 StreamMessageListenerContainer,自动创建消费者组,支持批量拉取、手动确认与异常不中断消费。
    • 消费者名称采用“IP@进程号”组合,避免同机多实例冲突。
    • 启动时校验 Redis 版本不低于 5.0.0,以支持 Stream 能力。
  • 自动装配
    • 生产者自动装配:注入 RedisMQTemplate,并收集全局拦截器。
    • 消费者自动装配:根据是否存在监听器 Bean,动态注册 Pub/Sub 与 Stream 的监听容器。

Mermaid Diagram Code:

classDiagram
class RedisMQTemplate {
-RedisTemplate redisTemplate
-RedisMessageInterceptor[] interceptors
+send(AbstractRedisChannelMessage)
+send(AbstractRedisStreamMessage) RecordId
+addInterceptor(RedisMessageInterceptor)
}
class AbstractRedisMessage {
+Map~String,String~ headers
+getHeader(key) String
+addHeader(key,value) void
}
class AbstractRedisChannelMessage {
+getChannel() String
}
class AbstractRedisStreamMessage {
+getStreamKey() String
}
class RedisMessageInterceptor {
<<interface>>
+sendMessageBefore(message)
+sendMessageAfter(message)
+consumeMessageBefore(message)
+consumeMessageAfter(message)
}
RedisMQTemplate --> AbstractRedisMessage : "序列化/反序列化"
AbstractRedisChannelMessage --|> AbstractRedisMessage
AbstractRedisStreamMessage --|> AbstractRedisMessage
RedisMQTemplate --> RedisMessageInterceptor : "扩展点"

图表来源

章节来源

RabbitMQ 实现

  • 自动装配要点
    • 条件化装配:仅当存在 RabbitMQ 相关类时生效。
    • Jackson2JsonMessageConverter:统一消息体 JSON 序列化,确保跨服务一致性。
  • 多租户扩展
    • 通过 BeanPostProcessor 在消息发送前将租户 ID 写入消息头,在消费侧再回填上下文,实现租户隔离。

Mermaid Diagram Code:

sequenceDiagram
participant Producer as "生产者"
participant RabbitMQ as "RabbitMQ"
participant Interceptor as "多租户拦截器"
participant Consumer as "消费者"
Producer->>Interceptor : "postProcessMessage 写入租户头"
Interceptor->>RabbitMQ : "发送消息JSON"
RabbitMQ-->>Consumer : "投递消息"
Consumer->>Consumer : "从消息头恢复租户上下文"

图表来源

章节来源

Kafka 实现

  • 通用错误处理
    • 使用 DefaultErrorHandler 与 FixedBackOff 配置重试策略(固定间隔、最大次数)。
    • 通过 DeadLetterPublishingRecoverer 将重试耗尽的消息投递至死信主题,避免阻塞主消费链路。
    • 对反序列化异常进行特殊处理:自动跳过问题记录并记录详细日志,防止因个别消息导致整体停滞。
  • 重试与日志
    • 提供重试监听器,输出 Topic、分区、偏移量、Key、异常等关键信息,便于定位问题。

Mermaid Diagram Code:

flowchart TD
Start(["消费开始"]) --> TryConsume["尝试消费消息"]
TryConsume --> Success{"消费成功?"}
Success --> |是| Ack["提交偏移量"]
Success --> |否| IsDeserErr{"是否反序列化异常?"}
IsDeserErr --> |是| Skip["跳过该消息并记录日志"] --> Ack
IsDeserErr --> |否| Retry{"是否达到最大重试次数?"}
Retry --> |否| Backoff["等待固定间隔后重试"]
Backoff --> TryConsume
Retry --> |是| DLQ["投递至死信主题"] --> Ack
Ack --> End(["结束"])

图表来源

章节来源

事件驱动架构与领域事件

  • 事件模型
    • 以领域事件作为跨模块解耦的契约,事件类具备明确的字段与语义,便于生产者与多个消费者协作。
  • 示例事件
    • APK 推送成功回调事件:包含设备标识、任务信息、应用版本等,用于任务模块与报表模块协同处理。

Mermaid Diagram Code:

erDiagram
APK_PUSH_CALLBACK_EVENT {
string mac
string cpuid
int task_id
string package_name
long version_code
datetime push_task_time
}

图表来源

章节来源

依赖关系分析

  • 自动装配导入
    • 通过 AutoConfiguration.imports 统一引入 Redis 生产者/消费者、RabbitMQ、Kafka 的自动装配类,确保按需启用。
  • 组件耦合
    • RedisMQTemplate 与拦截器松耦合,通过接口扩展实现多租户等横切能力。
    • 监听器与容器解耦,容器仅负责注册与生命周期管理,具体反序列化与业务处理由监听器实现。

Mermaid Diagram Code:

graph TB
Imports["AutoConfiguration.imports"] --> RProd["Redis 生产者自动装配"]
Imports --> RCons["Redis 消费者自动装配"]
Imports --> RMQ["RabbitMQ 自动装配"]
Imports --> KA["Kafka 自动装配"]
RProd --> Tpl["RedisMQTemplate"]
RCons --> L1["通道监听器容器"]
RCons --> L2["流监听器容器"]

图表来源

章节来源

性能考量

  • Redis
    • Pub/Sub:适合低延迟广播场景,注意订阅端数量与网络抖动带来的放大效应。
    • Stream:支持消费者组与批量拉取,建议合理设置批次大小与消费者数量,避免过度竞争。
    • 版本要求:确保 Redis 版本≥5.0.0,以获得稳定的 Stream 能力。
  • RabbitMQ
    • 使用 Jackson2Json 序列化,减少跨服务解析差异;合理设置队列/交换机持久化与确认机制。
  • Kafka
    • 合理配置重试间隔与最大重试次数,避免雪崩;对反序列化异常快速跳过,降低整体延迟。
    • 死信主题用于兜底,避免阻塞主消费链路。

故障排查指南

  • Kafka
    • 反序列化异常:自动跳过并记录日志,检查消息体格式与序列化配置。
    • 重试过多:查看重试监听器日志,定位 Topic/分区/偏移量,必要时人工干预死信主题。
  • Redis
    • 版本不满足:启动时报错提示版本过低,升级 Redis 至 5.0+。
    • 消费组未创建:首次启动会自动创建,若失败需检查权限与键空间。
    • 手动确认:确保业务处理完成后正确提交偏移量,避免重复消费。
  • RabbitMQ
    • 多租户上下文丢失:检查消息头是否正确写入与读取,确保拦截器生效。

章节来源

结论

yudao-cloud 的消息队列能力以“统一模板 + 多实现 + 事件模型”为核心,既保证了不同中间件的兼容性,又通过拦截器与错误处理机制提供了可观测性与可靠性。结合本文的配置与优化建议,可在不同业务场景下选择合适的实现路径,并建立完善的幂等与可靠性投递策略。

附录

用户文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答