跳到主要内容

Kafka事件处理

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖关系分析
  7. 性能考量
  8. 故障排查指南
  9. 结论
  10. 附录

简介

本文围绕黑名单模块的Kafka事件处理机制展开,重点阐述两类回调消费者的实现原理与处理流程:设备黑名单回调事件(BlackCallbackConsumer)与黑名单杀死进程回调事件(BlackKillCallbackConsumer)。文档覆盖消息接收、数据解析、业务逻辑处理、结果反馈与联动通知,并对事件类型定义、分区策略、消费者组管理、负载均衡、幂等性与重试机制给出实现方案与最佳实践。

项目结构

黑名单模块的Kafka事件处理由“上报模块(task)→ Kafka → 消费者(blacklist)→ 数据库/下游模块(report)”构成的异步流水线组成。关键文件分布如下:

  • 上报与分发:DeviceEventServiceImpl 将不同类型的设备事件按Topic分发至Kafka
  • 黑名单消费者:BlackCallbackConsumer、BlackKillCallbackConsumer
  • 事件DTO:DeviceBlackCallbackEventDTO、BlackKillCallBackEventDTO、ApkUninstallUpdateEvent
  • 配置:KafkaProperties、application-common.yaml
  • 下游联动:ApkUninstallUpdateConsumer(report模块)

Mermaid Diagram Code:

graph TB
subgraph "设备侧"
D["设备"]
end
subgraph "任务模块(task)"
C1["AppEventReportController"]
S1["DeviceEventServiceImpl"]
end
subgraph "Kafka"
T1["black-call-back"]
T2["black-kill-call-back"]
T3["apk-uninstall-update"]
end
subgraph "黑名单模块(blacklist)"
CON1["BlackCallbackConsumer"]
CON2["BlackKillCallbackConsumer"]
DB1["flow_blacklisted_device"]
DB2["app_kill_record"]
end
subgraph "报表模块(report)"
CON3["ApkUninstallUpdateConsumer"]
DB3["apk_push_history"]
end
D --> C1 --> S1
S1 --> T1
S1 --> T2
S1 --> T3
T1 --> CON1 --> DB1
T2 --> CON2 --> DB2
CON1 --> T3 --> CON3 --> DB3

图表来源

章节来源

核心组件

  • BlackCallbackConsumer:消费黑名单卸载/操作回调,入库flow_blacklisted_device,并向apk-uninstall-update Topic发布ApkUninstallUpdateEvent,驱动报表模块补充cpuid与卸载时间。
  • BlackKillCallbackConsumer:消费黑名单杀死进程回调,入库app_kill_record(TDengine超级表)。
  • 事件DTO:DeviceBlackCallbackEventDTO、BlackKillCallBackEventDTO、ApkUninstallUpdateEvent。
  • 配置:KafkaProperties定义Topic映射,application-common.yaml提供具体Topic名称与Kafka消费者/生产者参数。

章节来源

架构总览

黑名单模块Kafka事件处理采用“上报→Kafka→消费者→数据库/下游”的异步架构,具备削峰填谷、解耦与可扩展能力。消费者通过@KafkaListener监听指定Topic,完成数据落库与跨模块联动。

Mermaid Diagram Code:

sequenceDiagram
participant Dev as "设备"
participant Task as "DeviceEventServiceImpl"
participant Kafka as "Kafka"
participant Black as "BlackCallbackConsumer"
participant DB as "flow_blacklisted_device"
participant Report as "ApkUninstallUpdateConsumer"
Dev->>Task : "POST /task/recoedDeviceEvent"
Task->>Kafka : "发送 DeviceBlackCallbackEventDTO"
Kafka-->>Black : "投递 black-call-back"
Black->>DB : "入库 FlowBlacklistedDeviceDO"
Black->>Kafka : "发送 ApkUninstallUpdateEvent"
Kafka-->>Report : "投递 apk-uninstall-update"
Report->>Report : "更新 cpuid/uninstall_time"

图表来源

详细组件分析

BlackCallbackConsumer(黑名单卸载/操作回调)

  • 监听Topic:${kafka.topic.black-call-back}
  • 组合消费者组:${spring.application.name:blacklist-default}-black-call-back-${spring.kafka.consumer.group-id:default-group}
  • 处理流程:
    1. 校验blackListIds非空
    2. 遍历每个blacklistedId,构造FlowBlacklistedDeviceDO并入库
    3. 从Redis查询AppBlacklistedDO以获取packageName
    4. 发布ApkUninstallUpdateEvent到apk-uninstall-update Topic
  • 幂等性与重试:
    • 当前实现未显式声明重试策略,异常被捕获并记录日志,建议结合Spring Kafka的重试与死信队列策略完善

Mermaid Diagram Code:

flowchart TD
Start(["进入 onMessage"]) --> CheckIds["检查 blackListIds 是否为空"]
CheckIds --> |为空| Warn["记录警告并返回"]
CheckIds --> |非空| Loop["遍历 blacklistedId"]
Loop --> BuildDO["构造 FlowBlacklistedDeviceDO"]
BuildDO --> InsertDB["调用 AppBlacklistedService.createFlowBlacklistedDevice"]
InsertDB --> LoadPkg["从Redis加载 AppBlacklistedDO 获取 packageName"]
LoadPkg --> Publish["发送 ApkUninstallUpdateEvent 到 apk-uninstall-update"]
Publish --> Next["继续下一个 blacklistedId"]
Next --> |循环结束| End(["退出"])
Warn --> End

图表来源

章节来源

BlackKillCallbackConsumer(黑名单杀死进程回调)

  • 监听Topic:${kafka.topic.black-kill-call-back}
  • 组合消费者组:${spring.application.name:blacklist-default}-black-kill-call-back-${spring.kafka.consumer.group-id:default-group}
  • 处理流程:
    1. 校验blackKillCallBackEvents非空
    2. 遍历每条记录,解析日期字符串为Date
    3. 构造AppKillRecordDO并调用IAppKillRecordService.insert入库
  • 数据存储:TDengine超级表app_kill_record,按blackListId分表

Mermaid Diagram Code:

flowchart TD
Start(["进入 onMessage"]) --> CheckList["检查 blackKillCallBackEvents 是否为空"]
CheckList --> |为空| Warn["记录警告并返回"]
CheckList --> |非空| Loop["遍历每条记录"]
Loop --> ParseDate["解析日期字符串为 Date"]
ParseDate --> |解析失败| Skip["跳过该条记录"]
ParseDate --> |解析成功| BuildDO["构造 AppKillRecordDO"]
BuildDO --> InsertDB["调用 IAppKillRecordService.insert"]
InsertDB --> Next["继续下一条"]
Next --> |循环结束| End(["退出"])
Skip --> Next
Warn --> End

图表来源

章节来源

事件类型定义

  • DeviceBlackCallbackEventDTO:包含事件ID、设备MAC、CPU ID、黑名单ID列表、创建时间等字段,用于黑名单操作(如卸载)的回调。
  • BlackKillCallBackEventDTO:包含事件码、设备MAC/CPU、杀死事件列表(含黑名单ID、频率、日期),用于黑名单拦截(Kill进程)的回调。
  • ApkUninstallUpdateEvent:用于补充历史记录表中的cpuid与uninstall_time,作为黑名单卸载后的联动通知。

Mermaid Diagram Code:

classDiagram
class DeviceBlackCallbackEventDTO {
+Long id
+String mac
+Long[] blackListIds
+long createTime
+String cpuid
}
class BlackKillCallBackEventDTO {
+int eventCode
+String mac
+String cpuid
+BlackKillCallBackEventReq[] blackKillCallBackEvents
}
class ApkUninstallUpdateEvent {
+String mac
+String cpuid
+String packageName
+Long versionCode
+LocalDateTime uninstallTime
+LocalDateTime pushTaskTime
}

图表来源

章节来源

消息分区策略、消费者组与负载均衡

  • 分区策略:Kafka默认分区器根据消息key进行哈希分区;若未设置key,消息将轮询分区。黑名单回调通常以设备维度(MAC/CPU)区分,建议在生产端按设备标识设置key,确保相同设备的消息落在同一分区,便于顺序处理与幂等。
  • 消费者组:消费者组ID由应用名与group-id拼接而成,不同实例加入同一组实现负载均衡;跨模块(blacklist/report)各自维护独立组,互不影响。
  • 负载均衡:多实例水平扩展时,Kafka将分区均匀分配给组内消费者,实现横向扩展与吞吐提升。

章节来源

幂等性保证、错误处理与重试机制

  • 幂等性:
    • 建议在生产端设置message.key为设备标识+黑名单ID组合,配合Kafka幂等producer与broker端去重,避免重复入库。
    • 消费端可在入库前增加“去重键”(如设备MAC+黑名单ID+事件ID)的Redis缓存校验,防止重复处理。
  • 错误处理:
    • BlackCallbackConsumer与BlackKillCallbackConsumer均捕获异常并记录日志,未抛出异常,不会触发Kafka自动重试。
  • 重试机制:
    • 建议在application-common.yaml中启用Spring Kafka的重试与死信队列(DLQ),对可恢复异常进行指数退避重试,永久失败的消息进入DLQ以便人工干预。
    • 对于ApkUninstallUpdateConsumer,异常会抛出以触发Kafka重试,确保cpuid/uninstall_time补录的可靠性。

章节来源

依赖关系分析

  • DeviceEventServiceImpl:根据事件类型将数据转换为DTO并发送到对应Topic(black-call-back、black-kill-call-back)。
  • BlackCallbackConsumer:消费black-call-back,入库flow_blacklisted_device,并发布apk-uninstall-update。
  • ApkUninstallUpdateConsumer:消费apk-uninstall-update,更新report模块历史记录的cpuid与卸载时间。
  • BlackKillCallbackConsumer:消费black-kill-call-back,入库TDengine超级表app_kill_record。

Mermaid Diagram Code:

graph LR
S["DeviceEventServiceImpl"] --> |black-call-back| C1["BlackCallbackConsumer"]
S --> |black-kill-call-back| C2["BlackKillCallbackConsumer"]
C1 --> |apk-uninstall-update| C3["ApkUninstallUpdateConsumer"]
C1 --> DB1["flow_blacklisted_device"]
C2 --> DB2["app_kill_record(TDengine)"]
C3 --> DB3["apk_push_history"]

图表来源

章节来源

性能考量

  • 异步解耦:通过Kafka削峰填谷,避免设备上报瞬时高并发冲击数据库。
  • 分区与分表:TDengine超级表按黑名单ID分表,降低热点写入;建议按设备维度设置消息key,提升分区均匀性。
  • 并发与批处理:消费者内部逐条入库,建议评估批量写入与事务边界,减少往返开销。
  • 监控与可观测性:开启DEBUG日志开关(如cn.iocoder.yudao.module.blacklist.kafka.consumer)辅助定位问题。

故障排查指南

  • 消费异常:
    • BlackCallbackConsumer/BlackKillCallbackConsumer捕获异常但不抛出,导致Kafka不会自动重试。建议在异常分支抛出异常或启用Spring Kafka重试。
    • ApkUninstallUpdateConsumer对空参数进行校验并抛出异常,触发Kafka重试。
  • Topic缺失:
    • application-common.yaml中设置missing-topics-fatal: false,避免监听主题不存在时报错。
  • 数据一致性:
    • 若出现重复记录,检查生产端是否设置消息key与消费端去重键;确认TDengine分表键与查询条件一致。

章节来源

结论

黑名单模块的Kafka事件处理通过清晰的事件分发、消费者职责划分与下游联动,实现了设备回调的高效异步处理。建议进一步完善幂等性、重试与死信队列策略,优化分区键与批量写入,以提升整体可靠性与吞吐能力。

附录

  • Topic配置与消费者组命名规范见application-common.yaml与消费者注解中的groupId拼接规则。
  • Kafka属性(acks、retries、auto-offset-reset等)在application-common.yaml中集中配置,便于统一治理。

章节来源

用户文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答