跳到主要内容

消息队列集成

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 组件详解
  6. 依赖关系分析
  7. 性能与运维
  8. 故障排查指南
  9. 结论
  10. 附录

简介

本文件面向 yudao-cloud 的消息队列集成,聚焦 Kafka 在系统中的应用场景与实现细节,包括但不限于:

  • 日志上报缓冲与异步处理
  • 业务异步解耦与事件驱动架构
  • 生产者实现方式、消息格式设计与分区策略
  • 消费者处理逻辑、并发消费与幂等性保障
  • 配置参数、性能调优与故障恢复
  • 不同业务模块的消息处理模式与最佳实践

项目结构

围绕消息队列的关键模块与文件分布如下:

  • Kafka 配置与主题定义:yudao-framework/yudao-spring-boot-starter-mq
  • Kafka 运行环境与 Canal 集成:deploy/docker/kafka 与 deploy/docker/canal
  • 系统模块的事件消费者:yudao-module-system
  • Redis MQ(对比与补充):yudao-framework/yudao-spring-boot-starter-mq/redis

Mermaid Diagram Code:

graph TB
subgraph "应用模块"
SYS["系统模块<br/>短信/邮件消费者"]
end
subgraph "消息中间件"
KAFKA["Kafka 集群"]
CANAL["Canal 数据同步"]
REDIS["Redis"]
end
subgraph "基础设施"
DOCKER["Docker Compose<br/>Kafka/Canal"]
MONITOR["Kafdrop 监控"]
end
CANAL --> KAFKA
SYS --> KAFKA
SYS -. 可选 .-> REDIS
DOCKER --> KAFKA
DOCKER --> CANAL
MONITOR --> KAFKA

图表来源

章节来源

核心组件

  • Kafka 配置与主题定义
    • 自定义 Kafka 配置类集中定义主题名称,便于统一管理与扩展。
  • Kafka 运行环境
    • 使用 Bitnami Kafka 镜像,启用 SASL_PLAINTEXT 认证,提供控制器与 Broker 内部通信监听器。
  • Canal 集成
    • 通过 Canal 将 MySQL 变更事件路由到 Kafka,动态主题与分区策略按表/库维度配置。
  • 系统模块事件消费者
    • 基于 Spring 事件机制的异步消费者,处理短信与邮件发送等任务。

章节来源

架构总览

下图展示 Kafka 在 yudao-cloud 中的典型工作流:Canal 将数据库变更写入 Kafka,应用模块通过消费者异步处理业务逻辑。

Mermaid Diagram Code:

sequenceDiagram
participant DB as "MySQL"
participant Canal as "Canal"
participant Kafka as "Kafka Broker"
participant App as "应用模块(系统)"
participant Consumer as "事件消费者"
DB->>Canal : "数据变更"
Canal->>Kafka : "写入主题(动态路由)"
App->>Kafka : "订阅主题"
Kafka-->>App : "拉取消息"
App->>Consumer : "发布领域事件"
Consumer-->>App : "异步处理(短信/邮件)"

图表来源

组件详解

Kafka 配置与主题管理

  • 配置类集中管理主题名称,便于跨模块共享与维护。
  • 运行环境采用 Bitnami Kafka,启用 SASL_PLAINTEXT 并配置客户端用户名/密码。
  • Canal 动态主题与分区策略按库/表正则表达式配置,支持按主键字段进行分区哈希。

Mermaid Diagram Code:

classDiagram
class KafkaProperties {
+TopicConfig topic
}
class TopicConfig {
+String deviceToEs
+String appInstallList
+String blackCallBack
+String blackKillCallBack
}
KafkaProperties --> TopicConfig : "组合"

图表来源

章节来源

Kafka 生产者与消费者配置

  • 生产者与消费者使用 Spring Kafka 配置,生产者启用 JSON 序列化,消费者使用 JSON 反序列化与错误处理反序列化器。
  • 消费组 ID、自动偏移重置策略、缺失主题容错等参数集中于配置文件。

Mermaid Diagram Code:

flowchart TD
Start(["启动"]) --> LoadCfg["加载 Kafka 配置<br/>bootstrap-servers/安全/序列化"]
LoadCfg --> Producer["创建生产者<br/>JSON 序列化"]
LoadCfg --> Consumer["创建消费者<br/>JSON 反序列化/错误处理"]
Producer --> Topic["选择主题(静态/动态)"]
Topic --> Send["发送消息"]
Send --> Consume["消费者订阅/拉取"]
Consume --> Handle["业务处理(异步)"]
Handle --> End(["完成"])

图表来源

章节来源

多租户与拦截器集成

  • 通过 EnvironmentPostProcessor 注册 Kafka 生产者拦截器,将租户信息注入消息头,确保跨服务链路的租户上下文一致。

Mermaid Diagram Code:

sequenceDiagram
participant Env as "Environment"
participant PostProc as "TenantKafkaEnvironmentPostProcessor"
participant Producer as "Kafka Producer"
participant Interceptor as "TenantKafkaProducerInterceptor"
Env->>PostProc : "初始化"
PostProc->>Env : "设置 interceptor.classes"
Producer->>Interceptor : "发送前拦截"
Interceptor-->>Producer : "注入租户头"

图表来源

章节来源

事件驱动与异步消费者

  • 系统模块通过领域事件消息定义短信/邮件发送任务,消费者使用 @EventListener + @Async 实现异步处理,降低请求路径延迟。

Mermaid Diagram Code:

classDiagram
class SmsSendMessage {
+Long logId
+String mobile
+Long channelId
+String apiTemplateId
+KeyValue[] params
}
class MailSendConsumer {
+onMessage(message)
}
class SmsSendConsumer {
+onMessage(message)
}
MailSendConsumer ..> SmsSendMessage : "监听事件"
SmsSendConsumer ..> SmsSendMessage : "监听事件"

图表来源

章节来源

Redis MQ 对比与补充

  • Redis MQ 提供 pub/sub 与 Stream 两种模式,支持拦截器扩展(如多租户),并内置 Pending 消息重投任务,保障高可用。
  • 与 Kafka 相比,Redis MQ 更适合轻量级、低延迟的内部广播或短链路事件。

Mermaid Diagram Code:

classDiagram
class RedisMQTemplate {
+send(message)
+convertAndSend(...)
}
class AbstractRedisChannelMessage {
+getChannel()
}
class AbstractRedisStreamMessage {
+getStreamKey()
}
class AbstractRedisChannelMessageListener {
+onMessage(Message)
}
class YudaoRedisMQConsumerAutoConfiguration {
+redisMessageListenerContainer()
+redisStreamMessageListenerContainer()
}
class RedisPendingMessageResendJob {
+schedule()
}
RedisMQTemplate --> AbstractRedisChannelMessage : "pub/sub"
RedisMQTemplate --> AbstractRedisStreamMessage : "stream"
AbstractRedisChannelMessageListener --> AbstractRedisChannelMessage : "监听"
YudaoRedisMQConsumerAutoConfiguration --> AbstractRedisChannelMessageListener : "注册"
YudaoRedisMQConsumerAutoConfiguration --> RedisPendingMessageResendJob : "创建任务"

图表来源

章节来源

依赖关系分析

  • yudao-spring-boot-starter-mq 同时引入 Spring Kafka、Spring Rabbit、RocketMQ Starter,形成统一的 MQ 抽象与多实现支持。
  • Kafka 在 yudao-cloud 中主要通过 Canal 与业务模块协同,系统模块消费者基于 Spring 事件机制解耦。

Mermaid Diagram Code:

graph LR
MQStarter["yudao-spring-boot-starter-mq"] --> Kafka["spring-kafka"]
MQStarter --> Rabbit["spring-rabbit"]
MQStarter --> Rocket["rocketmq-spring-boot-starter"]
Canal["Canal"] --> Kafka
SYS["系统模块消费者"] --> Kafka

图表来源

章节来源

性能与运维

  • Kafka 运行参数
    • 监听器与安全协议、SASL 认证、分区与动态队列配置,确保高可用与可扩展性。
  • Canal 动态主题与分区
    • 按库/表正则表达式路由,按主键字段进行分区哈希,提升吞吐与有序性。
  • 监控与可视化
    • 使用 Kafdrop 可视化查看主题、消费者组与消息状态,便于问题定位。
  • 生产者/消费者配置要点
    • 生产者:ACK 策略、重试次数、JSON 序列化;消费者:自动偏移重置、缺失主题容错、反序列化委托。
  • Redis MQ 幂等与重投
    • Stream Pending 消息重投任务,结合锁机制避免重复处理。

章节来源

故障排查指南

  • 主题不存在或监听器报错
    • 通过配置 missing-topics-fatal=false 降低启动期风险。
  • 消费偏移与回溯
    • 使用 auto-offset-reset 控制首次消费位置,结合消费者组状态排查。
  • 序列化异常
    • 确认 value-serializer/value-deserializer 配置一致,并在消费者侧设置 spring.json.trusted.packages。
  • Canal 路由与分区
    • 检查 dynamicTopic 与 partitionHash 配置是否匹配目标库/表与主键字段。
  • 监控与诊断
    • 使用 Kafdrop 查看主题分区、消费者 lag、消息堆积情况。

章节来源

结论

  • Kafka 在 yudao-cloud 中承担“数据变更事件”的传输枢纽,配合 Canal 实现数据库到消息队列的高效流转。
  • 通过统一的主题命名、拦截器与配置管理,系统实现了多租户上下文传递与跨模块解耦。
  • Redis MQ 作为补充方案,适用于内部广播与短链路事件,具备 Pending 重投能力,增强可靠性。
  • 建议在生产环境中持续优化分区策略、监控指标与重试退避,确保高吞吐与低延迟。

附录

  • 最佳实践
    • 事件建模:以领域事件为核心,明确消息边界与语义。
    • 幂等性:消费者侧实现幂等处理(如去重表/布隆过滤器),结合重投任务兜底。
    • 分区策略:按业务键(如租户/用户/设备)进行分区,保证顺序与负载均衡。
    • 监控告警:关注主题 lag、消费者组存活、序列化异常与网络抖动。
    • 配置治理:集中管理 Kafka 配置与主题命名,避免分散配置带来的维护成本。
用户文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答