跳到主要内容

事件驱动机制

目录

  1. 引言
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖分析
  7. 性能考虑
  8. 故障排查指南
  9. 结论
  10. 附录

引言

本技术文档围绕规则引擎的事件驱动机制展开,系统性介绍事件监听体系、事件分发与异步处理、幂等性与重试策略、以及与规则缓存、业务流程的协同工作方式。重点覆盖以下事件类型与监听器:

  • 规则变更事件:RuleAlterEvent 与 RuleAlterEventListener
  • 业务流程事件:BpmRuleStatusListener 对审批流状态变化的响应
  • 缓存清理事件:LocalCacheClearEvent 与 LocalCacheClearEventListener
  • 业务流程无法启动事件:RuleBpmNoStartEvent 与 RuleBpmNoStartEventListener
  • 规则编辑权限检查事件:RuleCanEditEvent/RuleCanEditBusEvent 与 RuleCanEditEventListener

项目结构

规则引擎事件相关代码主要分布在两个模块:

  • yudao-module-rule-api:事件定义、事件监听器基类、事件工具类
  • yudao-module-rule-biz:业务监听器(审批流)、事件工具调用方

Mermaid Diagram Code:

graph TB
subgraph "规则引擎API层"
DTO1["RuleAlterEvent<br/>规则变更事件DTO"]
DTO2["LocalCacheClearEvent<br/>本地缓存清理事件DTO"]
DTO3["RuleBpmNoStartEvent<br/>流程无法启动事件DTO"]
DTO4["RuleCanEditEvent<br/>规则编辑检查请求DTO"]
DTO5["RuleCanEditBusEvent<br/>规则编辑检查回调DTO"]
L1["RuleAlterEventListener<br/>规则变更监听器"]
L2["LocalCacheClearEventListener<br/>本地缓存清理监听器"]
L3["RuleBpmNoStartEventListener<br/>流程无法启动监听器"]
L4["RuleCanEditEventListener<br/>规则编辑检查监听器"]
end
subgraph "规则引擎业务层"
U["RuleUtil<br/>事件发送工具"]
BL["BpmRuleStatusListener<br/>审批流状态监听器"]
end
subgraph "基础设施"
BUS["Spring Cloud Bus<br/>集群广播"]
KAFKA["Kafka<br/>消息通道"]
LITEFLOW["LiteFlow<br/>规则引擎"]
REDIS["Redis<br/>本地缓存"]
end
DTO1 --> L1
DTO2 --> L2
DTO3 --> L3
DTO4 --> L4
DTO5 --> L4
U --> BUS
U --> KAFKA
L1 --> LITEFLOW
L2 --> REDIS
BL --> U

图表来源

章节来源

核心组件

  • 事件定义与广播
    • RuleAlterEvent:规则中心变更通知,通过 Spring Cloud Bus 广播至集群所有实例,触发 LiteFlow 规则链的更新或移除。
    • LocalCacheClearEvent:本地缓存清理通知,通过 Spring Cloud Bus 广播,各实例随机延迟后清理本地缓存,避免缓存雪崩。
    • RuleBpmNoStartEvent:规则绑定的业务需要审批但流程未启动的通知,通过 Kafka 发送并落库,支持重试与补偿。
    • RuleCanEditEvent/RuleCanEditBusEvent:规则编辑权限检查请求与回调,采用“Kafka 请求 + Spring Cloud Bus 广播回调”的跨模块通信。
  • 事件监听器
    • RuleAlterEventListener:接收 RuleAlterEvent,更新或移除 LiteFlow 规则链。
    • LocalCacheClearEventListener:接收 LocalCacheClearEvent,延迟清理本地缓存。
    • RuleBpmNoStartEventListener:接收 RuleBpmNoStartEvent,按业务类型过滤并更新消息状态。
    • RuleCanEditEventListener:接收 RuleCanEditEvent,处理编辑检查并广播回调结果。

章节来源

架构总览

事件驱动的整体流程如下:

  • 规则中心或业务侧通过 RuleUtil 发送事件(Spring Cloud Bus 或 Kafka)。
  • 监听器通过 @EventListener 或 @KafkaListener 接收事件,执行相应处理。
  • 处理完成后,可能更新 LiteFlow 规则链、清理本地缓存、更新业务消息状态或广播回调结果。

Mermaid Diagram Code:

sequenceDiagram
participant Biz as "业务/规则中心"
participant Util as "RuleUtil"
participant Bus as "Spring Cloud Bus"
participant Kafka as "Kafka"
participant L1 as "RuleAlterEventListener"
participant L2 as "LocalCacheClearEventListener"
participant L3 as "RuleBpmNoStartEventListener"
participant L4 as "RuleCanEditEventListener"
Biz->>Util : "调用发送事件方法"
alt 规则变更事件
Util->>Bus : "publishEvent(RuleAlterEvent)"
Bus-->>L1 : "广播事件"
L1->>L1 : "更新/LiteFlow 移除规则链"
else 本地缓存清理事件
Util->>Bus : "publishEvent(LocalCacheClearEvent)"
Bus-->>L2 : "广播事件"
L2->>L2 : "随机延迟后清理本地缓存"
else 流程无法启动事件
Util->>Kafka : "发送 RuleBpmNoStartEvent"
Kafka-->>L3 : "消费事件"
L3->>L3 : "更新消息状态/补偿"
else 规则编辑检查事件
Util->>Kafka : "发送 RuleCanEditEvent"
Kafka-->>L4 : "消费事件"
L4->>Bus : "广播 RuleCanEditBusEvent 回调"
end

图表来源

详细组件分析

规则变更事件与监听器(RuleAlterEvent/RuleAlterEventListener)

  • 触发条件
    • 规则中心变更规则元数据(主要是 EL 表达式),通过 RuleUtil.sendRuleAlterEvent 广播。
  • 处理逻辑
    • 监听器接收事件后,若 EL 数据为空则移除 LiteFlow 规则链;否则重建规则链。
  • 异步与幂等
    • 通过 Spring Cloud Bus 广播,天然异步;监听器内部对空 EL 的处理具备幂等性(多次移除不会报错)。
  • 性能与可靠性
    • LiteFlow 规则链的更新/移除为轻量操作;异常会被捕获并记录,不影响上游事件发送。

Mermaid Diagram Code:

sequenceDiagram
participant Center as "规则中心"
participant Util as "RuleUtil"
participant Bus as "Spring Cloud Bus"
participant Listener as "RuleAlterEventListener"
participant Lite as "LiteFlow"
Center->>Util : "sendRuleAlterEvent(event)"
Util->>Bus : "publishEvent(RuleAlterEvent)"
Bus-->>Listener : "广播事件"
Listener->>Listener : "判空/更新或移除规则链"
Listener->>Lite : "更新/LiteFlow 移除"
Listener-->>Center : "日志记录成功/失败"

图表来源

章节来源

本地缓存清理事件与监听器(LocalCacheClearEvent/LocalCacheClearEventListener)

  • 触发条件
    • 规则中心或业务侧通过 RuleUtil.sendLocalCacheClearEvent 广播缓存清理指令。
  • 处理逻辑
    • 监听器接收事件后,计算 1ms~10s 随机延迟,然后清理指定缓存名称或键。
  • 异步与幂等
    • 使用 ScheduledExecutorService 延迟执行,避免主线程阻塞;对空键清理具备幂等性。
  • 性能与可靠性
    • 随机延迟有效分散缓存失效压力,防止缓存雪崩;异常被捕获记录。

Mermaid Diagram Code:

flowchart TD
Start(["收到 LocalCacheClearEvent"]) --> Calc["计算随机延迟(1ms~10s)"]
Calc --> Schedule["提交延迟任务"]
Schedule --> Exec["执行清理:清空缓存或指定键"]
Exec --> Log["记录清理日志"]
Log --> End(["结束"])

图表来源

章节来源

业务流程无法启动事件与监听器(RuleBpmNoStartEvent/RuleBpmNoStartEventListener)

  • 触发条件
    • 规则状态变为“未启动”且需审批时,通过 RuleUtil.sendBpmNoStartEvent 发送事件。
  • 处理逻辑
    • 监听器按业务类型过滤事件,消费后更新消息状态为成功;异常时更新为失败并记录堆栈。
  • 异步与重试
    • 通过 Kafka 发送并落库,支持重试与死信队列兜底;消息表记录 retryTopic 便于重试路由。

Mermaid Diagram Code:

sequenceDiagram
participant Biz as "业务/规则中心"
participant Util as "RuleUtil"
participant Kafka as "Kafka"
participant Listener as "RuleBpmNoStartEventListener"
participant API as "RuleBusinessApi"
Biz->>Util : "sendBpmNoStartEvent(...)"
Util->>Kafka : "发送 RuleBpmNoStartEvent"
Kafka-->>Listener : "消费事件"
Listener->>Listener : "过滤业务类型"
alt 成功
Listener->>API : "更新消息状态=成功"
else 失败
Listener->>API : "更新消息状态=失败+异常信息"
end

图表来源

章节来源

规则编辑权限检查事件与监听器(RuleCanEditEvent/RuleCanEditBusEvent/RuleCanEditEventListener)

  • 触发条件
    • 规则中心需要检查某规则是否允许编辑,向业务系统发送 RuleCanEditEvent(Kafka)。
  • 处理逻辑
    • 业务系统监听后,按自身规则判断是否可编辑,并通过 Spring Cloud Bus 广播 RuleCanEditBusEvent 回调。
    • 规则中心轮询等待回调,超时或失败则抛出异常。
  • 异步与幂等
    • 请求通过 Kafka,回调通过 Spring Cloud Bus 广播;使用 TimedCache 存储回调结果,具备超时自动清理能力。

Mermaid Diagram Code:

sequenceDiagram
participant Rule as "规则中心"
participant Kafka as "Kafka"
participant Biz as "业务系统"
participant Bus as "Spring Cloud Bus"
Rule->>Kafka : "发送 RuleCanEditEvent"
Kafka-->>Biz : "消费事件"
Biz->>Bus : "广播 RuleCanEditBusEvent(允许/不允许)"
Rule->>Rule : "轮询等待回调/超时处理"

图表来源

章节来源

业务流程事件与规则联动(BpmRuleStatusListener)

  • 触发条件
    • 审批流状态变化事件(来自 BPM 模块)携带规则业务信息。
  • 处理逻辑
    • 监听器根据业务类型与更新类型,调用业务服务进行解绑或处理审批结果。
    • 对删除类型的业务,直接走解绑逻辑,兼容多种业务主表删除场景。

Mermaid Diagram Code:

flowchart TD
Evt["BpmProcessInstanceStatusEvent"] --> Check["解析业务信息<br/>校验业务类型"]
Check --> Del{"删除类型?"}
Del -- 是 --> Unbind["调用解绑服务"]
Del -- 否 --> Handle["调用审批结果处理服务"]
Unbind --> End["结束"]
Handle --> End

图表来源

章节来源

依赖分析

  • Spring Cloud Bus 配置
    • 通过 BusCustomConfiguration 扫描事件 DTO 包路径,启用 RemoteApplicationEvent 的序列化与广播。
  • Kafka 依赖
    • 事件发送与消费依赖 KafkaTemplate 与 @KafkaListener,支持重试与死信队列。
  • LiteFlow
    • 规则变更监听器直接操作 LiteFlow 规则链,实现动态更新。
  • Redis 本地缓存
    • 本地缓存清理监听器通过 LocalCacheUtils 清理指定缓存或键。

Mermaid Diagram Code:

graph LR
BusCfg["BusCustomConfiguration<br/>扫描事件DTO包"] --> DTOs["事件DTO包"]
RuleUtil --> Bus["Spring Cloud Bus"]
RuleUtil --> Kafka["Kafka"]
L1["RuleAlterEventListener"] --> Lite["LiteFlow"]
L2["LocalCacheClearEventListener"] --> Redis["LocalCacheUtils"]

图表来源

章节来源

性能考虑

  • 异步处理
    • Spring Cloud Bus 与 Kafka 均为异步传输,避免阻塞主线程。
  • 缓存清理抖动控制
    • LocalCacheClearEventListener 使用 1ms~10s 随机延迟,降低缓存雪崩风险。
  • 规则链更新
    • RuleAlterEventListener 对空 EL 的处理具备幂等性,减少无效操作。
  • 轮询与超时
    • RuleCanEditEvent 使用定时轮询与超时控制,避免长时间占用线程。

故障排查指南

  • 规则变更未生效
    • 检查 RuleUtil 是否成功发布 RuleAlterEvent;确认监听器日志是否记录成功/失败。
    • 若 EL 为空,监听器会移除规则链;确认业务侧是否期望移除。
  • 缓存清理未触发
    • 检查 LocalCacheClearEvent 的广播是否到达;确认监听器是否收到事件并记录延迟与清理日志。
    • 若缓存键为空,监听器会清理整个缓存名称。
  • 流程无法启动消息未处理
    • 检查 Kafka 主题与消费者组配置;确认 RuleBpmNoStartEventListener 是否按业务类型过滤正确。
    • 查看消息表状态是否更新为成功或失败,并检查异常堆栈。
  • 编辑检查超时
    • 检查 RuleCanEditEvent 是否成功发送到 Kafka;确认业务系统是否广播 RuleCanEditBusEvent。
    • 查看轮询缓存是否正确设置回调结果,以及超时阈值是否合理。

章节来源

结论

规则引擎的事件驱动机制通过 Spring Cloud Bus 与 Kafka 实现跨模块、跨实例的异步通信,结合 LiteFlow 与本地缓存工具,实现了规则链的动态更新与缓存的有序失效。业务流程事件与编辑检查事件分别采用不同的传输与协调策略,确保在多节点部署下的高可用与一致性。建议在生产环境中关注事件广播范围、消费者组配置与缓存抖动控制,以获得更稳定的性能表现。

附录

  • 事件配置与调试要点
    • Spring Cloud Bus:确认 BusCustomConfiguration 的事件 DTO 包扫描路径与广播目标。
    • Kafka:核对主题名称、消费者组与重试配置;必要时开启死信队列。
    • 日志:关注监听器的关键日志输出,定位事件接收与处理异常。
  • 最佳实践
    • 事件幂等:对空 EL 的规则链移除、缓存全量清理等操作应具备幂等性。
    • 异步优先:尽量使用异步事件,避免阻塞关键路径。
    • 随机延迟:缓存清理使用随机延迟,避免集中失效。
    • 超时与重试:编辑检查与流程消息处理应设置合理的超时与重试策略。
用户文档
AI 助手
Agent 列表
请选择一个 Agent 开始对话
AI 问答