异步消息通信
引用文件
本文引用的文件
- RedisMQTemplate.java
- AbstractRedisMessage.java
- AbstractRedisChannelMessage.java
- AbstractRedisStreamMessage.java
- YudaoRedisMQProducerAutoConfiguration.java
- YudaoRedisMQConsumerAutoConfiguration.java
- RedisMessageInterceptor.java
- YudaoRabbitMQAutoConfiguration.java
- KafkaAutoConfiguration.java
- ApkPushCallbackEvent.java
- AbstractRedisChannelMessageListener.java
- AbstractRedisStreamMessageListener.java
- org.springframework.boot.autoconfigure.AutoConfiguration.imports
目录
简介
本文件面向 yudao-cloud 的异步消息通信能力,系统性介绍基于 Redis、RabbitMQ、Kafka 的多种实现方式与最佳实践。重点覆盖:
- 多实现选择与适用场景
- 生产者与消费者的实现模式(含序列化/反序列化)
- 错误处理与可靠性投递
- 事件驱动架构设计与幂等性保障
- 配置示例与性能优化建议
项目结构
yudao-cloud 的消息队列能力由独立的 starter 模块提供,核心位于 yudao-spring-boot-starter-mq,按“实现维度”组织:
- Redis 实现:模板、通道消息、流消息、监听器、自动装配
- RabbitMQ 实现:自动装配(JSON 序列化)
- Kafka 实现:自动装配(通用错误处理器、重试与死信)
- 事件模型:领域事件定义(如 APK 推送回调)
图表来源
- YudaoRedisMQProducerAutoConfiguration.java
- YudaoRedisMQConsumerAutoConfiguration.java
- YudaoRabbitMQAutoConfiguration.java
- KafkaAutoConfiguration.java
- ApkPushCallbackEvent.java
章节来源
核心组件
- RedisMQTemplate:统一的 Redis 消息发送入口,支持 Pub/Sub 广播 与 Stream 集群消费两种路径,并内置拦截器扩展点。
- 通道消息与流消息:分别对应 Redis 的 Pub/Sub 与 Stream,提供默认键/频道命名策略。
- 监听器体系:通道监听器与流监听器分别绑定到对应消费模型。
- RabbitMQ 自动装配:提供 Jackson2JsonMessageConverter,确保消息体 JSON 序列化一致。
- Kafka 自动装配:提供通用错误处理器,支持重试、跳过反序列化异常、死信恢复器。
章节来源
- RedisMQTemplate.java
- AbstractRedisChannelMessage.java
- AbstractRedisStreamMessage.java
- AbstractRedisChannelMessageListener.java
- AbstractRedisStreamMessageListener.java
- YudaoRabbitMQAutoConfiguration.java
- KafkaAutoConfiguration.java
架构总览
下图展示消息从生产到消费的整体流程,以及三种实现的差异点:
图表来源
详细组件分析
Redis 实现
- 发送模板与拦截器
- RedisMQTemplate 提供两类发送方法:Pub/Sub 与 Stream,并在发送前后触发拦截器生命周期钩子,便于扩展(如多租户上下文注入)。
- 拦截器接口提供发送前/后、消费前/后四个钩子,顺序分别为正序与逆序,确保扩展点的对称性。
- 通道消息与流消息
- 通道消息默认频道名为类名;流消息默认 StreamKey 为类名,二者均避免在消息体中重复序列化频道/键名。
- 监听器与容器
- 通道监听器基于 RedisMessageListenerContainer,按 ChannelTopic 注册。
- 流监听器基于 StreamMessageListenerContainer,自动创建消费者组,支持批量拉取、手动确认与异常不中断消费。
- 消费者名称采用“IP@进程号”组合,避免同机多实例冲突。
- 启动时校验 Redis 版本不低于 5.0.0,以支持 Stream 能力。
- 自动装配
- 生产者自动装配:注入 RedisMQTemplate,并收集全局拦截器。
- 消费者自动装配:根据是否存在监听器 Bean,动态注册 Pub/Sub 与 Stream 的监听容器。
图表来源
- RedisMQTemplate.java
- AbstractRedisMessage.java
- AbstractRedisChannelMessage.java
- AbstractRedisStreamMessage.java
- RedisMessageInterceptor.java
章节来源
- RedisMQTemplate.java
- AbstractRedisChannelMessage.java
- AbstractRedisStreamMessage.java
- YudaoRedisMQConsumerAutoConfiguration.java
- YudaoRedisMQProducerAutoConfiguration.java
RabbitMQ 实现
- 自动装配要点
- 条件化装配:仅当存在 RabbitMQ 相关类时生效。
- Jackson2JsonMessageConverter:统一消息体 JSON 序列化,确保跨服务一致性。
- 多租户扩展
- 通过 BeanPostProcessor 在消息发送前将租户 ID 写入消息头,在消费侧再回填上下文,实现租户隔离。
图表来源
章节来源
Kafka 实现
- 通用错误处理
- 使用 DefaultErrorHandler 与 FixedBackOff 配置重试策略(固定间隔、最大次数)。
- 通过 DeadLetterPublishingRecoverer 将重试耗尽的消息投递至死信主题,避免阻塞主消费链路。
- 对反序列化异常进行特殊处理:自动跳过问题记录并记录详细日志,防止因个别消息导致整体停滞。
- 重试与日志
- 提供重试监听器,输出 Topic、分区、偏移量、Key、异常等关键信息,便于定位问题。
图表来源
章节来源
事件驱动架构与领域事件
- 事件模型
- 以领域事件作为跨模块解耦的契约,事件类具备明确的字段与语义,便于生产者与多个消费者协作。
- 示例事件
- APK 推送成功回调事件:包含设备标识、任务信息、应用版本等,用于任务模块与报表模块协同处理。
图表来源
章节来源
依赖关系分析
- 自动装配导入
- 通过 AutoConfiguration.imports 统一引入 Redis 生产者/消费者、RabbitMQ、Kafka 的自动装配类,确保按需启用。
- 组件耦合
- RedisMQTemplate 与拦截器 松耦合,通过接口扩展实现多租户等横切能力。
- 监听器与容器解耦,容器仅负责注册与生命周期管理,具体反序列化与业务处理由监听器实现。
图表来源
章节来源
性能考量
- Redis
- Pub/Sub:适合低延迟广播场景,注意订阅端数量与网络抖动带来的放大效应 。
- Stream:支持消费者组与批量拉取,建议合理设置批次大小与消费者数量,避免过度竞争。
- 版本要求:确保 Redis 版本≥5.0.0,以获得稳定的 Stream 能力。
- RabbitMQ
- 使用 Jackson2Json 序列化,减少跨服务解析差异;合理设置队列/交换机持久化与确认机制。
- Kafka
- 合理配置重试间隔与最大重试次数,避免雪崩;对反序列化异常快速跳过,降低整体延迟。
- 死信主题用于兜底,避免阻塞主消费链路。
故障排查指南
- Kafka
- 反序列化异常:自动跳过并记录日志,检查消息体格式与序列化配置。
- 重试过多:查看重试监听器日志,定位 Topic/分区/偏移量,必要时人工干预死信主题。
- Redis
- 版本不满足:启动时报错提示版本过低,升级 Redis 至 5.0+。
- 消费组未创建:首次启动会自动创建,若失败需检查权限与键空间。
- 手动确认:确保业务处理完成后正确提交偏移量,避免重复消费。
- RabbitMQ
- 多租户上下文丢失:检查消息头是否正确写入与读取,确保拦截器生效。
章节来源
结论
yudao-cloud 的消息队列能力以“统一模板 + 多实现 + 事件模型”为核心,既保证了不同中间件的兼容性,又通过拦截器与错误处理机制提供了可观测性与可靠性。结合本文的配置与优化建议,可在不同业务场景下选择合适的实现路径,并建立完善的幂等与可靠性投递策略。
附录
- 配置示例(路径指引)
- Redis 生产者模板:YudaoRedisMQProducerAutoConfiguration.java
- Redis 消费者容器(Pub/Sub/Stream):YudaoRedisMQConsumerAutoConfiguration.java
- RabbitMQ 序列化配置:YudaoRabbitMQAutoConfiguration.java
- Kafka 通用错误处理:KafkaAutoConfiguration.java
- 领域事件示例:ApkPushCallbackEvent.java