From 021dd345e4753cbef55fca4e8a419d5f5455adef Mon Sep 17 00:00:00 2001 From: MIST <7036508@qq.com> Date: Sat, 27 Jun 2026 06:57:35 +0800 Subject: [PATCH] feat: implement Context Intelligence v2 module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 目标 实现 Context Intelligence v2 模块(`vip.mate.context.intelligence`),通过动态探测每个 `(provider, model)` 的真实上下文窗口,替代原先硬编码的 128K 静态配置,优化历史消息裁剪与 token 预算分配。 **核心目标**: - 自动学习每个模型的 effective context window,按需分配 token 预算 - 信号采集与推理主线程解耦,零阻塞 - 三层降级保证:动态 snapshot → yml 静态配置 → 128K 硬编码兜底 - 零侵入扩展:新增感知维度只需加 `@EventListener`,ReasoningNode 不变 --- ## v1 复盘 → v2 重构 ### v1 问题清单(附录 A) v1 `vip.mate.context.adaptive` 模块存在 5 个 Bug、4 个死代码、2 个风格问题: | # | v1 问题 | 影响 | |---|---------|------| | Bug 1 | `loopContextWindowTokens` 参数顺序反转 | 窗口查询错位 | | Bug 2 | `GatewayDistribution.recordOverflow` 从未被调用 | 溢出时多样性检测失效(死代码路径) | | Bug 3 | `RUNTIME_MODEL_TYPE` 未注册到 `KeyStrategyFactory` | 跨节点丢失 | | Bug 4 | final-answer 路径不采集成功信号 | 仅 tool-call 路径采集,覆盖不全 | | Bug 5 | `latencyMs` 永远为 0 | 压力推断失效 | | 死代码 1-4 | 7 个 Properties 字段 / `actualModelStates` / `ModelFamily.contextProfile` / `MIN_OBSERVATION_HOURS` 均未使用 | 代码膨胀 | ### v1 架构问题 | 维度 | v1 问题 | v2 改进 | |------|---------|---------| | 通信模式 | 同步直调(ReasoningNode → Monitor → Tracker) | 事件驱动(ApplicationEventPublisher + @EventListener) | | 读写模型 | 读写同路径,强一致 | CQRS 分离,最终一致 | | 信号覆盖 | 仅 tool-call 路径(遗漏 final-answer) | 每次 LLM 调用统一发事件,全覆盖 | | 主线程阻塞 | DB 落库同步阻塞推理线程 | 事件发布 O(1),DB 落库异步 | | 组件耦合 | ReasoningNode 直接依赖 3 层组件 | 仅依赖 `ApplicationEventPublisher` | | P10 clamp | 外部直接修改状态机 `effectiveWindow` 字段 | 移到预算规划器内作为读取天花板,状态机自治 | | 扩展方式 | 改 ReasoningNode 加调用点 | 监听器内加分支或新增 `@EventListener` | --- ## v2 机制 ### 架构:EDA + CQRS ``` ReasoningNode(推理主线程) │ ├─ 读路径(sync, lock-free, 每次 ReAct 循环) │ EnvSnapshotStore.get() → TokenBudgetPlanner.plan() → LoopBudgetConfig → LoopMessageBudgeter │ └─ 写路径(event publish, O(1) non-blocking) ApplicationEventPublisher.publishEvent(LlmSuccessSignal / LlmOverflowSignal) │ ▼ ContextSignalProcessor(@EventListener) ├─ onSuccess: @Async(signalExecutor 线程池) └─ onOverflow: sync(确保 PTL 重试前 confidenceUpper 已更新) ``` ### 核心设计原则 1. **EDA 事件驱动**:信号采集与状态更新通过 Spring 事件总线异步解耦,推理主线程零阻塞 2. **CQRS 读写分离**:写路径(事件消费 + 状态机更新)异步;读路径(snapshot 查询 + 预算规划)同步无锁 3. **调整机制与环境感知分离**:WindowProbe 状态机(上探激进/下探保守)与多样性检测/压力推断是独立关注点 4. **三层降级**:动态 snapshot → yml 静态配置 → 硬编码 fallback,任一层故障不阻断推理 5. **零侵入扩展**:新增感知维度只需监听器内加分支或新增 `@EventListener` ### 三层降级(§8.1) | 层级 | 触发条件 | 数据来源 | |------|----------|----------| | Tier 1 | EnvSnapshot 有效(`effectiveWindow > 0`) | 动态探测值 + 多因子预算规划 | | Tier 2 | EnvSnapshot 为空 | yml `getDefaultMaxInputTokens()` | | Tier 3 | yml 未配置 | 硬编码 `DEFAULT_LOOP_CONTEXT_WINDOW_TOKENS = 128_000` | ### 状态机(WindowProbe.Phase) ``` COLD → PROBING → BINARY_SEARCH → STABLE ↓ DEGRADED(压力升级时降级) ``` - **COLD**:冷启动,用 32K seed - **PROBING**:采样填满 SAMPLE_SIZE/2 时上探 - **BINARY_SEARCH**:二分收敛(收敛阈值 10%) - **STABLE**:稳定窗口,闲置 30 分钟后重探 - **DEGRADED**:压力升级时降级(连续成功 5 次恢复一级) --- ## 数据流 ### 6 条核心数据流 | 数据流 | 方向 | 同步/异步 | 触发频率 | |--------|------|-----------|----------| | 信号采集 | ReasoningNode → Event Bus | sync publish, O(1) | 每次 LLM 调用 | | 成功处理 | Event Bus → Processor → Probe/Diversity/Pressure/Snapshot/DB | async | 每次成功调用 | | 溢出处理 | Event Bus → Processor → Probe/Diversity/Pressure/Snapshot/DB | sync | 每次溢出 | | 快照刷新 | Processor → EnvSnapshotStore | 仅值变化时 | phase 切换/压力变化/多样性变化 | | 预算规划 | ReasoningNode → EnvSnapshotStore → TokenBudgetPlanner | sync, lock-free | 每次 ReAct 循环 | | DB 持久化 | Processor → Repository → DB | async | phase 切换/首次成功 | ### 成功处理流程(异步,7 步) 1. `WindowProbeRegistry.recordSuccess` → 返回 `ProbeUpdate(snapshot, previousPhase)` 2. `BackendDiversityRegistry.recordSuccess` — 多样性检测 3. `PressureInferencer.recordSuccess` — 压力推断(连续成功降级) 4. 计算 safeWindow(多样性 P10) 5. `EnvSnapshotStore.refreshIfChanged` — 条件 CAS(仅关键字段变化时刷新) 6. 条件 DB 持久化(phase 变化或首次成功时) 7. `PersistRetryQueue.retryOnce` — 重试之前失败的持久化 ### 溢出处理流程(同步,5 步) 1. `WindowProbeRegistry.recordOverflow` — 窗口收缩(`overflowShrinkRatio = 0.85`) 2. `BackendDiversityRegistry.recordOverflow` — 修复 v1 Bug 2 3. `PressureInferencer.recordOverflow` — 压力升级 4. `EnvSnapshotStore.refreshIfChanged` — 溢出必然导致窗口收缩,必刷新 5. `WindowStateRepository.persist` — 同步 DB 写入(溢出是低频事件) ### 预算规划流程(读路径,4 步) 1. 确定 base window:`EnvSnapshot.effectiveWindow` 2. 应用 P10 多样性天花板:`min(effectiveWindow, safeWindow)`(仅读取,不修改状态机) 3. 压力因子比例调整:NORMAL 0.60 / ELEVATED 0.70 / HIGH 0.80 / CRITICAL 0.85 4. 计算预算:`historyBudget = available × historyRatio`,`keepTail = historyBudget × keepTailRatio` --- ## v2 组件清单 ### 19 个组件,9 个子包 | 子包 | 组件 | 职责 | |------|------|------| | `event` | `LlmSuccessSignal` | 成功信号事件(异步消费) | | `event` | `LlmOverflowSignal` | 溢出信号事件(同步消费) | | `listener` | `ContextSignalProcessor` | 信号处理器(写路径入口,@EventListener) | | `snapshot` | `EnvSnapshot` | 不可变环境快照(读路径数据源) | | `snapshot` | `EnvSnapshotStore` | 快照缓存(ConcurrentHashMap + AtomicReference) | | `probe` | `WindowProbe` | 窗口探测状态机(5 阶段) | | `probe` | `WindowProbeRegistry` | 多模型探测注册表(ConcurrentHashMap + compute 原子操作) | | `probe` | `WindowProbeSnapshot` | 状态快照 record(用于 DB 持久化和锁外读取) | | `probe` | `ProbeUpdate` | recordSuccess/recordOverflow 返回值 | | `probe` | `WindowStateLoader` | 启动时从 DB 恢复状态 | | `perception` | `BackendDiversityTracker` | 后端多样性检测(per-model 实例,P10 安全窗口) | | `perception` | `BackendDiversityRegistry` | 多模型 tracker 注册表 | | `perception` | `PressureInferencer` | 压力推断(连续成功/溢出/延迟 → ResourcePressure) | | `budget` | `TokenBudgetPlanner` | 多因子预算规划器(读路径出口) | | `budget` | `TokenBudget` | 预算结果 record | | `budget` | `TokenBudgetTrace` | 决策追踪 record(监控/调试用) | | `persist` | `WindowStateRepository` | DB 读写(JdbcTemplate) | | `persist` | `PersistRetryQueue` | 持久化失败重试队列(内存,单次重试) | | `config` | `ContextIntelligenceProperties` | 配置属性(@ConfigurationProperties) | | `config` | `SignalExecutorConfig` | 专用线程池配置 | | `metrics` | `ContextIntelMetrics` | 可观测指标网关(9 项 Micrometer 指标) | | `enums` | `ResourcePressure` | 压力等级枚举(NORMAL/ELEVATED/HIGH/CRITICAL) | | `enums` | `ContextProfile` | 模型上下文画像枚举 | | root | `ContextIntelligenceAutoConfiguration` | 自动配置 | ### v1 → v2 组件映射 | v1 组件 | v2 组件 | 说明 | |---------|---------|------| | `AdaptiveContextTracker` | `WindowProbeRegistry` + `ContextSignalProcessor` | 拆分为注册表 + 处理器 | | `ModelWindowState` | `WindowProbe` | 重新设计,移除 `gatewayMode` 字段 | | `GatewayDistribution` | `BackendDiversityTracker` | 重新设计,per-model 实例 | | `ContextPressureMonitor` | `ContextSignalProcessor` + `PressureInferencer` | 拆分为处理器 + 推断器 | | `DynamicBudgetAllocator` | `TokenBudgetPlanner` | 重新设计,必须读取压力 | | `AdaptiveContextProperties` | `ContextIntelligenceProperties` | 所有字段生效(v1 有 7 个死字段) | --- ## 改动清单 ### 新增文件(28 个) - **v2 模块**:24 个 Java 文件(`vip.mate.context.intelligence.**`) - **数据库迁移**:3 个 SQL(`V161__mate_model_context_state.sql`,h2/mysql/kingbase 三方言) - **设计文档**:`docs/context-intelligence-design -1.md` ### 修改文件(7 个) | 文件 | 改动 | |------|------| | `ReasoningNode.java` | 三层 fallback 预算配置 + LlmSuccessSignal/LlmOverflowSignal 信号发布 | | `AgentGraphBuilder.java` | 注入 v2 组件到 ReasoningNode | | `LoopBudgetConfig.java` | 新增 `fromBudget(TokenBudget)` 工厂方法 | | `MateClawStateKeys.java` | 新增 `RUNTIME_MODEL_TYPE` 状态键 | | `StateGraphReActAgent.java` | 注入 modelType 用于 ContextProfile 阈值判断 | | `StateGraphPlanExecuteAgent.java` | 注入 modelType 用于 ContextProfile 阈值判断 | | `application.yml` | 启用 v2 模块(`mateclaw.context.intelligence.enabled=true`) | --- ## 验证结果 | 验证项 | 结果 | |--------|------| | `mvn compile`(JDK 21) | BUILD SUCCESS,1242 源文件编译通过 | | Flyway migration V161 | Successfully applied,155 migrations 全部通过 | | @SpringBootTest | 3 tests passed | | 代码注释英文化 | 30 个文件,97 处翻译,0 中文残留 | --- ## 可观测指标(9 项) | 指标 | 类型 | Tags | 说明 | |------|------|-------|------| | `context.intel.signal.process.duration` | Timer | type=success\|overflow | 信号处理耗时 | | `context.intel.phase.transition` | Counter | provider,model,from,to | 状态机 phase 切换次数 | | `context.intel.snapshot.refresh` | Counter | provider,model | 快照刷新次数 | | `context.intel.db.persist.failure` | Counter | provider,model | DB 落库失败次数 | | `context.intel.signal.dropped` | Counter | reason | 信号丢弃次数 | | `context.intel.pressure.level` | Gauge | provider,model | 当前压力等级 0/1/2/3 | | `context.intel.effective.window` | Gauge | provider,model | 当前有效窗口 tokens | | `context.intel.phase` | Gauge | provider,model | 当前状态机阶段 0/1/2/3/4 | | `context.intel.diversity.detected` | Gauge | provider,model | 是否检测到多后端 0/1 | --- ## 配置说明 ```yaml mateclaw: context: intelligence: enabled: true # 模块总开关 metrics: enabled: true # 指标监控开关 # 以下为可选项(含默认值) budget: normal-history-ratio: 0.60 # 正常压力下历史消息占比 elevated-history-ratio: 0.70 high-history-ratio: 0.80 critical-history-ratio: 0.85 output-reserve-ratio: 0.15 # 输出预留比例 keep-tail-ratio: 0.60 # 尾部保留比例 probe: sample-size: 20 # 采样数 overflow-shrink-ratio: 0.85 # 溢出收缩比例 binary-convergence: 0.10 # 二分收敛阈值 stale-reprobe-ms: 1800000 # 闲置重探间隔(30 分钟) ``` --- ## 已知限制 1. **冷启动期**:首次调用时 snapshot 为空,走 legacy fallback,需几轮对话后动态预算才生效 2. **不调整 max_output_tokens**:v2 仅优化输入侧(历史消息裁剪),输出上限仍为静态 16384 3. **legacy 比例不可配置**:`TokenBudget.legacy()` 硬编码 0.60/0.40/0.60/0.36(有意设计,保证 v1 兼容和平滑预热) --- ## 后续待办 - [ ] P1:动态 max_output_tokens(接入 `buildChatOptions()`) - [ ] P2:冷启动优化(预填 EnvSnapshotStore from DB) - [ ] P4:硬编码值可配置化(minTailMessages、targetMaxMessages 等) - [ ] 长期:多模型差异化配置 --- .../vip/mate/agent/AgentGraphBuilder.java | 14 +- .../mate/agent/context/LoopBudgetConfig.java | 25 ++ .../agent/graph/StateGraphReActAgent.java | 2 + .../mate/agent/graph/node/ReasoningNode.java | 142 ++++++- .../plan/StateGraphPlanExecuteAgent.java | 2 + .../agent/graph/state/MateClawStateKeys.java | 8 + .../ContextIntelligenceAutoConfiguration.java | 68 ++++ .../intelligence/budget/TokenBudget.java | 35 ++ .../budget/TokenBudgetPlanner.java | 130 +++++++ .../intelligence/budget/TokenBudgetTrace.java | 41 ++ .../config/ContextIntelligenceProperties.java | 217 +++++++++++ .../config/SignalExecutorConfig.java | 43 +++ .../intelligence/enums/ContextProfile.java | 94 +++++ .../intelligence/enums/ResourcePressure.java | 37 ++ .../intelligence/event/LlmOverflowSignal.java | 26 ++ .../intelligence/event/LlmSuccessSignal.java | 29 ++ .../listener/ContextSignalProcessor.java | 198 ++++++++++ .../metrics/ContextIntelMetrics.java | 190 ++++++++++ .../perception/BackendDiversityRegistry.java | 74 ++++ .../perception/BackendDiversityTracker.java | 181 +++++++++ .../perception/PressureInferencer.java | 151 ++++++++ .../persist/PersistRetryQueue.java | 69 ++++ .../persist/WindowStateRepository.java | 203 ++++++++++ .../intelligence/probe/ProbeUpdate.java | 23 ++ .../intelligence/probe/WindowProbe.java | 353 ++++++++++++++++++ .../probe/WindowProbeRegistry.java | 207 ++++++++++ .../probe/WindowProbeSnapshot.java | 43 +++ .../intelligence/probe/WindowStateLoader.java | 24 ++ .../intelligence/snapshot/EnvSnapshot.java | 44 +++ .../snapshot/EnvSnapshotStore.java | 125 +++++++ .../src/main/resources/application.yml | 7 + .../h2/V161__mate_model_context_state.sql | 26 ++ .../V161__mate_model_context_state.sql | 24 ++ .../mysql/V161__mate_model_context_state.sql | 38 ++ 34 files changed, 2890 insertions(+), 3 deletions(-) create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/ContextIntelligenceAutoConfiguration.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudget.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetPlanner.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetTrace.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/config/ContextIntelligenceProperties.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/config/SignalExecutorConfig.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ContextProfile.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ResourcePressure.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmOverflowSignal.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmSuccessSignal.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/listener/ContextSignalProcessor.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/metrics/ContextIntelMetrics.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityRegistry.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityTracker.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/PressureInferencer.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/PersistRetryQueue.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/WindowStateRepository.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/ProbeUpdate.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbe.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeRegistry.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeSnapshot.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowStateLoader.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshot.java create mode 100644 mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshotStore.java create mode 100644 mateclaw-server/src/main/resources/db/migration/h2/V161__mate_model_context_state.sql create mode 100644 mateclaw-server/src/main/resources/db/migration/kingbase/V161__mate_model_context_state.sql create mode 100644 mateclaw-server/src/main/resources/db/migration/mysql/V161__mate_model_context_state.sql diff --git a/mateclaw-server/src/main/java/vip/mate/agent/AgentGraphBuilder.java b/mateclaw-server/src/main/java/vip/mate/agent/AgentGraphBuilder.java index a1222fa67..9b4181c0a 100644 --- a/mateclaw-server/src/main/java/vip/mate/agent/AgentGraphBuilder.java +++ b/mateclaw-server/src/main/java/vip/mate/agent/AgentGraphBuilder.java @@ -132,6 +132,11 @@ public class AgentGraphBuilder { private final vip.mate.goal.service.GoalFollowupService goalFollowupService; private final vip.mate.goal.config.GoalProperties goalProperties; + // Context Intelligence v2 components (design doc §10.2) + private final org.springframework.context.ApplicationEventPublisher eventPublisher; + private final vip.mate.context.intelligence.snapshot.EnvSnapshotStore envSnapshotStore; + private final vip.mate.context.intelligence.budget.TokenBudgetPlanner tokenBudgetPlanner; + /** * Auto-grant resolver wired into the executor so an active * {@code mate_approval_grant} row can skip {@code createPending()} for matching @@ -650,6 +655,9 @@ CompiledGraph buildPlanExecuteGraph(AgentToolSet toolSet, ChatModel chatModel, i .addStrategy(MateClawStateKeys.LLM_CALL_COUNT, KeyStrategy.REPLACE) .addStrategy(MateClawStateKeys.RUNTIME_MODEL_NAME, KeyStrategy.REPLACE) .addStrategy(MateClawStateKeys.RUNTIME_PROVIDER_ID, KeyStrategy.REPLACE) + // v1 Bug 3 fix: RUNTIME_MODEL_TYPE was missing registration, causing it to be lost on multi-node merges; + // ContextProfile.fromModelType fell back to defaults. See design doc §10.3. + .addStrategy(MateClawStateKeys.RUNTIME_MODEL_TYPE, KeyStrategy.REPLACE) // SourceEvidenceLedger: ActionNode 把每轮 ToolResponse 抽取出的 // (sourcePaths, sourceSymbols, failedPaths) merge 进这个 ledger, // 后续 ReasoningNode / FinalAnswerNode 调 validateAnswer 校验 @@ -856,7 +864,8 @@ CompiledGraph buildReActGraph(AgentToolSet toolSet, ChatModel chatModel, int max ReasoningNode reasoningNode = new ReasoningNode(chatModel, toolSet, reasoningEffort, supportsReasoningEffort, streamingHelper, conversationWindowManager, streamTracker, 0, wikiContextService, - skillCatalogRenderer, toolDisclosureService, progressLedgerService); + skillCatalogRenderer, toolDisclosureService, progressLedgerService, + eventPublisher, envSnapshotStore, tokenBudgetPlanner); ActionNode actionNode = new ActionNode(executor, streamTracker); ObservationProcessor observationProcessor = new ObservationProcessor(graphObservationProperties); ObservationNode observationNode = new ObservationNode(observationProcessor, streamTracker); @@ -937,6 +946,9 @@ CompiledGraph buildReActGraph(AgentToolSet toolSet, ChatModel chatModel, int max .addStrategy(MateClawStateKeys.COMPLETION_TOKENS, KeyStrategy.REPLACE) .addStrategy(MateClawStateKeys.RUNTIME_MODEL_NAME, KeyStrategy.REPLACE) .addStrategy(MateClawStateKeys.RUNTIME_PROVIDER_ID, KeyStrategy.REPLACE) + // v1 Bug 3 fix: RUNTIME_MODEL_TYPE was missing registration, causing it to be lost on multi-node merges; + // ContextProfile.fromModelType fell back to defaults. See design doc §10.3. + .addStrategy(MateClawStateKeys.RUNTIME_MODEL_TYPE, KeyStrategy.REPLACE) // SourceEvidenceLedger: ActionNode 把每轮 ToolResponse 抽取出的 // (sourcePaths, sourceSymbols, failedPaths) merge 进这个 ledger, // 后续 ReasoningNode / FinalAnswerNode 调 validateAnswer 校验 diff --git a/mateclaw-server/src/main/java/vip/mate/agent/context/LoopBudgetConfig.java b/mateclaw-server/src/main/java/vip/mate/agent/context/LoopBudgetConfig.java index 01228d6dd..f42fc5b48 100644 --- a/mateclaw-server/src/main/java/vip/mate/agent/context/LoopBudgetConfig.java +++ b/mateclaw-server/src/main/java/vip/mate/agent/context/LoopBudgetConfig.java @@ -1,5 +1,7 @@ package vip.mate.agent.context; +import vip.mate.context.intelligence.budget.TokenBudget; + /** * Configuration for per-reasoning-loop message budgeting. * @@ -137,4 +139,27 @@ public LoopBudgetConfig withReservedPrefixTokens(int reservedPrefixTokens) { return new LoopBudgetConfig(triggerTokens, keepTailTokens, minTailMessages, tailSoftCeilingRatio, reservedPrefixTokens, targetMaxMessages); } + + /** + * v2 factory method: derive LoopBudgetConfig from {@link TokenBudget} (design doc §5.5 / §10.1). + *

+ * Replaces v1's fromAllocation. {@code triggerTokens} and {@code keepTailTokens} are taken directly from + * TokenBudget (computed multi-factorially by {@code TokenBudgetPlanner.plan}); other fields reuse + * the sensible defaults from {@link #forContext}. + *

+ * Boundary protection: TokenBudget may produce values below MIN_TRIGGER_TOKENS / + * MIN_TAIL_TOKENS under very small windows (e.g. 4K); Math.max floors them here, and when tail ≥ trigger + * falls back to {@code trigger - MIN_TRIGGER_TOKENS} (same protection logic as {@link #forContext}). + * + * @param budget budget planned by v2 TokenBudgetPlanner (non-null) + * @return LoopBudgetConfig (validated via compact constructor) + */ + public static LoopBudgetConfig fromBudget(TokenBudget budget) { + int trigger = Math.max(MIN_TRIGGER_TOKENS, budget.compactTriggerTokens()); + int tail = Math.max(MIN_TAIL_TOKENS, budget.keepTailTokens()); + if (tail >= trigger) { + tail = Math.max(MIN_TAIL_TOKENS, trigger - MIN_TRIGGER_TOKENS); + } + return new LoopBudgetConfig(trigger, tail, 4, 1.5, 0, 200); + } } diff --git a/mateclaw-server/src/main/java/vip/mate/agent/graph/StateGraphReActAgent.java b/mateclaw-server/src/main/java/vip/mate/agent/graph/StateGraphReActAgent.java index 6e0fcd01a..7314e3594 100644 --- a/mateclaw-server/src/main/java/vip/mate/agent/graph/StateGraphReActAgent.java +++ b/mateclaw-server/src/main/java/vip/mate/agent/graph/StateGraphReActAgent.java @@ -524,6 +524,8 @@ private Map buildInitialState(String userMessage, String convers inputs.put(COMPLETION_TOKENS, 0); inputs.put(RUNTIME_MODEL_NAME, modelName != null ? modelName : ""); inputs.put(RUNTIME_PROVIDER_ID, runtimeProviderId != null ? runtimeProviderId : ""); + // Context Intelligence v2: inject modelType for ContextProfile.fromModelType() threshold check + inputs.put(RUNTIME_MODEL_TYPE, runtimeModelConfig != null && runtimeModelConfig.getModelType() != null ? runtimeModelConfig.getModelType() : ""); inputs.put(TRACE_ID, UUID.randomUUID().toString().substring(0, 8)); // Multimodal sidecar routing — null when the turn carries no media or diff --git a/mateclaw-server/src/main/java/vip/mate/agent/graph/node/ReasoningNode.java b/mateclaw-server/src/main/java/vip/mate/agent/graph/node/ReasoningNode.java index 0ec0da7dd..7559eb3da 100644 --- a/mateclaw-server/src/main/java/vip/mate/agent/graph/node/ReasoningNode.java +++ b/mateclaw-server/src/main/java/vip/mate/agent/graph/node/ReasoningNode.java @@ -15,6 +15,7 @@ import org.springframework.ai.model.tool.ToolCallingChatOptions; import org.springframework.ai.openai.OpenAiChatOptions; import org.springframework.ai.tool.ToolCallback; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.util.StringUtils; import vip.mate.agent.AgentToolSet; import vip.mate.agent.GraphEventPublisher; @@ -29,6 +30,12 @@ import vip.mate.agent.graph.state.MateClawStateAccessor; import vip.mate.agent.graph.state.MateClawStateKeys; import vip.mate.agent.graph.state.SourceEvidenceLedger; +import vip.mate.context.intelligence.budget.TokenBudget; +import vip.mate.context.intelligence.budget.TokenBudgetPlanner; +import vip.mate.context.intelligence.event.LlmOverflowSignal; +import vip.mate.context.intelligence.event.LlmSuccessSignal; +import vip.mate.context.intelligence.snapshot.EnvSnapshot; +import vip.mate.context.intelligence.snapshot.EnvSnapshotStore; import vip.mate.channel.web.ChatStreamTracker; @@ -350,6 +357,14 @@ private boolean hasWikiTool() { */ private final vip.mate.agent.progress.ProgressLedgerService progressLedgerService; + /** + * Context Intelligence v2 components (design doc §10.1). All three can be null: + * legacy callers / test constructors that don't inject them fall back to the yml fallback in {@link #loopContextWindowTokens()}. + */ + private final ApplicationEventPublisher eventPublisher; + private final EnvSnapshotStore envSnapshotStore; + private final TokenBudgetPlanner tokenBudgetPlanner; + public ReasoningNode(ChatModel chatModel, AgentToolSet toolSet, String reasoningEffort, NodeStreamingChatHelper streamingHelper, ConversationWindowManager conversationWindowManager, @@ -448,6 +463,31 @@ public ReasoningNode(ChatModel chatModel, AgentToolSet toolSet, String reasoning vip.mate.skill.runtime.SkillCatalogRenderer skillCatalogRenderer, vip.mate.tool.disclosure.ToolDisclosureService toolDisclosureService, vip.mate.agent.progress.ProgressLedgerService progressLedgerService) { + this(chatModel, toolSet, reasoningEffort, supportsReasoningEffort, streamingHelper, + conversationWindowManager, streamTracker, maxOutputTokens, wikiContextService, + skillCatalogRenderer, toolDisclosureService, progressLedgerService, + null, null, null); + } + + /** + * Context Intelligence v2 primary constructor (design doc §10.2). + *

+ * Adds 3 parameters: {@code eventPublisher} / {@code envSnapshotStore} / {@code tokenBudgetPlanner}. + * All three can be null (legacy callers / test scenarios); when null, falls back to yml + hardcoded fallback, + * with behavior identical to v1. + */ + public ReasoningNode(ChatModel chatModel, AgentToolSet toolSet, String reasoningEffort, + boolean supportsReasoningEffort, + NodeStreamingChatHelper streamingHelper, + ConversationWindowManager conversationWindowManager, + ChatStreamTracker streamTracker, int maxOutputTokens, + vip.mate.wiki.service.WikiContextService wikiContextService, + vip.mate.skill.runtime.SkillCatalogRenderer skillCatalogRenderer, + vip.mate.tool.disclosure.ToolDisclosureService toolDisclosureService, + vip.mate.agent.progress.ProgressLedgerService progressLedgerService, + ApplicationEventPublisher eventPublisher, + EnvSnapshotStore envSnapshotStore, + TokenBudgetPlanner tokenBudgetPlanner) { this.chatModel = chatModel; this.toolSet = toolSet; this.toolCallbacks = toolSet.callbacks(); @@ -461,6 +501,9 @@ public ReasoningNode(ChatModel chatModel, AgentToolSet toolSet, String reasoning this.wikiContextService = wikiContextService; this.skillCatalogRenderer = skillCatalogRenderer; this.progressLedgerService = progressLedgerService; + this.eventPublisher = eventPublisher; + this.envSnapshotStore = envSnapshotStore; + this.tokenBudgetPlanner = tokenBudgetPlanner; } /** @@ -477,6 +520,74 @@ private int loopContextWindowTokens() { return DEFAULT_LOOP_CONTEXT_WINDOW_TOKENS; } + /** + * Context Intelligence v2: build LoopBudgetConfig (design doc §5.5 / §10.1). + *

+ * Three-tier fallback (§8.1): + *

    + *
  1. envSnapshotStore + tokenBudgetPlanner available and snapshot valid → multi-factor budget planning
  2. + *
  3. snapshot empty (enabled=false or cold start) → fallback to yml loopContextWindowTokens()
  4. + *
  5. yml also unconfigured → fallback to DEFAULT_LOOP_CONTEXT_WINDOW_TOKENS = 128_000
  6. + *
+ * Any exception is caught and falls back to forContext; never affects the reasoning main flow. + */ + private LoopBudgetConfig buildLoopBudgetConfig( + String providerId, String modelName, + int systemTokens, int toolsTokens, int reservedPrefixTokens) { + // Tier 1: attempt dynamic budget + if (envSnapshotStore != null && tokenBudgetPlanner != null) { + try { + EnvSnapshot snapshot = envSnapshotStore.get(providerId, modelName); + int fallbackWindow = loopContextWindowTokens(); + TokenBudget budget = tokenBudgetPlanner.plan(snapshot, systemTokens, toolsTokens, fallbackWindow); + return LoopBudgetConfig.fromBudget(budget) + .withReservedPrefixTokens(reservedPrefixTokens); + } catch (Exception e) { + log.debug("[ReasoningNode] Context Intelligence plan failed, falling back to forContext: {}", + e.getMessage()); + } + } + // Tier 2 + 3: yml / hardcoded fallback + return LoopBudgetConfig.forContext(loopContextWindowTokens()) + .withReservedPrefixTokens(reservedPrefixTokens); + } + + /** + * Context Intelligence v2: publish LLM overflow signal (design doc §5.3). + *

+ * Synchronous event (@EventListener without @Async), ensures confidenceUpper is updated before PTL retry. + * Silently skipped when eventPublisher is null (v1 compatibility). + */ + private void publishOverflowSignal(String provider, String modelName, String modelType, + int attemptedTokens, String traceId) { + if (eventPublisher == null) return; + try { + eventPublisher.publishEvent(new LlmOverflowSignal( + provider, modelName, modelType, attemptedTokens, traceId)); + } catch (Exception e) { + log.debug("[ReasoningNode] publishOverflowSignal failed (non-fatal): {}", e.getMessage()); + } + } + + /** + * Context Intelligence v2: publish LLM success signal (design doc §5.2). + *

+ * Asynchronous event (@Async("signalExecutor")), does not block the reasoning thread. + * Silently skipped when eventPublisher is null (v1 compatibility). + */ + private void publishSuccessSignal(String provider, String modelName, String modelType, + int promptTokens, int completionTokens, + long latencyMs, String traceId) { + if (eventPublisher == null) return; + try { + eventPublisher.publishEvent(new LlmSuccessSignal( + provider, modelName, modelType, + promptTokens, completionTokens, latencyMs, traceId)); + } catch (Exception e) { + log.debug("[ReasoningNode] publishSuccessSignal failed (non-fatal): {}", e.getMessage()); + } + } + public ReasoningNode(ChatModel chatModel, AgentToolSet toolSet, String reasoningEffort, NodeStreamingChatHelper streamingHelper, ConversationWindowManager conversationWindowManager) { @@ -511,6 +622,9 @@ public ReasoningNode(ChatModel chatModel, List toolCallbacks) { this.wikiContextService = null; this.skillCatalogRenderer = null; this.progressLedgerService = null; + this.eventPublisher = null; + this.envSnapshotStore = null; + this.tokenBudgetPlanner = null; } @Override @@ -596,8 +710,15 @@ public Map apply(OverAllState state) throws Exception { int toolsTokens = TokenEstimator.estimateToolsTokens(toolCallbacks); int loopReservedPrefixTokens = systemTokens + toolsTokens + maxOutputTokens + LOOP_PREFIX_AUXILIARY_RESERVE_TOKENS; - LoopBudgetConfig loopCfg = LoopBudgetConfig.forContext(loopContextWindowTokens()) - .withReservedPrefixTokens(loopReservedPrefixTokens); + + // Context Intelligence v2: prefer deriving budget from EnvSnapshot + TokenBudgetPlanner + // (design doc §5.5 / §10.1). When any of the three is null or snapshot is empty, falls back to + // the yml fallback LoopBudgetConfig.forContext(loopContextWindowTokens()). + String ctxRuntimeModelName = state.value(MateClawStateKeys.RUNTIME_MODEL_NAME, ""); + String ctxRuntimeProviderId = state.value(MateClawStateKeys.RUNTIME_PROVIDER_ID, ""); + LoopBudgetConfig loopCfg = buildLoopBudgetConfig( + ctxRuntimeProviderId, ctxRuntimeModelName, + systemTokens, toolsTokens, loopReservedPrefixTokens); LoopMessageBudgeter.Result budgeted = LOOP_BUDGETER.budget(messages, loopCfg); // Only log when the budget actually modified the list — a triggered- // but-no-op pass is normal (history fits comfortably under the tail @@ -743,6 +864,11 @@ public Map apply(OverAllState state) throws Exception { "llmCallCount", nextLlmCallCount )); + // Context Intelligence v2: wrap timing for LlmSuccessSignal.latencyMs + long llmCallStartMs = System.currentTimeMillis(); + String ctxTraceId = state.value(MateClawStateKeys.TRACE_ID, ""); + String ctxRuntimeModelType = state.value(MateClawStateKeys.RUNTIME_MODEL_TYPE, ""); + NodeStreamingChatHelper.StreamResult result; try { result = streamingHelper.streamCall(chatModel, prompt, conversationId, "reasoning"); @@ -751,6 +877,9 @@ public Map apply(OverAllState state) throws Exception { // Prompt 仍带 wiki / runtime context;早期的 tail-only 路径会把 // wiki 段一起丢掉,重试后的 prompt 比原始更短少一层信息。 if (result.isPromptTooLong() && conversationWindowManager != null) { + // Context Intelligence v2: publish overflow signal (synchronous, effective before retry) + publishOverflowSignal(ctxRuntimeProviderId, ctxRuntimeModelName, ctxRuntimeModelType, + result.promptTokens(), ctxTraceId); log.warn("[ReasoningNode] Prompt too long, attempting STRUCTURED compaction and retry"); // MateClawStateAccessor.agentId() returns String per state @@ -843,6 +972,15 @@ public Map apply(OverAllState state) throws Exception { .build(); } + // Context Intelligence v2: publish success signal (asynchronous, does not block reasoning) + // Only published when the final result is non-PTL / non-user-stopped — PTL already published the overflow signal above, + // and user-stop carries no valid token information. + if (!result.isPromptTooLong() && !result.stopped()) { + long latencyMs = System.currentTimeMillis() - llmCallStartMs; + publishSuccessSignal(ctxRuntimeProviderId, ctxRuntimeModelName, ctxRuntimeModelType, + result.promptTokens(), result.completionTokens(), latencyMs, ctxTraceId); + } + // ======= 处理 StreamResult ======= // 用户主动停止且有部分内容 diff --git a/mateclaw-server/src/main/java/vip/mate/agent/graph/plan/StateGraphPlanExecuteAgent.java b/mateclaw-server/src/main/java/vip/mate/agent/graph/plan/StateGraphPlanExecuteAgent.java index 7710241da..217030650 100644 --- a/mateclaw-server/src/main/java/vip/mate/agent/graph/plan/StateGraphPlanExecuteAgent.java +++ b/mateclaw-server/src/main/java/vip/mate/agent/graph/plan/StateGraphPlanExecuteAgent.java @@ -299,6 +299,8 @@ private Map buildInitialState(String userMessage, String convers inputs.put(MateClawStateKeys.COMPLETION_TOKENS, 0); inputs.put(MateClawStateKeys.RUNTIME_MODEL_NAME, modelName != null ? modelName : ""); inputs.put(MateClawStateKeys.RUNTIME_PROVIDER_ID, runtimeProviderId != null ? runtimeProviderId : ""); + // Context Intelligence v2: inject modelType for ContextProfile.fromModelType() threshold check + inputs.put(MateClawStateKeys.RUNTIME_MODEL_TYPE, runtimeModelConfig != null && runtimeModelConfig.getModelType() != null ? runtimeModelConfig.getModelType() : ""); inputs.put(MateClawStateKeys.TRACE_ID, UUID.randomUUID().toString().substring(0, 8)); if (currentTurn.routingDecision() != null diff --git a/mateclaw-server/src/main/java/vip/mate/agent/graph/state/MateClawStateKeys.java b/mateclaw-server/src/main/java/vip/mate/agent/graph/state/MateClawStateKeys.java index 56b66517c..060871597 100644 --- a/mateclaw-server/src/main/java/vip/mate/agent/graph/state/MateClawStateKeys.java +++ b/mateclaw-server/src/main/java/vip/mate/agent/graph/state/MateClawStateKeys.java @@ -159,6 +159,14 @@ private MateClawStateKeys() { // ===== 运行时模型快照(REPLACE 策略,buildInitialState 注入)===== public static final String RUNTIME_MODEL_NAME = "runtime_model_name"; public static final String RUNTIME_PROVIDER_ID = "runtime_provider_id"; + /** + * Runtime model type (chat / reasoning / embedding / ...), injected by buildInitialState. + *

+ * Context Intelligence v2 uses {@code ContextProfile.fromModelType(modelType)} + * to decide whether to participate in dynamic window probing ({@code isDynamic() && hasTextContext()}). + * v1 failed to register this key, causing it to be lost on multi-node merges, and ContextProfile inference fell back to defaults. + */ + public static final String RUNTIME_MODEL_TYPE = "runtime_model_type"; // ===== RFC-052: Tool returnDirect 与数据隔离 ===== diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/ContextIntelligenceAutoConfiguration.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/ContextIntelligenceAutoConfiguration.java new file mode 100644 index 000000000..fe5fbf4bb --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/ContextIntelligenceAutoConfiguration.java @@ -0,0 +1,68 @@ +package vip.mate.context.intelligence; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; +import vip.mate.context.intelligence.listener.ContextSignalProcessor; +import vip.mate.context.intelligence.metrics.ContextIntelMetrics; +import vip.mate.context.intelligence.perception.BackendDiversityRegistry; +import vip.mate.context.intelligence.perception.PressureInferencer; +import vip.mate.context.intelligence.persist.PersistRetryQueue; +import vip.mate.context.intelligence.persist.WindowStateRepository; +import vip.mate.context.intelligence.probe.WindowProbeRegistry; +import vip.mate.context.intelligence.snapshot.EnvSnapshotStore; + +/** + * Context Intelligence v2 auto-configuration entry. + *

+ * Assembly strategy (following project conventions, see {@code HookAutoConfiguration} / {@code MemoryAutoConfiguration}): + *

+ * + *

Default enabled=false (design doc §11 Phase 1: new module coexists without affecting the existing system). + * After integration is complete, explicitly enable it via {@code application.yml}.

+ * + * @author MateClaw Team + */ +@Configuration +@EnableConfigurationProperties(ContextIntelligenceProperties.class) +public class ContextIntelligenceAutoConfiguration { + + /** + * Signal processor bean (write path entry). + *

+ * Registered only when {@code mateclaw.context.intelligence.enabled=true}. + * When disabled, no events are published and no listeners exist; the read path relies on EnvSnapshotStore returning EMPTY to trigger fallback. + */ + @Bean + @ConditionalOnProperty(name = "mateclaw.context.intelligence.enabled", havingValue = "true") + public ContextSignalProcessor contextSignalProcessor( + WindowProbeRegistry windowProbeRegistry, + BackendDiversityRegistry diversityRegistry, + PressureInferencer pressureInferencer, + EnvSnapshotStore envSnapshotStore, + WindowStateRepository windowStateRepository, + PersistRetryQueue persistRetryQueue, + ContextIntelMetrics metrics) { + return new ContextSignalProcessor( + windowProbeRegistry, + diversityRegistry, + pressureInferencer, + envSnapshotStore, + windowStateRepository, + persistRetryQueue, + metrics); + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudget.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudget.java new file mode 100644 index 000000000..650951ba5 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudget.java @@ -0,0 +1,35 @@ +package vip.mate.context.intelligence.budget; + +/** + * Token budget (return value of {@link TokenBudgetPlanner}). + *

+ * Computed by the multi-factor budget planner from effectiveWindow + pressure + diversity. + * + * @param historyBudget available tokens for history messages + * @param injectionBudget available tokens for injected content (memory/wiki/tool) + * @param compactTriggerTokens threshold that triggers compaction + * @param keepTailTokens tail-retained tokens + * @param trace decision trace (for monitoring, nullable) + * @author MateClaw Team + */ +public record TokenBudget( + int historyBudget, + int injectionBudget, + int compactTriggerTokens, + int keepTailTokens, + TokenBudgetTrace trace +) { + /** + * fallback factory: derive from yml static values when the snapshot is empty. + *

+ * Ratios are consistent with v1: history 60% / injection 40% / compactTrigger 60% / keepTail 36%. + */ + public static TokenBudget legacy(int fallbackWindow) { + return new TokenBudget( + (int) (fallbackWindow * 0.60), // historyBudget + (int) (fallbackWindow * 0.40), // injectionBudget + (int) (fallbackWindow * 0.60), // compactTrigger + (int) (fallbackWindow * 0.36), // keepTail + null); // no trace + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetPlanner.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetPlanner.java new file mode 100644 index 000000000..dd8fa7cb1 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetPlanner.java @@ -0,0 +1,130 @@ +package vip.mate.context.intelligence.budget; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; +import vip.mate.context.intelligence.enums.ResourcePressure; +import vip.mate.context.intelligence.snapshot.EnvSnapshot; + +/** + * Multi-factor budget planner (exit of the read path). + *

+ * Combines EnvSnapshot's effectiveWindow + pressure + diversity to compute the token budget + * for each round of the ReAct loop. + *

+ * Three-tier fallback (§8.1): + *

    + *
  1. EnvSnapshot valid (effectiveWindow > 0) -> dynamic value + multi-factor budget planning
  2. + *
  3. EnvSnapshot empty -> fall back to yml (caller passes fallbackWindow)
  4. + *
  5. No yml config either -> fall back to 128K (handled by caller)
  6. + *
+ *

+ * Key design (§5.5 / C.5): + *

+ * + * @author MateClaw Team + */ +@Slf4j +@Component +public class TokenBudgetPlanner { + + private final ContextIntelligenceProperties props; + + public TokenBudgetPlanner(ContextIntelligenceProperties props) { + this.props = props; + } + + /** + * Plan the token budget. + * + * @param snapshot environment snapshot (may be EMPTY) + * @param systemTokens token count of the system message + * @param toolsTokens token count of tool declarations + * @param fallbackWindow yml/hardcoded fallback window (used when snapshot is invalid) + * @return TokenBudget (includes trace for monitoring) + */ + public TokenBudget plan(EnvSnapshot snapshot, int systemTokens, int toolsTokens, int fallbackWindow) { + // Step 1: determine the base window + int baseWindow = calcBaseWindow(snapshot); + if (baseWindow <= 0) { + return buildLegacyBudget(fallbackWindow, systemTokens, toolsTokens, "fallback_yml"); + } + + // Step 2: apply the diversity ceiling (P10 clamp) + int safeWindowApplied = 0; + int ceiledWindow = baseWindow; + if (snapshot.diversityDetected() && snapshot.safeWindow() > 0 && snapshot.safeWindow() < baseWindow) { + safeWindowApplied = snapshot.safeWindow(); + ceiledWindow = snapshot.safeWindow(); + } + + // Step 3: adjust the allocation ratio based on the pressure factor + double historyRatio = adjustPressureRatio(snapshot.pressure()); + + // Step 4: compute the budget for each part + ContextIntelligenceProperties.Budget budgetCfg = props.getBudget(); + int outputReserve = (int) (ceiledWindow * budgetCfg.getOutputReserveRatio()); + int available = ceiledWindow - systemTokens - toolsTokens - outputReserve; + if (available <= 0) { + return buildLegacyBudget(fallbackWindow, systemTokens, toolsTokens, "fallback_hardcode"); + } + + int historyBudget = (int) (available * historyRatio); + int injectionBudget = available - historyBudget; + int compactTrigger = historyBudget; + int keepTail = (int) (historyBudget * budgetCfg.getKeepTailRatio()); + + String reason = safeWindowApplied > 0 ? "diversity_clamp" : "normal"; + TokenBudgetTrace trace = new TokenBudgetTrace( + baseWindow, ceiledWindow, safeWindowApplied, + snapshot.pressure(), historyRatio, + systemTokens, toolsTokens, outputReserve, + false, // legacyFallback + null, 0, 0, // phase/confidence fields not populated yet (requires extra Registry lookup) + reason + ); + return new TokenBudget(historyBudget, injectionBudget, compactTrigger, keepTail, trace); + } + + // ==================== private methods (C.5 normalization)==================== + + private int calcBaseWindow(EnvSnapshot snapshot) { + if (snapshot == null || !snapshot.isAvailable()) { + return 0; + } + return Math.max(0, snapshot.effectiveWindow()); + } + + private double adjustPressureRatio(ResourcePressure pressure) { + ContextIntelligenceProperties.Budget budgetCfg = props.getBudget(); + return switch (pressure) { + case NORMAL -> budgetCfg.getNormalHistoryRatio(); // default 0.60 + case ELEVATED -> budgetCfg.getElevatedHistoryRatio(); // default 0.70 + case HIGH -> budgetCfg.getHighHistoryRatio(); // default 0.80 + case CRITICAL -> budgetCfg.getCriticalHistoryRatio(); // default 0.85 + }; + } + + private TokenBudget buildLegacyBudget(int fallbackWindow, int systemTokens, int toolsTokens, String reason) { + TokenBudget legacy = TokenBudget.legacy(fallbackWindow); + TokenBudgetTrace trace = new TokenBudgetTrace( + 0, 0, 0, + ResourcePressure.NORMAL, 0.60, + systemTokens, toolsTokens, 0, + true, // legacyFallback + null, 0, 0, + reason + ); + return new TokenBudget( + legacy.historyBudget(), + legacy.injectionBudget(), + legacy.compactTriggerTokens(), + legacy.keepTailTokens(), + trace + ); + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetTrace.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetTrace.java new file mode 100644 index 000000000..9e112cb39 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/budget/TokenBudgetTrace.java @@ -0,0 +1,41 @@ +package vip.mate.context.intelligence.budget; + +import vip.mate.context.intelligence.enums.ResourcePressure; + +/** + * Budget decision trace record (for monitoring/debugging, to locate the root cause of budget anomalies). + *

+ * Named {@code TokenBudgetTrace} rather than {@code BudgetTrace}, because the nested record + * {@code LoopMessageBudgeter.BudgetTrace} already exists; a top-level class with the same name + * would cause import ambiguity. + * + * @param baseWindow original effectiveWindow + * @param ceiledWindow window after applying the P10 ceiling + * @param safeWindowApplied safeWindow actually applied (0 means not applied) + * @param pressure pressure level at decision time + * @param historyRatio history ratio actually used + * @param systemTokens system message tokens + * @param toolsTokens tool declaration tokens + * @param outputReserve reserved output tokens + * @param legacyFallback whether the yml fallback was used + * @param phase WindowProbe phase at decision time (COLD/PROBING/...), null means unknown + * @param confidenceLower lower bound of the state machine confidence at decision time + * @param confidenceUpper upper bound of the state machine confidence at decision time + * @param reason decision path marker: normal / diversity_clamp / pressure_adjust / fallback_yml / fallback_hardcode + * @author MateClaw Team + */ +public record TokenBudgetTrace( + int baseWindow, + int ceiledWindow, + int safeWindowApplied, + ResourcePressure pressure, + double historyRatio, + int systemTokens, + int toolsTokens, + int outputReserve, + boolean legacyFallback, + String phase, + int confidenceLower, + int confidenceUpper, + String reason +) {} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/config/ContextIntelligenceProperties.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/config/ContextIntelligenceProperties.java new file mode 100644 index 000000000..baf5c04f0 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/config/ContextIntelligenceProperties.java @@ -0,0 +1,217 @@ +package vip.mate.context.intelligence.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Context Intelligence module configuration properties. + *

+ * See design doc Appendix C.9 for full configuration (authoritative config). + * + * @author MateClaw Team + */ +@ConfigurationProperties(prefix = "mateclaw.context.intelligence") +public class ContextIntelligenceProperties { + + /** Module master switch; when false, the read path falls back to yml + hardcoded values */ + private boolean enabled = false; + + /** Backend diversity detection switch: auto (auto-detect) / off (disabled) */ + private String diversityDetection = "auto"; + + /** Pressure inference switch */ + private boolean pressureInference = true; + + /** DB persistence switch */ + private boolean dbPersist = true; + + /** Dedicated thread pool config */ + private Executor executor = new Executor(); + + /** Window probe state machine parameters */ + private Probe probe = new Probe(); + + /** Diversity detection parameters */ + private Diversity diversity = new Diversity(); + + /** Pressure inference parameters */ + private Pressure pressure = new Pressure(); + + /** Budget allocation ratios */ + private Budget budget = new Budget(); + + /** Idle model cleanup */ + private AutoClean autoClean = new AutoClean(); + + /** Metrics monitoring */ + private Metrics metrics = new Metrics(); + + // --- getters/setters --- + + public boolean isEnabled() { return enabled; } + public void setEnabled(boolean enabled) { this.enabled = enabled; } + + public String getDiversityDetection() { return diversityDetection; } + public void setDiversityDetection(String diversityDetection) { this.diversityDetection = diversityDetection; } + + public boolean isPressureInference() { return pressureInference; } + public void setPressureInference(boolean pressureInference) { this.pressureInference = pressureInference; } + + public boolean isDbPersist() { return dbPersist; } + public void setDbPersist(boolean dbPersist) { this.dbPersist = dbPersist; } + + public Executor getExecutor() { return executor; } + public void setExecutor(Executor executor) { this.executor = executor; } + + public Probe getProbe() { return probe; } + public void setProbe(Probe probe) { this.probe = probe; } + + public Diversity getDiversity() { return diversity; } + public void setDiversity(Diversity diversity) { this.diversity = diversity; } + + public Pressure getPressure() { return pressure; } + public void setPressure(Pressure pressure) { this.pressure = pressure; } + + public Budget getBudget() { return budget; } + public void setBudget(Budget budget) { this.budget = budget; } + + public AutoClean getAutoClean() { return autoClean; } + public void setAutoClean(AutoClean autoClean) { this.autoClean = autoClean; } + + public Metrics getMetrics() { return metrics; } + public void setMetrics(Metrics metrics) { this.metrics = metrics; } + + // --- nested classes --- + + public static class Executor { + private int coreSize = 8; + private int maxQueueSize = 500; + + public int getCoreSize() { return coreSize; } + public void setCoreSize(int coreSize) { this.coreSize = coreSize; } + + public int getMaxQueueSize() { return maxQueueSize; } + public void setMaxQueueSize(int maxQueueSize) { this.maxQueueSize = maxQueueSize; } + } + + public static class Probe { + private int sampleSize = 20; + private double overflowShrinkRatio = 0.85; + private double binaryConvergence = 0.10; + private long staleReprobeMs = 1_800_000; + private int coldSeedFallback = 32_768; + private int globalCeiling = 2_000_000; + private long stablePersistIntervalMs = 180_000; + + public int getSampleSize() { return sampleSize; } + public void setSampleSize(int sampleSize) { this.sampleSize = sampleSize; } + + public double getOverflowShrinkRatio() { return overflowShrinkRatio; } + public void setOverflowShrinkRatio(double overflowShrinkRatio) { this.overflowShrinkRatio = overflowShrinkRatio; } + + public double getBinaryConvergence() { return binaryConvergence; } + public void setBinaryConvergence(double binaryConvergence) { this.binaryConvergence = binaryConvergence; } + + public long getStaleReprobeMs() { return staleReprobeMs; } + public void setStaleReprobeMs(long staleReprobeMs) { this.staleReprobeMs = staleReprobeMs; } + + public int getColdSeedFallback() { return coldSeedFallback; } + public void setColdSeedFallback(int coldSeedFallback) { this.coldSeedFallback = coldSeedFallback; } + + public int getGlobalCeiling() { return globalCeiling; } + public void setGlobalCeiling(int globalCeiling) { this.globalCeiling = globalCeiling; } + + public long getStablePersistIntervalMs() { return stablePersistIntervalMs; } + public void setStablePersistIntervalMs(long stablePersistIntervalMs) { this.stablePersistIntervalMs = stablePersistIntervalMs; } + } + + public static class Diversity { + private int maxObservations = 500; + private int safePercentile = 10; + private double successVarianceThreshold = 5.0; + private double overflowRatioThreshold = 3.0; + private int decayHours = 24; + + public int getMaxObservations() { return maxObservations; } + public void setMaxObservations(int maxObservations) { this.maxObservations = maxObservations; } + + public int getSafePercentile() { return safePercentile; } + public void setSafePercentile(int safePercentile) { this.safePercentile = safePercentile; } + + public double getSuccessVarianceThreshold() { return successVarianceThreshold; } + public void setSuccessVarianceThreshold(double successVarianceThreshold) { this.successVarianceThreshold = successVarianceThreshold; } + + public double getOverflowRatioThreshold() { return overflowRatioThreshold; } + public void setOverflowRatioThreshold(double overflowRatioThreshold) { this.overflowRatioThreshold = overflowRatioThreshold; } + + public int getDecayHours() { return decayHours; } + public void setDecayHours(int decayHours) { this.decayHours = decayHours; } + } + + public static class Pressure { + private int successDegradeStep = 5; + private int overflowEscalateThreshold = 3; + private long latencyEscalateMs = 30_000; + private int minLatencySamples = 3; + + public int getSuccessDegradeStep() { return successDegradeStep; } + public void setSuccessDegradeStep(int successDegradeStep) { this.successDegradeStep = successDegradeStep; } + + public int getOverflowEscalateThreshold() { return overflowEscalateThreshold; } + public void setOverflowEscalateThreshold(int overflowEscalateThreshold) { this.overflowEscalateThreshold = overflowEscalateThreshold; } + + public long getLatencyEscalateMs() { return latencyEscalateMs; } + public void setLatencyEscalateMs(long latencyEscalateMs) { this.latencyEscalateMs = latencyEscalateMs; } + + public int getMinLatencySamples() { return minLatencySamples; } + public void setMinLatencySamples(int minLatencySamples) { this.minLatencySamples = minLatencySamples; } + } + + public static class Budget { + private double normalHistoryRatio = 0.60; + private double elevatedHistoryRatio = 0.70; + private double highHistoryRatio = 0.80; + private double criticalHistoryRatio = 0.85; + private double outputReserveRatio = 0.15; + private double keepTailRatio = 0.60; + + public double getNormalHistoryRatio() { return normalHistoryRatio; } + public void setNormalHistoryRatio(double v) { this.normalHistoryRatio = v; } + + public double getElevatedHistoryRatio() { return elevatedHistoryRatio; } + public void setElevatedHistoryRatio(double v) { this.elevatedHistoryRatio = v; } + + public double getHighHistoryRatio() { return highHistoryRatio; } + public void setHighHistoryRatio(double v) { this.highHistoryRatio = v; } + + public double getCriticalHistoryRatio() { return criticalHistoryRatio; } + public void setCriticalHistoryRatio(double v) { this.criticalHistoryRatio = v; } + + public double getOutputReserveRatio() { return outputReserveRatio; } + public void setOutputReserveRatio(double v) { this.outputReserveRatio = v; } + + public double getKeepTailRatio() { return keepTailRatio; } + public void setKeepTailRatio(double v) { this.keepTailRatio = v; } + } + + public static class AutoClean { + private boolean enabled = true; + private long idleThresholdMs = 3_600_000; + private long cleanIntervalMs = 3_600_000; + + public boolean isEnabled() { return enabled; } + public void setEnabled(boolean enabled) { this.enabled = enabled; } + + public long getIdleThresholdMs() { return idleThresholdMs; } + public void setIdleThresholdMs(long idleThresholdMs) { this.idleThresholdMs = idleThresholdMs; } + + public long getCleanIntervalMs() { return cleanIntervalMs; } + public void setCleanIntervalMs(long cleanIntervalMs) { this.cleanIntervalMs = cleanIntervalMs; } + } + + public static class Metrics { + private boolean enabled = true; + + public boolean isEnabled() { return enabled; } + public void setEnabled(boolean enabled) { this.enabled = enabled; } + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/config/SignalExecutorConfig.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/config/SignalExecutorConfig.java new file mode 100644 index 000000000..deef6490a --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/config/SignalExecutorConfig.java @@ -0,0 +1,43 @@ +package vip.mate.context.intelligence.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Dedicated thread pool config for signal processing. + *

+ * Provides an independent thread pool for {@code @Async("signalExecutor")}, + * isolated from the virtual thread executor of the global {@code AsyncSecurityConfig}, + * to prevent signal processing tasks from crowding out business async task quota. + *

+ * Rejection policy uses {@code CallerRunsPolicy}: when the queue is full, falls back to the caller thread + * (reactor thread); losing a signal is not fatal (the next one will arrive), but fallback is safer than discard. + *

+ * Created only when {@code mateclaw.context.intelligence.enabled=true}; + * when disabled there are no listeners and thus no need for a thread pool. + * + * @author MateClaw Team + */ +@Configuration +public class SignalExecutorConfig { + + @Bean("signalExecutor") + @ConditionalOnProperty(name = "mateclaw.context.intelligence.enabled", havingValue = "true") + public ThreadPoolTaskExecutor signalExecutor(ContextIntelligenceProperties props) { + ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor(); + int core = props.getExecutor().getCoreSize(); + ex.setCorePoolSize(core); + ex.setMaxPoolSize(core * 2); + ex.setQueueCapacity(props.getExecutor().getMaxQueueSize()); + ex.setThreadNamePrefix("ctx-signal-"); + ex.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + ex.setWaitForTasksToCompleteOnShutdown(true); + ex.setAwaitTerminationSeconds(10); + ex.initialize(); + return ex; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ContextProfile.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ContextProfile.java new file mode 100644 index 000000000..6d1f7e3af --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ContextProfile.java @@ -0,0 +1,94 @@ +package vip.mate.context.intelligence.enums; + +/** + * Model context profile enum. + *

+ * Used for gating: {@code profile.isDynamic() && profile.hasTextContext()} determines whether + * a model participates in context intelligence (dynamic window probing + budget planning). + *

+ * The v2 initial release does not implement modelType routing (chat vs reasoning allocation ratio + * differences); it is only used to determine whether context intelligence is enabled. + * + * @author MateClaw Team + */ +public enum ContextProfile { + + /** Dynamic chat model, participates in context intelligence */ + DYNAMIC_CHAT(true, true), + + /** Dynamic reasoning model, participates in context intelligence */ + DYNAMIC_REASONING(true, true), + + /** Fixed-window chat model, does not participate in dynamic probing */ + FIXED_CHAT(false, true), + + /** Fixed-window reasoning model, does not participate in dynamic probing */ + FIXED_REASONING(false, true), + + /** Embedding model, no text context */ + FIXED_EMBEDDING(false, false), + + /** Rerank model, no text context */ + FIXED_RERANK(false, false), + + /** Audio model */ + AUDIO(false, false), + + /** Image generation model */ + IMAGE_GEN(false, false), + + /** Image understanding model */ + IMAGE_VISION(false, false), + + /** Video model */ + VIDEO(false, false), + + /** Not applicable */ + NOT_APPLICABLE(false, false); + + private final boolean dynamic; + private final boolean textContext; + + ContextProfile(boolean dynamic, boolean textContext) { + this.dynamic = dynamic; + this.textContext = textContext; + } + + /** Whether this is a dynamic-window model (participates in WindowProbe probing) */ + public boolean isDynamic() { + return dynamic; + } + + /** Whether the model has text context (participates in TokenBudgetPlanner budget planning) */ + public boolean hasTextContext() { + return textContext; + } + + /** + * Infer the profile from a modelType string. + *

+ * Common values: {@code chat} / {@code reasoning} / {@code code} / {@code embedding} / + * {@code rerank} / {@code audio} / {@code image_gen} / {@code image_vision} / {@code video} + * + * @param modelType model type string (from DB model_type or state RUNTIME_MODEL_TYPE) + * @return the corresponding ContextProfile, defaults to DYNAMIC_CHAT + */ + public static ContextProfile fromModelType(String modelType) { + if (modelType == null || modelType.isBlank()) { + return DYNAMIC_CHAT; + } + String t = modelType.trim().toLowerCase(); + return switch (t) { + case "chat" -> DYNAMIC_CHAT; + case "reasoning" -> DYNAMIC_REASONING; + case "code" -> DYNAMIC_CHAT; + case "embedding" -> FIXED_EMBEDDING; + case "rerank" -> FIXED_RERANK; + case "audio" -> AUDIO; + case "image_gen", "image-gen", "imagegen" -> IMAGE_GEN; + case "image_vision", "image-vision", "imagevision" -> IMAGE_VISION; + case "video" -> VIDEO; + default -> DYNAMIC_CHAT; + }; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ResourcePressure.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ResourcePressure.java new file mode 100644 index 000000000..e242e7628 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/enums/ResourcePressure.java @@ -0,0 +1,37 @@ +package vip.mate.context.intelligence.enums; + +/** + * Resource pressure level enum. + *

+ * Used by {@code PressureInferencer} to infer the current resource pressure of the LLM backend, + * which affects the history/injection allocation ratio in {@code TokenBudgetPlanner}. + *

+ * The numeric mapping is used for Micrometer Gauge reporting (0/1/2/3). + * + * @author MateClaw Team + */ +public enum ResourcePressure { + + /** Normal pressure, history ratio 60% */ + NORMAL(0), + + /** Mild pressure, history ratio 70% */ + ELEVATED(1), + + /** High pressure, history ratio 80% */ + HIGH(2), + + /** Severe pressure, history ratio 85% */ + CRITICAL(3); + + private final int gaugeValue; + + ResourcePressure(int gaugeValue) { + this.gaugeValue = gaugeValue; + } + + /** Micrometer Gauge reporting value */ + public int gaugeValue() { + return gaugeValue; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmOverflowSignal.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmOverflowSignal.java new file mode 100644 index 000000000..987aa9db6 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmOverflowSignal.java @@ -0,0 +1,26 @@ +package vip.mate.context.intelligence.event; + +/** + * LLM overflow call signal event. + *

+ * Published by {@code ReasoningNode} when {@code streamCall} returns {@code isPromptTooLong()=true}, + * consumed synchronously via the Spring event bus (without {@code @Async}). + *

+ * Synchronous handling ensures {@code confidenceUpper} is updated before retry, + * which affects binary search convergence on consecutive PTL events. + * + * @param provider provider ID + * @param modelName model name used by the request + * @param modelType model type, e.g. "chat"/"reasoning"/"code" + * @param attemptedTokens estimated prompt token count before overflow + * @param traceId 8-char UUID prefix of the graph state + * + * @author MateClaw Team + */ +public record LlmOverflowSignal( + String provider, + String modelName, + String modelType, + int attemptedTokens, + String traceId +) {} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmSuccessSignal.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmSuccessSignal.java new file mode 100644 index 000000000..6fd272dc6 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/event/LlmSuccessSignal.java @@ -0,0 +1,29 @@ +package vip.mate.context.intelligence.event; + +/** + * LLM successful call signal event. + *

+ * Published by {@code ReasoningNode} after each {@code streamCall} returns a non-overflow result, + * consumed asynchronously via the Spring event bus ({@code @Async("signalExecutor")}). + *

+ * Carries data used to update the window probe state machine, the backend diversity tracker, and the pressure inferencer. + * + * @param provider provider ID (from graph state RUNTIME_PROVIDER_ID) + * @param modelName model name used by the request (from graph state RUNTIME_MODEL_NAME) + * @param modelType model type, e.g. "chat"/"reasoning"/"code" (from graph state RUNTIME_MODEL_TYPE) + * @param promptTokens actual prompt token count returned by the API (from StreamResult.promptTokens()) + * @param completionTokens completion token count returned by the API (from StreamResult.completionTokens()) + * @param latencyMs total elapsed time of this streamCall (timed around the ReasoningNode call) + * @param traceId 8-char UUID prefix of the graph state (from MateClawStateKeys.TRACE_ID) + * + * @author MateClaw Team + */ +public record LlmSuccessSignal( + String provider, + String modelName, + String modelType, + int promptTokens, + int completionTokens, + long latencyMs, + String traceId +) {} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/listener/ContextSignalProcessor.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/listener/ContextSignalProcessor.java new file mode 100644 index 000000000..350a8d453 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/listener/ContextSignalProcessor.java @@ -0,0 +1,198 @@ +package vip.mate.context.intelligence.listener; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import vip.mate.context.intelligence.event.LlmOverflowSignal; +import vip.mate.context.intelligence.event.LlmSuccessSignal; +import vip.mate.context.intelligence.metrics.ContextIntelMetrics; +import vip.mate.context.intelligence.perception.BackendDiversityRegistry; +import vip.mate.context.intelligence.perception.BackendDiversityTracker; +import vip.mate.context.intelligence.perception.PressureInferencer; +import vip.mate.context.intelligence.persist.PersistRetryQueue; +import vip.mate.context.intelligence.persist.WindowStateRepository; +import vip.mate.context.intelligence.probe.ProbeUpdate; +import vip.mate.context.intelligence.probe.WindowProbeRegistry; +import vip.mate.context.intelligence.snapshot.EnvSnapshotStore; + +import java.time.Duration; + +/** + * Signal processor (write path entry). + *

+ * Listens to {@link LlmSuccessSignal} and {@link LlmOverflowSignal}, coordinating state machine + perception + snapshot + persistence. + *

+ * Success path (§5.2 / C.6): {@code @Async("signalExecutor")} asynchronous processing, does not block inference threads. + * Each sub-step has an independent try-catch, so one failure does not affect others. + *

+ * Overflow path (§5.3): synchronous processing (no @Async), ensuring confidenceUpper is updated before PTL retry. + * + * @author MateClaw Team + */ +@Slf4j +@RequiredArgsConstructor +public class ContextSignalProcessor { + + private final WindowProbeRegistry windowProbeRegistry; + private final BackendDiversityRegistry diversityRegistry; + private final PressureInferencer pressureInferencer; + private final EnvSnapshotStore envSnapshotStore; + private final WindowStateRepository windowStateRepository; + private final PersistRetryQueue persistRetryQueue; + /** Observability metrics gateway (§C.7), internal null degradation, no caller null-check needed */ + private final ContextIntelMetrics metrics; + + // ==================== Success path (async) ==================== + + /** + * Handle success signal (§5.2 / C.6). + *

+ * Each sub-step has an independent try-catch, so one failure does not affect others. + */ + @Async("signalExecutor") + @EventListener + public void onSuccess(LlmSuccessSignal event) { + long startNs = System.nanoTime(); + try { + String key = WindowProbeRegistry.key(event.provider(), event.modelName()); + + // Step 1: WindowProbe state machine update + ProbeUpdate update = null; + try { + update = windowProbeRegistry.recordSuccess(key, event.promptTokens()); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} WindowProbe recordSuccess failed: {}", + event.traceId(), e.getMessage()); + // §C.7: probe update failed -> signal main effect lost, record as dropped + metrics.recordSignalDropped("probe_update_failed"); + } + + // Step 2: BackendDiversityTracker update + try { + diversityRegistry.recordSuccess(key, event.promptTokens()); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} DiversityRegistry failed: {}", + event.traceId(), e.getMessage()); + } + + // Step 3: PressureInferencer update + try { + pressureInferencer.recordSuccess(event.latencyMs()); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} PressureInferencer failed: {}", + event.traceId(), e.getMessage()); + } + + // Step 4+5: Refresh EnvSnapshot (only when value changes) + try { + if (update != null) { + BackendDiversityTracker tracker = diversityRegistry.get(key); + boolean diversity = (tracker != null) && tracker.isDiversityDetected(); + int safeWindow = (tracker != null && diversity) ? tracker.computeSafeWindow() : 0; + envSnapshotStore.refreshIfChanged(key, + update.snapshot().effectiveWindow(), + pressureInferencer.currentPressure(), + diversity, + safeWindow); + } + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} EnvSnapshot refresh failed: {}", + event.traceId(), e.getMessage()); + } + + // Step 6: DB persist (only on phase change or first success) + if (update != null && (update.phaseChanged() || update.snapshot().totalSuccess() == 1)) { + try { + windowStateRepository.persist(key, update.snapshot()); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} DB persist failed: {}", + event.traceId(), e.getMessage()); + } + } + + // Step 7: Retry queue + try { + persistRetryQueue.retryOnce(); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} retryOnce failed: {}", + event.traceId(), e.getMessage()); + } + } finally { + // §C.7: total duration of success signal processing + metrics.recordSuccessProcess(Duration.ofNanos(System.nanoTime() - startNs)); + } + } + + // ==================== Overflow path (sync) ==================== + + /** + * Handle overflow signal (§5.3). + *

+ * Synchronous execution (no @Async), ensuring confidenceUpper is updated before PTL retry. + * Fully wrapped in try-catch; exceptions only logged at debug level (risk 1 mitigation). + */ + @EventListener + public void onOverflow(LlmOverflowSignal event) { + long startNs = System.nanoTime(); + try { + String key = WindowProbeRegistry.key(event.provider(), event.modelName()); + + try { + // Step 1: WindowProbe state machine update (overflow shrinkage) + ProbeUpdate update = windowProbeRegistry.recordOverflow(key, event.attemptedTokens()); + + // Step 2: BackendDiversityTracker update (v1 Bug 2 fix) + try { + diversityRegistry.recordOverflow(key, event.attemptedTokens()); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} DiversityRegistry.recordOverflow failed: {}", + event.traceId(), e.getMessage()); + } + + // Step 3: PressureInferencer update + try { + pressureInferencer.recordOverflow(); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} PressureInferencer.recordOverflow failed: {}", + event.traceId(), e.getMessage()); + } + + // Step 4: Refresh EnvSnapshot (overflow inevitably shrinks the window, so the value definitely changes) + try { + if (update != null) { + BackendDiversityTracker tracker = diversityRegistry.get(key); + boolean diversity = (tracker != null) && tracker.isDiversityDetected(); + int safeWindow = (tracker != null && diversity) ? tracker.computeSafeWindow() : 0; + envSnapshotStore.refreshIfChanged(key, + update.snapshot().effectiveWindow(), + pressureInferencer.currentPressure(), + diversity, + safeWindow); + } + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} EnvSnapshot refresh failed: {}", + event.traceId(), e.getMessage()); + } + + // Step 5: DB persist (synchronous, overflow is a low-frequency event) + if (update != null) { + try { + windowStateRepository.persist(key, update.snapshot()); + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} DB persist failed: {}", + event.traceId(), e.getMessage()); + } + } + } catch (Exception e) { + log.debug("[ContextIntel] traceId={} onOverflow failed: {}", + event.traceId(), e.getMessage()); + // §C.7: overflow handling failed entirely -> signal dropped + metrics.recordSignalDropped("overflow_exception"); + } + } finally { + // §C.7: total duration of overflow signal processing + metrics.recordOverflowProcess(Duration.ofNanos(System.nanoTime() - startNs)); + } + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/metrics/ContextIntelMetrics.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/metrics/ContextIntelMetrics.java new file mode 100644 index 000000000..95f105796 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/metrics/ContextIntelMetrics.java @@ -0,0 +1,190 @@ +package vip.mate.context.intelligence.metrics; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; + +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Context Intelligence domain metrics gateway (modeled after {@code vip.mate.wiki.metrics.WikiMetrics}). + * + *

All observability metrics of the v2 module flow through this gateway, in order to: + *

+ * + *

Corresponds to design document §C.7 and §8.2. Metric inventory: + * + * + * + * + * + * + * + * + * + * + * + *
Metric nameTypeTagsDescription
{@code context.intel.signal.process.duration}Timertype=success|overflowSignal processing duration
{@code context.intel.phase.transition}Counterprovider,model,from,toPhase transition count
{@code context.intel.snapshot.refresh}Counterprovider,modelSnapshot refresh count
{@code context.intel.db.persist.failure}Counterprovider,modelDB persist failure
{@code context.intel.signal.dropped}CounterreasonSignal dropped
{@code context.intel.pressure.level}Gaugeprovider,modelCurrent pressure level 0/1/2/3
{@code context.intel.effective.window}Gaugeprovider,modelCurrent effective window tokens
{@code context.intel.phase}Gaugeprovider,modelCurrent state machine phase 0/1/2/3/4
{@code context.intel.diversity.detected}Gaugeprovider,modelWhether multiple backends detected 0/1
+ * + *

Gauge implementation note: Uses {@link AtomicInteger} as the gauge value container, + * registered once via {@link MeterRegistry#gauge(String, Iterable, Object)}; + * subsequent updates via {@link AtomicInteger#set(int)} are automatically reflected at scrape time. + * + * @author MateClaw Team + */ +@Slf4j +@Component +public class ContextIntelMetrics { + + private final MeterRegistry registry; + /** Combined switch: true only when MeterRegistry exists AND metrics.enabled=true */ + private final boolean enabled; + private final ConcurrentHashMap counterCache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap timerCache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap gaugeValueCache = new ConcurrentHashMap<>(); + + public ContextIntelMetrics(ObjectProvider registryProvider, + ContextIntelligenceProperties props) { + this.registry = registryProvider.getIfAvailable(); + this.enabled = registry != null && props.getMetrics().isEnabled(); + if (!enabled) { + log.info("[ContextIntel] metrics disabled (MeterRegistry={}, metrics.enabled={})", + registry != null ? "available" : "absent", + props.getMetrics().isEnabled()); + } else { + log.info("[ContextIntel] MeterRegistry available; metrics enabled"); + } + } + + // ==================== Signal processing timers (§C.7) ==================== + + /** Record successful signal processing duration (Processor.onSuccess). */ + public void recordSuccessProcess(Duration duration) { + if (!enabled) return; + getTimer("context.intel.signal.process.duration", + Tags.of(Tag.of("type", "success"))).record(duration); + } + + /** Record overflow signal processing duration (Processor.onOverflow). */ + public void recordOverflowProcess(Duration duration) { + if (!enabled) return; + getTimer("context.intel.signal.process.duration", + Tags.of(Tag.of("type", "overflow"))).record(duration); + } + + // ==================== Phase transitions (§C.7) ==================== + + /** Record state machine phase transition (WindowProbeRegistry). */ + public void recordPhaseTransition(String provider, String model, String from, String to) { + if (!enabled) return; + getCounter("context.intel.phase.transition", + Tags.of(Tag.of("provider", provider), + Tag.of("model", model), + Tag.of("from", from), + Tag.of("to", to))).increment(); + } + + // ==================== Snapshot refresh (§C.7) ==================== + + /** Record snapshot refresh count (when EnvSnapshotStore.refreshIfChanged actually updates). */ + public void recordSnapshotRefresh(String provider, String model) { + if (!enabled) return; + getCounter("context.intel.snapshot.refresh", + Tags.of(Tag.of("provider", provider), + Tag.of("model", model))).increment(); + } + + // ==================== DB persist failures (§C.7) ==================== + + /** Record DB persist failure (WindowStateRepository.persist catch block). */ + public void recordDbPersistFailure(String provider, String model) { + if (!enabled) return; + getCounter("context.intel.db.persist.failure", + Tags.of(Tag.of("provider", provider), + Tag.of("model", model))).increment(); + } + + // ==================== Signal dropped (§C.7) ==================== + + /** Record signal dropped (signal not fully processed due to Processor exception). */ + public void recordSignalDropped(String reason) { + if (!enabled) return; + getCounter("context.intel.signal.dropped", + Tags.of(Tag.of("reason", reason))).increment(); + } + + // ==================== Gauges (per provider:model, §C.7) ==================== + + /** Update pressure level gauge (0/1/2/3). */ + public void updatePressureLevel(String provider, String model, int level) { + if (!enabled) return; + updateGauge("context.intel.pressure.level", provider, model, level); + } + + /** Update effective window gauge (tokens). */ + public void updateEffectiveWindow(String provider, String model, int tokens) { + if (!enabled) return; + updateGauge("context.intel.effective.window", provider, model, tokens); + } + + /** Update state machine phase gauge (0/1/2/3/4). */ + public void updatePhase(String provider, String model, int phase) { + if (!enabled) return; + updateGauge("context.intel.phase", provider, model, phase); + } + + /** Update diversity detection gauge (0/1). */ + public void updateDiversityDetected(String provider, String model, boolean detected) { + if (!enabled) return; + updateGauge("context.intel.diversity.detected", provider, model, detected ? 1 : 0); + } + + // ==================== Internal cache (copied from WikiMetrics) ==================== + + private void updateGauge(String name, String provider, String model, int value) { + String key = gaugeKey(name, provider, model); + AtomicInteger holder = gaugeValueCache.computeIfAbsent(key, k -> { + AtomicInteger h = new AtomicInteger(value); + Tags tags = Tags.of(Tag.of("provider", provider), + Tag.of("model", model)); + registry.gauge(name, tags, h); + return h; + }); + holder.set(value); + } + + private Counter getCounter(String name, Tags tags) { + return counterCache.computeIfAbsent(meterKey(name, tags), + k -> registry.counter(name, tags)); + } + + private Timer getTimer(String name, Tags tags) { + return timerCache.computeIfAbsent(meterKey(name, tags), + k -> registry.timer(name, tags)); + } + + private static String meterKey(String name, Tags tags) { + StringBuilder sb = new StringBuilder(name); + for (Tag t : tags) { + sb.append('|').append(t.getKey()).append('=').append(t.getValue()); + } + return sb.toString(); + } + + private static String gaugeKey(String name, String provider, String model) { + return name + "|provider=" + provider + "|model=" + model; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityRegistry.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityRegistry.java new file mode 100644 index 000000000..cf677709b --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityRegistry.java @@ -0,0 +1,74 @@ +package vip.mate.context.intelligence.perception; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * Multi-model BackendDiversityTracker registry. + *

+ * Mirrors the compute pattern of {@code WindowProbeRegistry}, managing tracker instances + * keyed by (provider, model). One independent tracker per key (not global), because the + * detection targets the variance "under the same model name". + * + * @author MateClaw Team + */ +@Slf4j +@Component +public class BackendDiversityRegistry { + + private final ConcurrentHashMap trackers = new ConcurrentHashMap<>(); + + private final ContextIntelligenceProperties props; + + public BackendDiversityRegistry(ContextIntelligenceProperties props) { + this.props = props; + } + + /** Record a successful call */ + public void recordSuccess(String key, int promptTokens) { + try { + trackers.computeIfAbsent(key, k -> new BackendDiversityTracker(props.getDiversity())) + .recordSuccess(promptTokens); + } catch (Exception e) { + log.debug("[ContextIntel] DiversityTracker recordSuccess failed for {}: {}", key, e.getMessage()); + } + } + + /** Record an overflow call (v1 Bug 2 fix: overflow finally feeds into diversity detection) */ + public void recordOverflow(String key, int attemptedTokens) { + try { + trackers.computeIfAbsent(key, k -> new BackendDiversityTracker(props.getDiversity())) + .recordOverflow(attemptedTokens); + } catch (Exception e) { + log.debug("[ContextIntel] DiversityTracker recordOverflow failed for {}: {}", key, e.getMessage()); + } + } + + /** + * Get the tracker (for the read path). + *

+ * May return null (computeIfAbsent not yet triggered on first access). + */ + public BackendDiversityTracker get(String key) { + return trackers.get(key); + } + + /** Restore the diversityDetected flag from DB (§5.7 restart recovery) */ + public void restore(String key, boolean diversityDetected) { + trackers.compute(key, (k, existing) -> { + if (existing != null) { + // already exists: do not overwrite the existing tracker (in-memory data takes priority) + return existing; + } + return BackendDiversityTracker.restore(diversityDetected, props.getDiversity()); + }); + } + + /** Idle eviction (coordinated by WindowProbeRegistry.evictIdleProbes) */ + public void evict(String key) { + trackers.remove(key); + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityTracker.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityTracker.java new file mode 100644 index 000000000..3a9eef560 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/BackendDiversityTracker.java @@ -0,0 +1,181 @@ +package vip.mate.context.intelligence.perception; + +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Backend diversity tracker (per-model instance, not a Spring Bean). + *

+ * Detects whether a single model name routes to multiple backends (e.g. a gateway sometimes + * routes "gpt-4" to an 8K model, sometimes to a 128K model). This manifests as a large + * variance in successTokens or an abnormal success/overflow ratio for that key. + *

+ * Detection conditions (triggered when any is met, sticky until decay): + *

+ *

+ * Thread safety: successTokens uses copy-on-write + AtomicReference; + * minOverflowToken / maxSuccessToken / observationCount use AtomicInteger. + * + * @author MateClaw Team + */ +public class BackendDiversityTracker { + + // --- observation data --- + final AtomicReference> successTokens; + final AtomicInteger minOverflowToken; // v1 Bug 2: never updated, fixed in v2 + final AtomicInteger maxSuccessToken; + final AtomicInteger observationCount; + + // --- detection result (sticky)--- + volatile boolean diversityDetected; + volatile long lastResetTime; + + // --- config --- + final int maxObservations; + final int safePercentile; + final double successVarianceThreshold; + final double overflowRatioThreshold; + final long decayMs; + + BackendDiversityTracker(ContextIntelligenceProperties.Diversity cfg) { + this.successTokens = new AtomicReference<>(new ArrayList<>()); + this.minOverflowToken = new AtomicInteger(Integer.MAX_VALUE); + this.maxSuccessToken = new AtomicInteger(0); + this.observationCount = new AtomicInteger(0); + this.diversityDetected = false; + this.lastResetTime = System.currentTimeMillis(); + this.maxObservations = cfg.getMaxObservations(); + this.safePercentile = cfg.getSafePercentile(); + this.successVarianceThreshold = cfg.getSuccessVarianceThreshold(); + this.overflowRatioThreshold = cfg.getOverflowRatioThreshold(); + this.decayMs = cfg.getDecayHours() * 3_600_000L; + } + + /** Restore diversityDetected flag from DB (§5.7 restart recovery) */ + static BackendDiversityTracker restore(boolean diversityDetected, ContextIntelligenceProperties.Diversity cfg) { + BackendDiversityTracker t = new BackendDiversityTracker(cfg); + t.diversityDetected = diversityDetected; + return t; + } + + // ==================== write path ==================== + + /** Record the prompt token count of a successful call */ + void recordSuccess(int promptTokens) { + checkDecay(); + + // copy-on-write update of successTokens + successTokens.updateAndGet(current -> { + List copy = new ArrayList<>(current); + copy.add(promptTokens); + // discard oldest observations when exceeding the cap + while (copy.size() > maxObservations) { + copy.remove(0); + } + return Collections.unmodifiableList(copy); + }); + + // update maxSuccessToken + maxSuccessToken.accumulateAndGet(promptTokens, Math::max); + + // check detection conditions every 10 observations + int count = observationCount.incrementAndGet(); + if (count % 10 == 0) { + detectPattern(); + } + } + + /** Record the attempted token count of an overflow call (v1 Bug 2 fix: this method is finally invoked) */ + void recordOverflow(int attemptedTokens) { + checkDecay(); + + // update minOverflowToken + minOverflowToken.accumulateAndGet(attemptedTokens, Math::min); + + observationCount.incrementAndGet(); + + // overflow is a strong signal, check immediately + detectPattern(); + } + + // ==================== read path ==================== + + public boolean isDiversityDetected() { + return diversityDetected; + } + + /** + * Compute the P10 safe window (10th percentile of successTokens). + *

+ * Only meaningful when diversityDetected=true. Returns 0 when there is no observation data. + */ + public int computeSafeWindow() { + List tokens = successTokens.get(); + if (tokens.isEmpty()) { + return 0; + } + List sorted = new ArrayList<>(tokens); + Collections.sort(sorted); + // P10 = 10th percentile + int idx = (int) Math.ceil(sorted.size() * safePercentile / 100.0) - 1; + if (idx < 0) idx = 0; + return sorted.get(idx); + } + + // ==================== internal methods ==================== + + /** Check decay: reset detection result after decayMs */ + private void checkDecay() { + long now = System.currentTimeMillis(); + if (now - lastResetTime > decayMs) { + diversityDetected = false; + successTokens.set(new ArrayList<>()); + minOverflowToken.set(Integer.MAX_VALUE); + maxSuccessToken.set(0); + observationCount.set(0); + lastResetTime = now; + } + } + + /** Run detection conditions A and B */ + private void detectPattern() { + if (diversityDetected) { + return; // sticky, no need to re-detect once detected + } + + int maxSuccess = maxSuccessToken.get(); + int minOverflow = minOverflowToken.get(); + + // Condition A (strong signal): abnormal success/overflow ratio + // maxSuccess much larger than minOverflow -> some backends can handle far more tokens than the overflow threshold + if (minOverflow > 0 && minOverflow != Integer.MAX_VALUE && maxSuccess > 0) { + double ratio = (double) maxSuccess / minOverflow; + if (ratio > overflowRatioThreshold) { + diversityDetected = true; + return; + } + } + + // Condition B (weak signal): large variance in success token distribution + List tokens = successTokens.get(); + if (tokens.size() >= 10) { + int min = tokens.stream().mapToInt(Integer::intValue).min().orElse(0); + int max = tokens.stream().mapToInt(Integer::intValue).max().orElse(0); + if (min > 0) { + double varianceRatio = (double) max / min; + if (varianceRatio > successVarianceThreshold) { + diversityDetected = true; + } + } + } + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/PressureInferencer.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/PressureInferencer.java new file mode 100644 index 000000000..bfaa8fe20 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/perception/PressureInferencer.java @@ -0,0 +1,151 @@ +package vip.mate.context.intelligence.perception; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; +import vip.mate.context.intelligence.enums.ResourcePressure; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Pressure inferencer (global singleton, not per-model). + *

+ * Infers the current resource pressure level of the LLM backend based on consecutive + * success/overflow/latency. The pressure level is shared globally (affecting the budget + * allocation ratio for all models). + *

+ * Degradation path (stepped, C.4): CRITICAL → HIGH → ELEVATED → NORMAL + *

+ * Escalation path: overflowEscalateThreshold consecutive overflows OR average latency > latencyEscalateMs (requires ≥ minLatencySamples samples) + * + * @author MateClaw Team + */ +@Slf4j +@Component +public class PressureInferencer { + + // --- current pressure level (volatile, lock-free read path)--- + volatile ResourcePressure currentPressure = ResourcePressure.NORMAL; + + // --- consecutive counters (AtomicInteger, CAS updates)--- + final AtomicInteger consecutiveSuccesses = new AtomicInteger(0); + final AtomicInteger consecutiveOverflows = new AtomicInteger(0); + + // --- latency samples (ring buffer, synchronized guards multi-field updates)--- + final long[] latencySamples = new long[10]; + int latencyIdx = 0; + int latencySampleCount = 0; + + private final ContextIntelligenceProperties.Pressure cfg; + + public PressureInferencer(ContextIntelligenceProperties props) { + this.cfg = props.getPressure(); + } + + // ==================== write path ==================== + + /** + * Record a successful call (stepped degradation, C.4). + *

+ * Each level of degradation requires successDegradeStep consecutive successes; the counter + * is reset after degrading. Latency escalation requires ≥ minLatencySamples samples to + * avoid misjudging a single spike (v2.6). + */ + public void recordSuccess(long latencyMs) { + try { + consecutiveSuccesses.incrementAndGet(); + consecutiveOverflows.set(0); + updateLatencySamples(latencyMs); + + // stepped degradation + int step = cfg.getSuccessDegradeStep(); + int successes = consecutiveSuccesses.get(); + if (successes >= step) { + boolean degraded = false; + if (currentPressure == ResourcePressure.CRITICAL) { + currentPressure = ResourcePressure.HIGH; + degraded = true; + } else if (currentPressure == ResourcePressure.HIGH) { + currentPressure = ResourcePressure.ELEVATED; + degraded = true; + } else if (currentPressure == ResourcePressure.ELEVATED) { + currentPressure = ResourcePressure.NORMAL; + degraded = true; + } + if (degraded) { + consecutiveSuccesses.set(0); // reset counter, accumulate step successes again for the next level + } + } + + // latency escalation check (requires ≥ minLatencySamples samples, v2.6) + if (latencySampleCount >= cfg.getMinLatencySamples()) { + long avgLatency = averageLatency(); + if (avgLatency > cfg.getLatencyEscalateMs()) { + escalatePressure(); + } + } + } catch (Exception e) { + log.debug("[ContextIntel] PressureInferencer recordSuccess failed: {}", e.getMessage()); + } + } + + /** Record an overflow call (overflowEscalateThreshold consecutive overflows -> escalate) */ + public void recordOverflow() { + try { + consecutiveOverflows.incrementAndGet(); + consecutiveSuccesses.set(0); + + if (consecutiveOverflows.get() >= cfg.getOverflowEscalateThreshold()) { + escalatePressure(); + } + } catch (Exception e) { + log.debug("[ContextIntel] PressureInferencer recordOverflow failed: {}", e.getMessage()); + } + } + + // ==================== read path ==================== + + public ResourcePressure currentPressure() { + return currentPressure; + } + + // ==================== internal methods ==================== + + /** Escalate the pressure level (NORMAL → ELEVATED → HIGH → CRITICAL) */ + private void escalatePressure() { + if (currentPressure == ResourcePressure.NORMAL) { + currentPressure = ResourcePressure.ELEVATED; + } else if (currentPressure == ResourcePressure.ELEVATED) { + currentPressure = ResourcePressure.HIGH; + } else if (currentPressure == ResourcePressure.HIGH) { + currentPressure = ResourcePressure.CRITICAL; + } + // CRITICAL is the highest level, no further escalation + } + + /** + * Update the latency sample ring buffer. + *

+ * synchronized guard: multiple fields (latencySamples / latencyIdx / latencySampleCount) + * need atomic updates. signalExecutor has 8 threads, but the method is very fast and contention is low. + */ + private synchronized void updateLatencySamples(long latencyMs) { + latencySamples[latencyIdx] = latencyMs; + latencyIdx = (latencyIdx + 1) % latencySamples.length; + if (latencySampleCount < latencySamples.length) { + latencySampleCount++; + } + } + + /** Compute the average of the latency samples */ + private long averageLatency() { + if (latencySampleCount == 0) { + return 0; + } + long sum = 0; + for (int i = 0; i < latencySampleCount; i++) { + sum += latencySamples[i]; + } + return sum / latencySampleCount; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/PersistRetryQueue.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/PersistRetryQueue.java new file mode 100644 index 000000000..8bc993bce --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/PersistRetryQueue.java @@ -0,0 +1,69 @@ +package vip.mate.context.intelligence.persist; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.probe.WindowProbeSnapshot; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * DB persist failure retry queue (in-memory, single retry). + *

+ * When DB persist fails, the snapshot is buffered to this queue. It is retried once on the next signal. + *

+ * Key design (§5.6): + *

+ * + * @author MateClaw Team + */ +@Slf4j +@Component +public class PersistRetryQueue { + + private static final int MAX_QUEUE_SIZE = 50; + + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + private final ObjectProvider repositoryProvider; + + public PersistRetryQueue(ObjectProvider repositoryProvider) { + this.repositoryProvider = repositoryProvider; + } + + /** Enqueue (drop oldest when cap exceeded) */ + public void offer(String key, WindowProbeSnapshot snapshot) { + while (queue.size() >= MAX_QUEUE_SIZE) { + queue.poll(); + } + queue.offer(new PendingPersist(key, snapshot)); + } + + /** Retry once (dequeue one and retry, do not re-enqueue) */ + public void retryOnce() { + PendingPersist pending = queue.poll(); + if (pending == null) { + return; + } + WindowStateRepository repo = repositoryProvider.getIfAvailable(); + if (repo == null) { + return; + } + try { + repo.persist(pending.key(), pending.snapshot()); + } catch (Exception e) { + log.debug("[ContextIntel] Retry persist failed, dropping: {}", pending.key()); + } + } + + /** Number of pending retry entries */ + public int size() { + return queue.size(); + } + + /** Buffered pending-persist entry */ + record PendingPersist(String key, WindowProbeSnapshot snapshot) {} +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/WindowStateRepository.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/WindowStateRepository.java new file mode 100644 index 000000000..202c2dc43 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/persist/WindowStateRepository.java @@ -0,0 +1,203 @@ +package vip.mate.context.intelligence.persist; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.metrics.ContextIntelMetrics; +import vip.mate.context.intelligence.probe.WindowProbe; +import vip.mate.context.intelligence.probe.WindowProbeSnapshot; +import vip.mate.context.intelligence.probe.WindowStateLoader; + +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** + * DB read/write (JdbcTemplate). + *

+ * Implements {@link WindowStateLoader} for {@code WindowProbeRegistry} to load persisted state. + *

+ * Table schema (created in V161): {@code mate_model_context_state}, PK = (provider_id, model_name) + *

+ * Persistence strategy (§5.6): + *

    + *
  • Capture snapshot inside lock, write to DB outside lock (avoid IO inside lock)
  • + *
  • On failure, buffer to {@link PersistRetryQueue} and retry once on next signal
  • + *
  • Use "UPDATE first, INSERT if 0 rows" pattern, cross-dialect compatible
  • + *
+ * + * @author MateClaw Team + */ +@Slf4j +@Component +public class WindowStateRepository implements WindowStateLoader { + + private static final String TABLE = "mate_model_context_state"; + + private static final String SQL_UPDATE = """ + UPDATE """ + TABLE + """ + SET phase=?, effective_window=?, confidence_lower=?, confidence_upper=?, + declared_limit=?, peak_observed=?, successive_success=?, successive_overflow=?, + total_success=?, total_overflow=?, last_success_at=?, last_overflow_at=?, last_updated_at=? + WHERE provider_id=? AND model_name=? + """; + + private static final String SQL_INSERT = """ + INSERT INTO """ + TABLE + """ + (provider_id, model_name, phase, effective_window, confidence_lower, confidence_upper, + declared_limit, peak_observed, successive_success, successive_overflow, + total_success, total_overflow, last_success_at, last_overflow_at, last_updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """; + + private static final String SQL_SELECT = """ + SELECT phase, effective_window, confidence_lower, confidence_upper, + declared_limit, peak_observed, successive_success, successive_overflow, + total_success, total_overflow, last_success_at, last_overflow_at, last_updated_at + FROM """ + TABLE + """ + WHERE provider_id=? AND model_name=? + """; + + private final JdbcTemplate jdbcTemplate; + private final PersistRetryQueue retryQueue; + /** Observability metrics gateway (§C.7), nullable: ObjectProvider may return null in test environment */ + private final ContextIntelMetrics metrics; + + public WindowStateRepository(JdbcTemplate jdbcTemplate, + PersistRetryQueue retryQueue, + ObjectProvider metricsProvider) { + this.jdbcTemplate = jdbcTemplate; + this.retryQueue = retryQueue; + this.metrics = metricsProvider.getIfAvailable(); + } + + // ==================== WindowStateLoader ==================== + + @Override + public WindowProbeSnapshot load(String provider, String modelName) { + try { + List> rows = jdbcTemplate.queryForList(SQL_SELECT, provider, modelName); + if (rows.isEmpty()) { + return null; + } + Map row = rows.get(0); + return mapToSnapshot(row); + } catch (DataAccessException e) { + log.debug("[ContextIntel] DB load failed for {}:{}: {}", provider, modelName, e.getMessage()); + return null; + } + } + + // ==================== Persistence ==================== + + /** + * Persist snapshot to DB (cross-dialect upsert: UPDATE first, INSERT if 0 rows). + *

+ * On failure, buffer to {@link PersistRetryQueue}. + */ + public void persist(String key, WindowProbeSnapshot snapshot) { + String[] parts = splitKey(key); + String provider = parts[0]; + String model = parts[1]; + + try { + int updated = jdbcTemplate.update(SQL_UPDATE, + snapshot.phase().name(), + snapshot.effectiveWindow(), + snapshot.confidenceLower(), + snapshot.confidenceUpper(), + snapshot.declaredLimit(), + snapshot.peakObserved(), + snapshot.successiveSuccess(), + snapshot.successiveOverflow(), + snapshot.totalSuccess(), + snapshot.totalOverflow(), + toTimestamp(snapshot.lastSuccessAt()), + toTimestamp(snapshot.lastOverflowAt()), + toTimestamp(snapshot.lastUpdatedAt()), + provider, model + ); + if (updated == 0) { + jdbcTemplate.update(SQL_INSERT, + provider, model, + snapshot.phase().name(), + snapshot.effectiveWindow(), + snapshot.confidenceLower(), + snapshot.confidenceUpper(), + snapshot.declaredLimit(), + snapshot.peakObserved(), + snapshot.successiveSuccess(), + snapshot.successiveOverflow(), + snapshot.totalSuccess(), + snapshot.totalOverflow(), + toTimestamp(snapshot.lastSuccessAt()), + toTimestamp(snapshot.lastOverflowAt()), + toTimestamp(snapshot.lastUpdatedAt()) + ); + } + } catch (DataAccessException e) { + log.debug("[ContextIntel] DB persist failed for {}: {}", key, e.getMessage()); + // §C.7: DB persist failure count + if (metrics != null) { + try { + metrics.recordDbPersistFailure(provider, model); + } catch (Exception me) { + log.debug("[ContextIntel] dbPersistFailure metric failed: {}", me.getMessage()); + } + } + retryQueue.offer(key, snapshot); + } + } + + // ==================== Utilities ==================== + + private WindowProbeSnapshot mapToSnapshot(Map row) { + return new WindowProbeSnapshot( + WindowProbe.Phase.valueOf(stringOf(row.get("phase"))), + intOf(row.get("effective_window")), + intOf(row.get("confidence_lower")), + intOf(row.get("confidence_upper")), + intOf(row.get("declared_limit")), + intOf(row.get("peak_observed")), + intOf(row.get("successive_success")), + intOf(row.get("successive_overflow")), + intOf(row.get("total_success")), + intOf(row.get("total_overflow")), + toInstant(row.get("last_success_at")), + toInstant(row.get("last_overflow_at")), + toInstant(row.get("last_updated_at")) + ); + } + + private static String[] splitKey(String key) { + int idx = key.indexOf(':'); + if (idx < 0) { + return new String[]{key, ""}; + } + return new String[]{key.substring(0, idx), key.substring(idx + 1)}; + } + + private static String stringOf(Object val) { + return (val != null) ? val.toString() : null; + } + + private static int intOf(Object val) { + if (val == null) return 0; + if (val instanceof Number n) return n.intValue(); + return Integer.parseInt(val.toString()); + } + + private static Timestamp toTimestamp(Instant instant) { + return (instant != null) ? Timestamp.from(instant) : null; + } + + private static Instant toInstant(Object val) { + if (val == null) return null; + if (val instanceof Timestamp ts) return ts.toInstant(); + if (val instanceof java.util.Date d) return d.toInstant(); + return null; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/ProbeUpdate.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/ProbeUpdate.java new file mode 100644 index 000000000..03adec898 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/ProbeUpdate.java @@ -0,0 +1,23 @@ +package vip.mate.context.intelligence.probe; + +/** + * Return value of recordSuccess / recordOverflow. + *

+ * Captures snapshot + previousPhase within the {@code ConcurrentHashMap.compute} lock, + * and returns them to the processor for out-of-lock reading of effectiveWindow (step 5) and deciding + * whether to persist (step 6). The processor is stateless and does not need to cache the previous phase — + * it is obtained from the return value on each call. + * + * @param snapshot the post-update state snapshot + * @param previousPhase the pre-update phase + * @author MateClaw Team + */ +public record ProbeUpdate( + WindowProbeSnapshot snapshot, + WindowProbe.Phase previousPhase +) { + /** Whether the phase has changed (used to decide whether DB persistence is needed) */ + public boolean phaseChanged() { + return snapshot.phase() != previousPhase; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbe.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbe.java new file mode 100644 index 000000000..c4517f60b --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbe.java @@ -0,0 +1,353 @@ +package vip.mate.context.intelligence.probe; + +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; + +import java.time.Instant; + +/** + * Window probe state machine (up aggressive / down conservative). + *

+ * State transitions: COLD → PROBING → BINARY_SEARCH → STABLE → DEGRADED → PROBING + *

+ * Thread-safety model: + *

    + *
  • {@code recordSuccess} / {@code recordOverflow} execute inside the {@code ConcurrentHashMap.compute} lock, + * serialized per-key, no extra synchronization needed
  • + *
  • {@code snapshot()} and volatile fields can be read concurrently by the read path (budget planning); + * volatile guarantees visibility
  • + *
  • {@code samples} / {@code sampleIdx} / {@code sampleCount} are volatile data, only accessed within the compute lock
  • + *
+ *

+ * Not a Spring Bean: created per-key by {@link WindowProbeRegistry}, with config injected at construction. + * + * @author MateClaw Team + */ +public class WindowProbe { + + /** State machine phase */ + public enum Phase { + /** Initial value exists but has not been validated by production traffic */ + COLD, + /** Lower bound validated, upper bound under exponential probing */ + PROBING, + /** Both bounds known, converging via binary search */ + BINARY_SEARCH, + /** Window confirmed, passively observing */ + STABLE, + /** Just overflowed, shrinking */ + DEGRADED + } + + // --- Window estimation (volatile, visible to read path) --- + volatile Phase phase; + volatile int effectiveWindow; + volatile int confidenceLower; + volatile int confidenceUpper; + volatile int declaredLimit; + + // --- Statistics (volatile, visible to read path) --- + volatile int peakObserved; + volatile int successiveSuccess; + volatile int successiveOverflow; + volatile int totalSuccess; + volatile int totalOverflow; + volatile Instant lastSuccessAt; + volatile Instant lastOverflowAt; + volatile Instant lastUpdatedAt; + + /** + * Last DB persistence time in STABLE state (used for C.3 throttling). + *

+ * Non-volatile field, only used in the ContextSignalProcessor single-writer scenario; + * the worst-case race is one extra or one missed DB write, which is harmless. + */ + volatile Instant lastPersistAt; + + // --- Sampling ring buffer (volatile data, accessed within compute lock) --- + final int[] samples; + int sampleIdx; + int sampleCount; + final int sampleSize; + + // --- Constants (injected from yml) --- + final int globalCeiling; + final int coldSeedFallback; + final double overflowShrinkRatio; + final double binaryConvergence; + final long staleReprobeMs; + + /** + * Config is injected when the Registry creates an instance. + */ + WindowProbe(ContextIntelligenceProperties.Probe cfg) { + this.globalCeiling = cfg.getGlobalCeiling(); + this.coldSeedFallback = cfg.getColdSeedFallback(); + this.overflowShrinkRatio = cfg.getOverflowShrinkRatio(); + this.binaryConvergence = cfg.getBinaryConvergence(); + this.staleReprobeMs = cfg.getStaleReprobeMs(); + this.sampleSize = cfg.getSampleSize(); + this.samples = new int[sampleSize]; + } + + // ==================== Factory methods ==================== + + /** + * Cold start (no declaredLimit). + *

+ * The seed value is typically the yml cold-seed-fallback (default 32768), to avoid excessive + * initial optimism for large models. + */ + static WindowProbe coldStart(int seed, ContextIntelligenceProperties.Probe cfg) { + WindowProbe p = new WindowProbe(cfg); + p.effectiveWindow = seed; + p.confidenceLower = 0; + p.confidenceUpper = Math.min(seed * 2, p.globalCeiling); + p.phase = Phase.COLD; + p.declaredLimit = 0; + p.lastUpdatedAt = Instant.now(); + return p; + } + + /** + * Cold start (with declaredLimit, for scenarios where the model's declared limit is known). + *

+ * When the declared limit is smaller than the exponential probing upper bound, lower the upper + * bound to declaredLimit. + */ + static WindowProbe coldStart(int seed, int declaredLimit, ContextIntelligenceProperties.Probe cfg) { + WindowProbe p = coldStart(seed, cfg); + p.declaredLimit = declaredLimit; + if (declaredLimit > 0 && declaredLimit < p.confidenceUpper) { + p.confidenceUpper = declaredLimit; + } + return p; + } + + /** + * Restore from a DB snapshot (restart recovery flow, §5.7). + *

+ * samples / sampleIdx / sampleCount are cleared (volatile data), and re-accumulate quickly after restart. + */ + static WindowProbe restoreFromSnapshot(WindowProbeSnapshot snap, ContextIntelligenceProperties.Probe cfg) { + WindowProbe p = new WindowProbe(cfg); + p.phase = snap.phase(); + p.effectiveWindow = snap.effectiveWindow(); + p.confidenceLower = snap.confidenceLower(); + p.confidenceUpper = snap.confidenceUpper(); + p.declaredLimit = snap.declaredLimit(); + p.peakObserved = snap.peakObserved(); + p.successiveSuccess = snap.successiveSuccess(); + p.successiveOverflow = snap.successiveOverflow(); + p.totalSuccess = snap.totalSuccess(); + p.totalOverflow = snap.totalOverflow(); + p.lastSuccessAt = snap.lastSuccessAt(); + p.lastOverflowAt = snap.lastOverflowAt(); + p.lastUpdatedAt = snap.lastUpdatedAt(); + return p; + } + + // ==================== Lock-free reads ==================== + + /** + * Capture the current state snapshot (for out-of-lock DB persistence and EnvSnapshot refresh). + *

+ * Read path is concurrency-safe: all fields are volatile, reading an approximate value at a given moment. + */ + WindowProbeSnapshot snapshot() { + return new WindowProbeSnapshot( + phase, effectiveWindow, confidenceLower, confidenceUpper, + declaredLimit, peakObserved, successiveSuccess, successiveOverflow, + totalSuccess, totalOverflow, lastSuccessAt, lastOverflowAt, lastUpdatedAt + ); + } + + // ==================== Write path: recordSuccess (up aggressive) ==================== + + /** + * Record a successful invocation and immediately try to expand effectiveWindow (§6.2). + *

+ * Must be called within the ConcurrentHashMap.compute lock. + */ + void recordSuccess(int promptTokens) { + // 1. Sampling + samples[sampleIdx] = promptTokens; + sampleIdx = (sampleIdx + 1) % sampleSize; + if (sampleCount < sampleSize) { + sampleCount++; + } + + // 2. Statistics + if (promptTokens > peakObserved) { + peakObserved = promptTokens; + } + successiveSuccess++; + successiveOverflow = 0; + totalSuccess++; + lastSuccessAt = Instant.now(); + lastUpdatedAt = lastSuccessAt; + + // 3. State transition + int safeEstimate = safeEstimate(); + switch (phase) { + case COLD -> handleColdSuccess(safeEstimate); + case PROBING -> handleProbingSuccess(safeEstimate); + case BINARY_SEARCH -> handleBinarySearchSuccess(); + case STABLE -> handleStableSuccess(); + case DEGRADED -> handleDegradedSuccess(); + } + } + + /** COLD → PROBING: when samples fill SAMPLE_SIZE/2, extract the lower bound and start probing */ + private void handleColdSuccess(int safeEstimate) { + if (sampleCount < sampleSize / 2) { + return; + } + effectiveWindow = safeEstimate; + confidenceLower = safeEstimate; + confidenceUpper = (int) Math.min((long) safeEstimate * 2, globalCeiling); + if (declaredLimit > 0 && declaredLimit < confidenceUpper) { + confidenceUpper = declaredLimit; + } + phase = Phase.PROBING; + } + + /** PROBING: 3 consecutive successes and samples near the current window → expand the upper bound; hit ceiling → STABLE */ + private void handleProbingSuccess(int safeEstimate) { + if (successiveSuccess >= 3 && safeEstimate >= effectiveWindow * 0.8) { + // Expand the upper bound (up aggressive) + confidenceLower = Math.max(confidenceLower, effectiveWindow); + int newUpper = (int) Math.min((long) confidenceUpper * 2, globalCeiling); + if (declaredLimit > 0 && declaredLimit < newUpper) { + newUpper = declaredLimit; + } + confidenceUpper = newUpper; + effectiveWindow = confidenceUpper; + } + // Hit ceiling → STABLE (conservatively pin to the lower bound) + boolean hitCeiling = confidenceUpper >= globalCeiling; + boolean hitDeclared = declaredLimit > 0 && confidenceUpper >= declaredLimit; + if (hitCeiling || hitDeclared) { + effectiveWindow = confidenceLower; + phase = Phase.STABLE; + } + } + + /** BINARY_SEARCH: success → raise the lower bound, check convergence */ + private void handleBinarySearchSuccess() { + confidenceLower = Math.max(confidenceLower, effectiveWindow); + if (confidenceLower > 0 && (double) (confidenceUpper - confidenceLower) / confidenceLower < binaryConvergence) { + effectiveWindow = confidenceLower; + phase = Phase.STABLE; + } + } + + /** STABLE: no near-ceiling calls for longer than staleReprobeMs → re-probe (GPU may have scaled up) */ + private void handleStableSuccess() { + if (lastSuccessAt == null) { + return; + } + long elapsedMs = System.currentTimeMillis() - lastSuccessAt.toEpochMilli(); + if (elapsedMs > staleReprobeMs) { + phase = Phase.PROBING; + confidenceUpper = (int) Math.min((long) effectiveWindow * 2, globalCeiling); + if (declaredLimit > 0 && declaredLimit < confidenceUpper) { + confidenceUpper = declaredLimit; + } + effectiveWindow = confidenceUpper; + } + } + + /** DEGRADED → PROBING: 3 consecutive successes, re-probe */ + private void handleDegradedSuccess() { + if (successiveSuccess >= 3) { + phase = Phase.PROBING; + confidenceUpper = (int) Math.min((long) effectiveWindow * 2, globalCeiling); + if (declaredLimit > 0 && declaredLimit < confidenceUpper) { + confidenceUpper = declaredLimit; + } + effectiveWindow = confidenceUpper; + } + } + + // ==================== Write path: recordOverflow (down conservative) ==================== + + /** + * Record an overflow invocation (§6.3). + *

+ * Must be called within the ConcurrentHashMap.compute lock. + * A single overflow does not shrink; shrinkage requires statistical evidence from ≥ SAMPLE_SIZE/2 samples. + */ + void recordOverflow(int attemptedTokens) { + successiveOverflow++; + successiveSuccess = 0; + totalOverflow++; + lastOverflowAt = Instant.now(); + lastUpdatedAt = lastOverflowAt; + + // Insufficient samples: only update confidenceUpper, no shrinkage + if (sampleCount < sampleSize / 2) { + if (attemptedTokens > 0 && attemptedTokens < confidenceUpper) { + confidenceUpper = attemptedTokens; + } + return; + } + + int safeMax = safeEstimate(); + int shrunk = Math.max(safeMax, (int) Math.floor(attemptedTokens * overflowShrinkRatio)); + + switch (phase) { + case COLD, PROBING -> handleProbingOverflow(attemptedTokens, shrunk); + case BINARY_SEARCH -> handleBinarySearchOverflow(attemptedTokens, shrunk); + case STABLE -> handleStableOverflow(attemptedTokens, shrunk); + case DEGRADED -> handleDegradedOverflow(shrunk); + } + } + + /** COLD/PROBING overflow: has lower bound → BINARY_SEARCH, no lower bound → DEGRADED */ + private void handleProbingOverflow(int attemptedTokens, int shrunk) { + effectiveWindow = shrunk; + confidenceUpper = attemptedTokens; + if (confidenceLower > 0) { + phase = Phase.BINARY_SEARCH; + } else { + phase = Phase.DEGRADED; + } + } + + /** BINARY_SEARCH overflow: lower the upper bound, check convergence */ + private void handleBinarySearchOverflow(int attemptedTokens, int shrunk) { + effectiveWindow = shrunk; + confidenceUpper = Math.min(confidenceUpper, attemptedTokens); + if (confidenceLower > 0 && (double) (confidenceUpper - confidenceLower) / confidenceLower < binaryConvergence) { + effectiveWindow = confidenceLower; + phase = Phase.STABLE; + } + } + + /** STABLE overflow → DEGRADED */ + private void handleStableOverflow(int attemptedTokens, int shrunk) { + effectiveWindow = shrunk; + confidenceUpper = Math.min(confidenceUpper, attemptedTokens); + phase = Phase.DEGRADED; + } + + /** DEGRADED overflow: only shrink, never expand */ + private void handleDegradedOverflow(int shrunk) { + if (shrunk < effectiveWindow) { + effectiveWindow = shrunk; + } + } + + // ==================== Utilities ==================== + + /** Max successful token count within the sampling window (safeEstimate) */ + private int safeEstimate() { + int max = 0; + for (int i = 0; i < sampleCount; i++) { + if (samples[i] > max) { + max = samples[i]; + } + } + return max; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeRegistry.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeRegistry.java new file mode 100644 index 000000000..6712b2d4b --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeRegistry.java @@ -0,0 +1,207 @@ +package vip.mate.context.intelligence.probe; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.config.ContextIntelligenceProperties; +import vip.mate.context.intelligence.metrics.ContextIntelMetrics; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Multi-model probe registry (write path entry). + *

+ * Manages the {@code (provider, model) → WindowProbe} mapping, using {@link ConcurrentHashMap#compute} + * to guarantee per-key atomic updates. The read path (budget planning) does not go through here; it reads + * {@code EnvSnapshotStore} directly. + *

+ * key format: {@code provider + ":" + modelName} (split on the first colon) + * + * @author MateClaw Team + */ +@Slf4j +@Component +public class WindowProbeRegistry { + + private final ConcurrentHashMap states = new ConcurrentHashMap<>(); + + private final ContextIntelligenceProperties props; + private final WindowStateLoader stateLoader; // nullable + /** Observability metrics gateway (§C.7), nullable: ObjectProvider may return null in test environments */ + private final ContextIntelMetrics metrics; + + public WindowProbeRegistry( + ContextIntelligenceProperties props, + ObjectProvider stateLoaderProvider, + ObjectProvider metricsProvider) { + this.props = props; + this.stateLoader = stateLoaderProvider.getIfAvailable(); + this.metrics = metricsProvider.getIfAvailable(); + } + + // ==================== key utilities ==================== + + /** Build a unified key for (provider, modelName) */ + public static String key(String provider, String modelName) { + return provider + ":" + modelName; + } + + private static String[] splitKey(String key) { + int idx = key.indexOf(':'); + if (idx < 0) { + return new String[]{key, ""}; + } + return new String[]{key.substring(0, idx), key.substring(idx + 1)}; + } + + // ==================== Write path ==================== + + /** + * Record a successful invocation (state machine update is performed within the compute lock). + *

+ * Both snapshot and previousPhase are captured within the compute lock to ensure consistency. + * + * @return ProbeUpdate (containing snapshot + previousPhase); the processor uses it to decide whether to persist + */ + public ProbeUpdate recordSuccess(String key, int promptTokens) { + final WindowProbe.Phase[] prevHolder = new WindowProbe.Phase[1]; + final WindowProbeSnapshot[] snapHolder = new WindowProbeSnapshot[1]; + states.compute(key, (k, existing) -> { + WindowProbe probe = (existing != null) ? existing : loadOrCreate(k); + prevHolder[0] = probe.phase; + probe.recordSuccess(promptTokens); + snapHolder[0] = probe.snapshot(); + return probe; + }); + recordPhaseMetrics(key, prevHolder[0], snapHolder[0]); + return new ProbeUpdate(snapHolder[0], prevHolder[0]); + } + + /** + * Record an overflow invocation (state machine update is performed within the compute lock). + *

+ * The overflow path executes synchronously (@EventListener without @Async), ensuring confidenceUpper is + * updated before the PTL retry. + */ + public ProbeUpdate recordOverflow(String key, int attemptedTokens) { + final WindowProbe.Phase[] prevHolder = new WindowProbe.Phase[1]; + final WindowProbeSnapshot[] snapHolder = new WindowProbeSnapshot[1]; + states.compute(key, (k, existing) -> { + WindowProbe probe = (existing != null) ? existing : loadOrCreate(k); + prevHolder[0] = probe.phase; + probe.recordOverflow(attemptedTokens); + snapHolder[0] = probe.snapshot(); + return probe; + }); + recordPhaseMetrics(key, prevHolder[0], snapHolder[0]); + return new ProbeUpdate(snapHolder[0], prevHolder[0]); + } + + // ==================== Read path ==================== + + /** + * Get a probe snapshot (for metrics / debugging, not the main budget-planning path). + *

+ * The main budget-planning path reads EnvSnapshotStore, not the Registry directly. + */ + public WindowProbeSnapshot getSnapshot(String key) { + WindowProbe probe = states.get(key); + return (probe != null) ? probe.snapshot() : null; + } + + // ==================== Idle eviction (C.2) ==================== + + /** + * Periodically evict probes of idle models to prevent the states map from growing unbounded. + *

+ * Eviction rule: STABLE state + no successful invocations for longer than idleThresholdMs. + * After eviction, the next access rebuilds from DB via loadOrCreate. + */ + @Scheduled(fixedRateString = "${mateclaw.context.intelligence.auto-clean.clean-interval-ms:3600000}") + public void evictIdleProbes() { + ContextIntelligenceProperties.AutoClean cfg = props.getAutoClean(); + if (!cfg.isEnabled()) { + return; + } + Instant cutoff = Instant.now().minus(Duration.ofMillis(cfg.getIdleThresholdMs())); + int[] removed = {0}; + states.entrySet().removeIf(entry -> { + WindowProbe probe = entry.getValue(); + if (probe.phase != WindowProbe.Phase.STABLE) { + return false; + } + if (probe.lastSuccessAt == null) { + return false; + } + if (probe.lastSuccessAt.isAfter(cutoff)) { + return false; + } + removed[0]++; + return true; + }); + if (removed[0] > 0) { + log.debug("[ContextIntel] evicted {} idle probes (STABLE + idle > {}ms)", + removed[0], cfg.getIdleThresholdMs()); + } + } + + // ==================== Internal methods ==================== + + /** + * Report phase-related metrics (§C.7). + *

+ * Executed outside the compute lock to avoid holding the lock during potential metrics registration contention. + *

    + *
  • {@code updatePhase} gauge is always updated to reflect the current phase
  • + *
  • {@code recordPhaseTransition} counter is incremented only on phase transitions
  • + *
+ * Wrapped in try-catch throughout; a metrics failure must not affect the state machine. + */ + private void recordPhaseMetrics(String key, WindowProbe.Phase prevPhase, WindowProbeSnapshot snap) { + if (metrics == null) return; + try { + String[] parts = splitKey(key); + String provider = parts[0]; + String model = parts[1]; + WindowProbe.Phase currPhase = snap.phase(); + metrics.updatePhase(provider, model, currPhase.ordinal()); + if (prevPhase != currPhase) { + metrics.recordPhaseTransition(provider, model, + prevPhase.name(), currPhase.name()); + } + } catch (Exception e) { + log.debug("[ContextIntel] phase metrics failed for {}: {}", key, e.getMessage()); + } + } + + /** + * Load or create a probe (§5.7 restart recovery flow). + *

+ * 1. First query DB (if stateLoader is available) + * 2. DB has a record → restoreFromSnapshot + * 3. DB has no record → coldStart(coldSeedFallback) + *

+ * Note: this method is called within the compute lock, so DB I/O holds the lock. + * It is only triggered on first access (cold start), which is acceptable. + */ + private WindowProbe loadOrCreate(String key) { + // 1. Try to restore from DB + if (stateLoader != null && props.isDbPersist()) { + try { + String[] parts = splitKey(key); + WindowProbeSnapshot snap = stateLoader.load(parts[0], parts[1]); + if (snap != null && snap.effectiveWindow() > 0) { + return WindowProbe.restoreFromSnapshot(snap, props.getProbe()); + } + } catch (Exception e) { + log.debug("[ContextIntel] loadFromDB failed for {}: {}", key, e.getMessage()); + } + } + + // 2. Cold start + return WindowProbe.coldStart(props.getProbe().getColdSeedFallback(), props.getProbe()); + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeSnapshot.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeSnapshot.java new file mode 100644 index 000000000..7ec73b398 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowProbeSnapshot.java @@ -0,0 +1,43 @@ +package vip.mate.context.intelligence.probe; + +import java.time.Instant; + +/** + * WindowProbe state snapshot (for DB persistence and out-of-lock reads). + *

+ * Captured within the {@code ConcurrentHashMap.compute} lock via {@code WindowProbe.snapshot()}, + * and passed outside the lock for DB writes and snapshot refresh. + *

+ * samples / sampleIdx / sampleCount are volatile data and are not included in the snapshot. + * + * @param phase current phase + * @param effectiveWindow effective window + * @param confidenceLower confidence lower bound + * @param confidenceUpper confidence upper bound + * @param declaredLimit model declared limit (0 means unknown) + * @param peakObserved historical max successful token count + * @param successiveSuccess successive success count + * @param successiveOverflow successive overflow count + * @param totalSuccess total success count + * @param totalOverflow total overflow count + * @param lastSuccessAt last success time + * @param lastOverflowAt last overflow time + * @param lastUpdatedAt last update time + * + * @author MateClaw Team + */ +public record WindowProbeSnapshot( + WindowProbe.Phase phase, + int effectiveWindow, + int confidenceLower, + int confidenceUpper, + int declaredLimit, + int peakObserved, + int successiveSuccess, + int successiveOverflow, + int totalSuccess, + int totalOverflow, + Instant lastSuccessAt, + Instant lastOverflowAt, + Instant lastUpdatedAt +) {} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowStateLoader.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowStateLoader.java new file mode 100644 index 000000000..ecf225083 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/probe/WindowStateLoader.java @@ -0,0 +1,24 @@ +package vip.mate.context.intelligence.probe; + +/** + * Window state loader (dependency inversion, to prevent the probe package from reverse depending on the persist package). + *

+ * Implemented by {@code vip.mate.context.intelligence.persist.WindowStateRepository}, + * injected into {@link WindowProbeRegistry} in {@code ContextIntelligenceAutoConfiguration}. + *

+ * When DB persistence is disabled or the Repository is not ready, the Registry degrades to null (always cold start). + * + * @author MateClaw Team + */ +@FunctionalInterface +public interface WindowStateLoader { + + /** + * Load the persisted state snapshot by (provider, modelName). + * + * @param provider provider ID + * @param modelName model name + * @return the snapshot; returns null when there is no record + */ + WindowProbeSnapshot load(String provider, String modelName); +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshot.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshot.java new file mode 100644 index 000000000..5d9828fa1 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshot.java @@ -0,0 +1,44 @@ +package vip.mate.context.intelligence.snapshot; + +import vip.mate.context.intelligence.enums.ResourcePressure; + +/** + * Immutable environment snapshot (sole data source for read path). + *

+ * Copy-on-write: create a new instance to replace the old one whenever a value changes; + * the read path reads lock-free via {@code AtomicReference.get()}. + *

+ * Field descriptions: + *

    + *
  • {@code effectiveWindow} — effective context window estimated by WindowProbe
  • + *
  • {@code pressure} — pressure level inferred by PressureInferencer
  • + *
  • {@code diversityDetected} — whether BackendDiversityTracker detected multiple backends
  • + *
  • {@code safeWindow} — P10 safe window (effective only when diversityDetected=true, acts as a ceiling for effectiveWindow)
  • + *
  • {@code lastUpdatedAt} — snapshot timestamp (for freshness checks)
  • + *
+ * + * @param effectiveWindow effective window (0 means no data) + * @param pressure pressure level + * @param diversityDetected whether multiple backends were detected + * @param safeWindow P10 safe window (0 when diversityDetected=false) + * @param lastUpdatedAt snapshot timestamp (epoch millis, 0 means empty snapshot) + * + * @author MateClaw Team + */ +public record EnvSnapshot( + int effectiveWindow, + ResourcePressure pressure, + boolean diversityDetected, + int safeWindow, + long lastUpdatedAt +) { + + /** Empty snapshot, indicates no data; read path falls back to yml */ + public static final EnvSnapshot EMPTY = + new EnvSnapshot(0, ResourcePressure.NORMAL, false, 0, 0); + + /** Whether the snapshot is available (effectiveWindow > 0 and not empty) */ + public boolean isAvailable() { + return effectiveWindow > 0; + } +} diff --git a/mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshotStore.java b/mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshotStore.java new file mode 100644 index 000000000..b6f362c62 --- /dev/null +++ b/mateclaw-server/src/main/java/vip/mate/context/intelligence/snapshot/EnvSnapshotStore.java @@ -0,0 +1,125 @@ +package vip.mate.context.intelligence.snapshot; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.stereotype.Component; +import vip.mate.context.intelligence.enums.ResourcePressure; +import vip.mate.context.intelligence.metrics.ContextIntelMetrics; +import vip.mate.context.intelligence.probe.WindowProbeRegistry; + +import java.time.Instant; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Environment snapshot cache (sole data source for read path). + *

+ * Stores {@link EnvSnapshot} keyed by {@code (provider, model)}, using + * {@code ConcurrentHashMap + AtomicReference} for lock-free reads. + *

+ * Key design (§5.4): + *

    + *
  • EnvSnapshot is an immutable record, copy-on-write
  • + *
  • AtomicReference.set is a volatile write, immediately visible to readers
  • + *
  • refreshIfChanged performs CAS only when effectiveWindow / pressure / diversityDetected change, reducing contention
  • + *
+ * + * @author MateClaw Team + */ +@Slf4j +@Component +public class EnvSnapshotStore { + + private final ConcurrentHashMap> snapshots = new ConcurrentHashMap<>(); + + /** Observability metrics gateway (§C.7), nullable: ObjectProvider may return null in test environment */ + private final ContextIntelMetrics metrics; + + public EnvSnapshotStore(ObjectProvider metricsProvider) { + this.metrics = metricsProvider.getIfAvailable(); + } + + /** + * Read path: get snapshot (fully lock-free). + *

+ * Returns EMPTY to indicate no data; caller should fall back to yml. + */ + public EnvSnapshot get(String provider, String modelName) { + String key = WindowProbeRegistry.key(provider, modelName); + AtomicReference ref = snapshots.get(key); + if (ref == null) { + return EnvSnapshot.EMPTY; + } + EnvSnapshot snap = ref.get(); + return (snap != null) ? snap : EnvSnapshot.EMPTY; + } + + /** + * Write path: conditional snapshot refresh (only update when key fields change). + *

+ * Called by ContextSignalProcessor after each signal is processed. + * + * @param key provider:model + * @param newEffectiveWindow new effective window + * @param newPressure new pressure level + * @param newDiversity new diversity detection result + * @param newSafeWindow new P10 safe window (effective when diversityDetected=true) + */ + public void refreshIfChanged(String key, int newEffectiveWindow, + ResourcePressure newPressure, boolean newDiversity, int newSafeWindow) { + try { + AtomicReference ref = snapshots.computeIfAbsent( + key, k -> new AtomicReference<>(EnvSnapshot.EMPTY)); + EnvSnapshot old = ref.get(); + // Only create a new snapshot when one of the three key fields changes (reduce CAS contention, mitigates risk 2) + if (old.effectiveWindow() != newEffectiveWindow + || old.pressure() != newPressure + || old.diversityDetected() != newDiversity) { + ref.set(new EnvSnapshot( + newEffectiveWindow, newPressure, newDiversity, newSafeWindow, Instant.now().toEpochMilli() + )); + // §C.7: Report counter + 3 gauges when snapshot is actually refreshed + recordRefreshMetrics(key, newEffectiveWindow, newPressure, newDiversity); + } + } catch (Exception e) { + log.debug("[ContextIntel] EnvSnapshot refresh failed for {}: {}", key, e.getMessage()); + } + } + + /** Idle eviction (coordinated by WindowProbeRegistry.evictIdleProbes) */ + public void evict(String key) { + snapshots.remove(key); + } + + // ==================== Internal methods ==================== + + /** + * Report snapshot refresh metrics (§C.7). + *

+ * Only invoked on actual refresh (not on every refreshIfChanged call). + * Fully try-catch guarded; metrics failures must not affect the snapshot. + */ + private void recordRefreshMetrics(String key, int effectiveWindow, + ResourcePressure pressure, boolean diversity) { + if (metrics == null) return; + try { + String[] parts = splitKey(key); + String provider = parts[0]; + String model = parts[1]; + metrics.recordSnapshotRefresh(provider, model); + metrics.updatePressureLevel(provider, model, pressure.gaugeValue()); + metrics.updateEffectiveWindow(provider, model, effectiveWindow); + metrics.updateDiversityDetected(provider, model, diversity); + } catch (Exception e) { + log.debug("[ContextIntel] refresh metrics failed for {}: {}", key, e.getMessage()); + } + } + + private static String[] splitKey(String key) { + int idx = key.indexOf(':'); + if (idx < 0) { + return new String[]{key, ""}; + } + return new String[]{key.substring(0, idx), key.substring(idx + 1)}; + } +} diff --git a/mateclaw-server/src/main/resources/application.yml b/mateclaw-server/src/main/resources/application.yml index 74b4f78e8..0aef80116 100644 --- a/mateclaw-server/src/main/resources/application.yml +++ b/mateclaw-server/src/main/resources/application.yml @@ -335,6 +335,13 @@ mate: compact-trigger-ratio: 0.75 preserve-recent-pairs: 2 summary-max-tokens: 300 + # Context Intelligence v2 模块配置(设计文档 §C.9) + # 动态探测模型上下文窗口,优化历史消息裁剪与 token 预算分配 + context: + intelligence: + enabled: true + metrics: + enabled: true # Wiki 知识库配置 wiki: enabled: true # 是否启用 Wiki 知识库功能 diff --git a/mateclaw-server/src/main/resources/db/migration/h2/V161__mate_model_context_state.sql b/mateclaw-server/src/main/resources/db/migration/h2/V161__mate_model_context_state.sql new file mode 100644 index 000000000..8dd4bc8f0 --- /dev/null +++ b/mateclaw-server/src/main/resources/db/migration/h2/V161__mate_model_context_state.sql @@ -0,0 +1,26 @@ +-- V161: Context Intelligence v2 — mate_model_context_state table (H2). +-- +-- H2 accepts the same column layout as MySQL. Differences: +-- 1. TINYINT(1) -> BOOLEAN +-- 2. DATETIME(3) -> TIMESTAMP (H2's TIMESTAMP has sub-second precision) +-- See mysql/V161__mate_model_context_state.sql for full design notes. + +CREATE TABLE IF NOT EXISTS mate_model_context_state ( + provider_id VARCHAR(128) NOT NULL, + model_name VARCHAR(256) NOT NULL, + phase VARCHAR(32) NOT NULL, + effective_window INT NOT NULL DEFAULT 0, + confidence_lower INT NOT NULL DEFAULT 0, + confidence_upper INT NOT NULL DEFAULT 0, + declared_limit INT NOT NULL DEFAULT 0, + peak_observed INT NOT NULL DEFAULT 0, + successive_success INT NOT NULL DEFAULT 0, + successive_overflow INT NOT NULL DEFAULT 0, + total_success INT NOT NULL DEFAULT 0, + total_overflow INT NOT NULL DEFAULT 0, + last_success_at TIMESTAMP NULL, + last_overflow_at TIMESTAMP NULL, + last_updated_at TIMESTAMP NULL, + is_diversity BOOLEAN NOT NULL DEFAULT FALSE, + PRIMARY KEY (provider_id, model_name) +); diff --git a/mateclaw-server/src/main/resources/db/migration/kingbase/V161__mate_model_context_state.sql b/mateclaw-server/src/main/resources/db/migration/kingbase/V161__mate_model_context_state.sql new file mode 100644 index 000000000..722049ee6 --- /dev/null +++ b/mateclaw-server/src/main/resources/db/migration/kingbase/V161__mate_model_context_state.sql @@ -0,0 +1,24 @@ +-- V161: Context Intelligence v2 — mate_model_context_state table (Kingbase). +-- +-- Kingbase accepts BOOLEAN and TIMESTAMP(3). Schema mirrors the H2 layout. +-- See mysql/V161__mate_model_context_state.sql for full design notes. + +CREATE TABLE IF NOT EXISTS mate_model_context_state ( + provider_id VARCHAR(128) NOT NULL, + model_name VARCHAR(256) NOT NULL, + phase VARCHAR(32) NOT NULL, + effective_window INT NOT NULL DEFAULT 0, + confidence_lower INT NOT NULL DEFAULT 0, + confidence_upper INT NOT NULL DEFAULT 0, + declared_limit INT NOT NULL DEFAULT 0, + peak_observed INT NOT NULL DEFAULT 0, + successive_success INT NOT NULL DEFAULT 0, + successive_overflow INT NOT NULL DEFAULT 0, + total_success INT NOT NULL DEFAULT 0, + total_overflow INT NOT NULL DEFAULT 0, + last_success_at TIMESTAMP(3) NULL, + last_overflow_at TIMESTAMP(3) NULL, + last_updated_at TIMESTAMP(3) NULL, + is_diversity BOOLEAN NOT NULL DEFAULT FALSE, + PRIMARY KEY (provider_id, model_name) +); diff --git a/mateclaw-server/src/main/resources/db/migration/mysql/V161__mate_model_context_state.sql b/mateclaw-server/src/main/resources/db/migration/mysql/V161__mate_model_context_state.sql new file mode 100644 index 000000000..37efb196d --- /dev/null +++ b/mateclaw-server/src/main/resources/db/migration/mysql/V161__mate_model_context_state.sql @@ -0,0 +1,38 @@ +-- V161: Context Intelligence v2 — mate_model_context_state table. +-- +-- Backs the WindowProbe state machine (vip.mate.context.intelligence.probe). +-- Each (provider_id, model_name) pair has at most one row storing the probe's +-- learned window bounds + statistics so that restarts do not lose learning. +-- +-- Why a new table instead of reusing v1's mate_model_context_state: +-- The user reset the codebase before v2 was built, so v1's table was never +-- created in any prior migration. This V161 creates it from scratch with +-- the v2 schema (design doc §5.6). +-- +-- Column notes: +-- * phase — WindowProbe.Phase enum name (COLD/PROBING/BINARY_SEARCH/STABLE/DEGRADED) +-- * declared_limit — model-declared max context (0 = unknown) +-- * is_diversity — reserved for BackendDiversityTracker.diversityDetected +-- persistence (design doc §5.6). Not actively written/read by the current +-- repository; DEFAULT 0 keeps it safe until the restore path is wired. +-- * last_*_at — nullable because COLD-start probes have no timestamps yet. + +CREATE TABLE IF NOT EXISTS mate_model_context_state ( + provider_id VARCHAR(128) NOT NULL, + model_name VARCHAR(256) NOT NULL, + phase VARCHAR(32) NOT NULL, + effective_window INT NOT NULL DEFAULT 0, + confidence_lower INT NOT NULL DEFAULT 0, + confidence_upper INT NOT NULL DEFAULT 0, + declared_limit INT NOT NULL DEFAULT 0, + peak_observed INT NOT NULL DEFAULT 0, + successive_success INT NOT NULL DEFAULT 0, + successive_overflow INT NOT NULL DEFAULT 0, + total_success INT NOT NULL DEFAULT 0, + total_overflow INT NOT NULL DEFAULT 0, + last_success_at DATETIME(3) NULL, + last_overflow_at DATETIME(3) NULL, + last_updated_at DATETIME(3) NULL, + is_diversity TINYINT(1) NOT NULL DEFAULT 0, + PRIMARY KEY (provider_id, model_name) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;