跳到主要内容

04-消息管理

核心概念:规则引擎 | 04-消息管理 | 概览

消息管理模块用于记录规则中心与业务系统之间的异步消息传递详情,解决未引入分布式事务场景下的数据一致性问题。


核心概念:规则引擎 | 04-消息管理 | 为什么需要消息管理

为什么需要消息管理

当规则中心的数据发生变更(如规则配置修改、业务绑定变更)时,需要通知各业务系统进行相应的数据同步。由于规则中心与业务系统是独立的微服务,无法使用本地事务保证数据一致性,因此采用 Kafka 消息队列 进行异步通知。

消息管理模块的作用:

  • 记录消息状态:追踪每条消息的发送和处理结果
  • 失败重试:对处理失败的消息支持手动重发
  • 问题排查:记录异常信息,便于定位问题

核心概念:规则引擎 | 04-消息管理 | 1. 消息列表

1. 消息列表

消息列表展示了规则中心发送给业务系统的所有消息记录。

列表查询

1.1 列表字段说明

字段说明
ID消息记录主键
父ID父消息ID,0表示原始消息,非0表示子消息(按业务类型拆分)
规则ID关联的规则ID
消息唯一标识消息的全局唯一标识,用于幂等处理
业务类型消息对应的业务类型
业务ID业务记录ID(如任务ID、广告ID等)
消息消息内容(JSON格式)
状态消息处理状态
已重发次数消息重试次数
kafka主题消息发送的Kafka主题
异常消息处理失败时的异常信息
创建时间消息创建时间

1.2 业务类型

业务类型

业务类型说明
任务推送任务推送业务的消息
launcher广告配置Launcher广告配置的消息
黑名单黑名单业务的消息
uota升级UOTA升级业务的消息
域名分发之域名域名分发业务的消息
域名分发之UOTA域名分发关联UOTA的消息

1.3 消息状态

状态说明
待处理消息已创建,等待处理
处理成功业务系统已成功处理
处理失败业务系统处理失败

核心概念:规则引擎 | 04-消息管理 | 2. 消息重试

2. 消息重试

当消息处理失败时,可以通过重试功能重新发送消息到业务系统。

重发消息到业务系统

2.1 重试操作

点击列表中的 "重试" 按钮,可打开重试配置弹窗:

配置项说明
父ID父消息ID
规则ID关联的规则ID
消息唯一标识消息的全局唯一标识
业务类型消息对应的业务类型
业务ID业务记录ID
消息消息内容(JSON格式)
状态当前消息状态
已重复次数已重试次数
kafka主题可修改重发到的Kafka主题

2.2 重要提示

注意

父ID等于0时,重发消息会导致所有相关业务都收到数据。

当父ID为0时,表示这是一条原始消息,可能会被多个业务系统订阅。重发此类消息会导致所有订阅该消息的业务系统都收到通知。


核心概念:规则引擎 | 04-消息管理 | 3. 其他操作

3. 其他操作

3.1 忽略消息

对于待处理或处理失败的消息,可以选择 "忽略" 操作,不再处理该消息。

适用场景:

  • 消息已过期,无需再处理
  • 业务已通过其他方式解决

3.2 删除消息

点击 "删除" 按钮可删除消息记录。

3.3 查看详情

对于处理失败的消息,可以点击 "详情" 查看完整的异常堆栈信息,便于问题排查。


核心概念:规则引擎 | 04-消息管理 | 4. 业务系统集成

4. 业务系统集成

业务系统需要继承 RuleBpmNoStartEventListener 类来订阅规则中心的变更消息。

4.1 集成方式

@Component
public class MyBusinessEventListener extends RuleBpmNoStartEventListener {

@Override
protected Integer getBusinessType() {
return BusinessTypeEnum.MY_BUSINESS.getType();
}

@Override
protected void onEvent(RuleBpmNoStartEvent event) {
// 处理消息
Long ruleId = event.getRuleId();
Long businessId = event.getBusinessId();
// ... 业务处理逻辑
}
}

4.2 消息处理流程

Mermaid Diagram Code:

flowchart TD
    A[规则中心数据变更] --> B[创建消息记录]
    B --> C[发送到Kafka]
    C --> D[业务系统监听]
    D --> E{处理成功?}
    E -->|是| F[更新消息状态为成功]
    E -->|否| G[更新消息状态为失败]
    G --> H[记录异常信息]
    H --> I[等待重试]

4.3 注意事项

  1. 幂等处理:业务系统需要根据 messageId 做幂等处理,避免重复消费
  2. 异常处理:处理失败时应抛出异常,让消息中心记录失败状态
  3. 业务类型过滤:通过重写 getBusinessType()getBusinessTypes() 方法过滤只处理自己业务类型的消息
开发文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答