diff --git a/packages/semantic-cache/src/SemanticCache.ts b/packages/semantic-cache/src/SemanticCache.ts index 279f03c3..2f89e419 100644 --- a/packages/semantic-cache/src/SemanticCache.ts +++ b/packages/semantic-cache/src/SemanticCache.ts @@ -15,11 +15,7 @@ import type { ModelCost, ConfigRefreshOptions, } from './types'; -import { - SemanticCacheUsageError, - EmbeddingError, - ValkeyCommandError, -} from './errors'; +import { SemanticCacheUsageError, EmbeddingError, ValkeyCommandError } from './errors'; import { createTelemetry, type Telemetry } from './telemetry'; import { encodeFloat32, @@ -33,11 +29,7 @@ import { import { DEFAULT_COST_TABLE } from './defaultCostTable'; import { clusterScan } from './cluster'; import { createAnalytics, NOOP_ANALYTICS, type Analytics } from './analytics'; -import { - DiscoveryManager, - buildSemanticMetadata, - type DiscoveryOptions, -} from './discovery'; +import { DiscoveryManager, buildSemanticMetadata, type DiscoveryOptions } from './discovery'; const INVALIDATE_BATCH_SIZE = 1000; @@ -47,6 +39,21 @@ function errMsg(err: unknown): string { return err instanceof Error ? err.message : String(err); } +function parseHitCostMicros(raw: string | undefined | null): number | null { + if (raw === undefined || raw === null) { + return null; + } + const n = parseInt(raw, 10); + if (!Number.isFinite(n) || n < 0) { + return null; + } + return n; +} + +function correlationIdFor(prompt: string): string { + return createHash('sha256').update(prompt).digest('hex').slice(0, 16); +} + export class SemanticCache { private readonly client: Valkey; private readonly embedFn: EmbedFn; @@ -55,6 +62,7 @@ export class SemanticCache { private readonly entryPrefix: string; private readonly statsKey: string; private readonly similarityWindowKey: string; + private readonly missPendingKey: string; private readonly configKey: string; private defaultThreshold: number; private readonly defaultTtl: number | undefined; @@ -102,6 +110,7 @@ export class SemanticCache { this.entryPrefix = `${this.name}:entry:`; this.statsKey = `${this.name}:__stats`; this.similarityWindowKey = `${this.name}:__similarity_window`; + this.missPendingKey = `${this.name}:__miss_pending`; this.configKey = `${this.name}:__config`; this.embedKeyPrefix = `${this.name}:embed:`; this.defaultThreshold = options.defaultThreshold ?? 0.1; @@ -184,10 +193,7 @@ export class SemanticCache { } // Cluster-aware SCAN for entry keys and embed cache keys - const patterns = [ - `${this.name}:entry:*`, - `${this.name}:embed:*`, - ]; + const patterns = [`${this.name}:entry:*`, `${this.name}:embed:*`]; for (const pattern of patterns) { await clusterScan(this.client, pattern, async (keys, nodeClient) => { @@ -197,6 +203,7 @@ export class SemanticCache { await this.client.del(this.statsKey); await this.client.del(this.similarityWindowKey); + await this.client.del(this.missPendingKey); this.analytics.capture('cache_flush'); } @@ -240,7 +247,10 @@ export class SemanticCache { // -- Public operations -- - async check(prompt: string | ContentBlock[], options?: CacheCheckOptions): Promise { + async check( + prompt: string | ContentBlock[], + options?: CacheCheckOptions, + ): Promise { this.assertInitialized('check'); return this.traced('check', async (span) => { @@ -269,9 +279,9 @@ export class SemanticCache { // AND semantics: each ref must be present — chain separate TAG clauses. const binaryFilter = binaryRefs.length > 0 && this._hasBinaryRefs - ? (binaryRefs.length === 1 - ? `@binary_refs:{${escapeTag(binaryRefs[0])}}` - : binaryRefs.map((r) => `@binary_refs:{${escapeTag(r)}}`).join(' ')) + ? binaryRefs.length === 1 + ? `@binary_refs:{${escapeTag(binaryRefs[0])}}` + : binaryRefs.map((r) => `@binary_refs:{${escapeTag(r)}}`).join(' ') : null; const combinedFilter = [userFilter, binaryFilter].filter(Boolean).join(' '); const filterExpr = combinedFilter ? `(${combinedFilter})` : '*'; @@ -281,10 +291,18 @@ export class SemanticCache { let rawResult: unknown; try { rawResult = await this.client.call( - 'FT.SEARCH', this.indexName, query, - 'PARAMS', '2', 'vec', encodeFloat32(embedding), - 'LIMIT', '0', String(k), - 'DIALECT', '2', + 'FT.SEARCH', + this.indexName, + query, + 'PARAMS', + '2', + 'vec', + encodeFloat32(embedding), + 'LIMIT', + '0', + String(k), + 'DIALECT', + '2', ); } catch (err) { throw new ValkeyCommandError('FT.SEARCH', err); @@ -293,16 +311,19 @@ export class SemanticCache { const parsed = parseFtSearchResponse(rawResult); const categoryLabel = category || 'none'; - const timingAttrs = { 'embedding_latency_ms': embedSec * 1000, 'search_latency_ms': searchMs }; + const timingAttrs = { embedding_latency_ms: embedSec * 1000, search_latency_ms: searchMs }; // No candidates at all if (parsed.length === 0) { await this.recordStat('misses'); this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) + .inc(); span.setAttributes({ - 'cache.hit': false, 'cache.name': this.name, - 'cache.category': categoryLabel, ...timingAttrs, + 'cache.hit': false, + 'cache.name': this.name, + 'cache.category': categoryLabel, + ...timingAttrs, }); return { hit: false, confidence: 'miss' as const }; } @@ -312,20 +333,25 @@ export class SemanticCache { if (!isNaN(score)) { this.telemetry.metrics.similarityScore - .labels({ cache_name: this.name, category: categoryLabel }).observe(score); + .labels({ cache_name: this.name, category: categoryLabel }) + .observe(score); } // Miss (no usable score, or score exceeds threshold) if (isNaN(score) || score > threshold) { if (!isNaN(score)) { - await this.recordSimilarityWindow(score, 'miss', category); + const missMember = await this.recordSimilarityWindow(score, 'miss', category, null); + await this.recordMissPending(promptText, missMember); } await this.recordStat('misses'); this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) + .inc(); span.setAttributes({ - 'cache.hit': false, 'cache.name': this.name, - 'cache.category': categoryLabel, ...timingAttrs, + 'cache.hit': false, + 'cache.name': this.name, + 'cache.category': categoryLabel, + ...timingAttrs, ...(isNaN(score) ? {} : { 'cache.similarity': score, 'cache.threshold': threshold }), }); @@ -350,16 +376,23 @@ export class SemanticCache { candidate: { response: parsed[i].fields['response'] ?? '', similarity: s }, })); const picked = await rerankOpts.rerankFn( - promptText, indexedCandidates.map((x) => x.candidate), + promptText, + indexedCandidates.map((x) => x.candidate), ); // Explicit bounds check: -1 means "reject all"; out-of-range is a caller bug // treated as a miss rather than silently falling back to the top candidate. if (picked === -1 || picked < 0 || picked >= indexedCandidates.length) { - await this.recordSimilarityWindow(score, 'miss', category); + const missMember = await this.recordSimilarityWindow(score, 'miss', category, null); + await this.recordMissPending(promptText, missMember); await this.recordStat('misses'); this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }).inc(); - span.setAttributes({ 'cache.hit': false, 'cache.name': this.name, 'cache.reranked': true }); + .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) + .inc(); + span.setAttributes({ + 'cache.hit': false, + 'cache.name': this.name, + 'cache.reranked': true, + }); return { hit: false, confidence: 'miss' as const }; } // Map back to the original parsed[] index (not the candidates[] index) @@ -376,12 +409,16 @@ export class SemanticCache { // Evict stale entry try { await this.client.del(winner.key); - } catch { /* best effort */ } - await this.recordSimilarityWindow(winnerScore, 'miss', category); + } catch { + /* best effort */ + } + const missMember = await this.recordSimilarityWindow(winnerScore, 'miss', category, null); + await this.recordMissPending(promptText, missMember); this.telemetry.metrics.staleModelEvictions.labels({ cache_name: this.name }).inc(); await this.recordStat('misses'); this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) + .inc(); span.setAttributes({ 'cache.hit': false, 'cache.stale_evicted': true }); return { hit: false, confidence: 'miss' as const }; } @@ -400,9 +437,12 @@ export class SemanticCache { const onError = options.judge.onError ?? 'accept'; type JudgeDecision = - | 'accept' | 'reject' - | 'error_accept' | 'error_reject' - | 'timeout_accept' | 'timeout_reject'; + | 'accept' + | 'reject' + | 'error_accept' + | 'error_reject' + | 'timeout_accept' + | 'timeout_reject'; let decision: JudgeDecision; try { @@ -447,7 +487,8 @@ export class SemanticCache { // Preserve 'uncertain'; fall through to hit-return path } else { // reject / error_reject / timeout_reject → treat as miss - await this.recordSimilarityWindow(winnerScore, 'miss', category); + const missMember = await this.recordSimilarityWindow(winnerScore, 'miss', category, null); + await this.recordMissPending(promptText, missMember); await this.recordStat('misses'); this.telemetry.metrics.requestsTotal .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) @@ -472,12 +513,15 @@ export class SemanticCache { } // --- End judge --- + const hitCostMicros = parseHitCostMicros(winner.fields['cost_micros']); + // Record as genuine hit (moved here from before the judge block) - await this.recordSimilarityWindow(winnerScore, 'hit', category); + await this.recordSimilarityWindow(winnerScore, 'hit', category, hitCostMicros); await this.recordStat('hits'); const metricResult = confidence === 'uncertain' ? 'uncertain_hit' : 'hit'; this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: metricResult, category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: metricResult, category: categoryLabel }) + .inc(); if (this.defaultTtl !== undefined && matchedKey) { await this.client.expire(matchedKey, this.defaultTtl); @@ -485,16 +529,13 @@ export class SemanticCache { // Cost saved let costSaved: number | undefined; - const costMicrosStr = winner.fields['cost_micros']; - if (costMicrosStr) { - const costMicros = parseInt(costMicrosStr, 10); - if (!isNaN(costMicros) && costMicros > 0) { - costSaved = costMicros / 1_000_000; - // Atomically increment cost_saved_micros in stats - await this.client.hincrby(this.statsKey, 'cost_saved_micros', costMicros); - this.telemetry.metrics.costSavedTotal - .labels({ cache_name: this.name, category: categoryLabel }).inc(costSaved); - } + if (hitCostMicros !== null) { + costSaved = hitCostMicros / 1_000_000; + // Atomically increment cost_saved_micros in stats + await this.client.hincrby(this.statsKey, 'cost_saved_micros', hitCostMicros); + this.telemetry.metrics.costSavedTotal + .labels({ cache_name: this.name, category: categoryLabel }) + .inc(costSaved); } // Content blocks @@ -503,18 +544,27 @@ export class SemanticCache { if (contentBlocksStr) { try { contentBlocks = JSON.parse(contentBlocksStr); - } catch { /* ignore parse errors */ } + } catch { + /* ignore parse errors */ + } } span.setAttributes({ - 'cache.hit': true, 'cache.similarity': winnerScore, 'cache.threshold': threshold, - 'cache.confidence': confidence, 'cache.matched_key': matchedKey, - 'cache.category': categoryLabel, ...timingAttrs, + 'cache.hit': true, + 'cache.similarity': winnerScore, + 'cache.threshold': threshold, + 'cache.confidence': confidence, + 'cache.matched_key': matchedKey, + 'cache.category': categoryLabel, + ...timingAttrs, }); const result: CacheCheckResult = { - hit: true, response: winner.fields['response'], - similarity: winnerScore, confidence, matchedKey, + hit: true, + response: winner.fields['response'], + similarity: winnerScore, + confidence, + matchedKey, }; if (costSaved !== undefined) result.costSaved = costSaved; if (contentBlocks) result.contentBlocks = contentBlocks; @@ -522,7 +572,11 @@ export class SemanticCache { }); } - async store(prompt: string | ContentBlock[], response: string, options?: CacheStoreOptions): Promise { + async store( + prompt: string | ContentBlock[], + response: string, + options?: CacheStoreOptions, + ): Promise { this.assertInitialized('store'); return this.traced('store', async (span) => { @@ -545,8 +599,9 @@ export class SemanticCache { const pricing = this.costTable[options.model]; if (pricing) { costMicros = Math.round( - (options.inputTokens * pricing.inputPer1k / 1000 + - options.outputTokens * pricing.outputPer1k / 1000) * 1_000_000 + ((options.inputTokens * pricing.inputPer1k) / 1000 + + (options.outputTokens * pricing.outputPer1k) / 1000) * + 1_000_000, ); } } @@ -589,11 +644,18 @@ export class SemanticCache { if (ttl !== undefined) await this.client.expire(entryKey, ttl); span.setAttributes({ - 'cache.name': this.name, 'cache.key': entryKey, 'cache.ttl': ttl ?? -1, - 'cache.category': category || 'none', 'cache.model': model || 'none', - 'embedding_latency_ms': embedSec * 1000, + 'cache.name': this.name, + 'cache.key': entryKey, + 'cache.ttl': ttl ?? -1, + 'cache.category': category || 'none', + 'cache.model': model || 'none', + embedding_latency_ms: embedSec * 1000, }); + if (costMicros !== undefined && costMicros >= 0) { + await this.applyCostToPendingMiss(promptText, costMicros); + } + return entryKey; }); } @@ -622,12 +684,18 @@ export class SemanticCache { const model = options?.model ?? ''; let costMicros: number | undefined; - if (options?.model && options?.inputTokens !== undefined && options?.outputTokens !== undefined && this.costTable) { + if ( + options?.model && + options?.inputTokens !== undefined && + options?.outputTokens !== undefined && + this.costTable + ) { const pricing = this.costTable[options.model]; if (pricing) { costMicros = Math.round( - (options.inputTokens * pricing.inputPer1k / 1000 + - options.outputTokens * pricing.outputPer1k / 1000) * 1_000_000 + ((options.inputTokens * pricing.inputPer1k) / 1000 + + (options.outputTokens * pricing.outputPer1k) / 1000) * + 1_000_000, ); } } @@ -649,7 +717,9 @@ export class SemanticCache { if (costMicros !== undefined && costMicros > 0) { hashFields['cost_micros'] = String(costMicros); } - if (options?.temperature !== undefined) hashFields['temperature'] = String(options.temperature); + if (options?.temperature !== undefined) { + hashFields['temperature'] = String(options.temperature); + } if (options?.topP !== undefined) hashFields['top_p'] = String(options.topP); if (options?.seed !== undefined) hashFields['seed'] = String(options.seed); @@ -663,11 +733,18 @@ export class SemanticCache { if (ttl !== undefined) await this.client.expire(entryKey, ttl); span.setAttributes({ - 'cache.name': this.name, 'cache.key': entryKey, 'cache.ttl': ttl ?? -1, - 'cache.category': category || 'none', 'cache.model': model || 'none', - 'embedding_latency_ms': embedSec * 1000, + 'cache.name': this.name, + 'cache.key': entryKey, + 'cache.ttl': ttl ?? -1, + 'cache.category': category || 'none', + 'cache.model': model || 'none', + embedding_latency_ms: embedSec * 1000, }); + if (costMicros !== undefined && costMicros >= 0) { + await this.applyCostToPendingMiss(promptText, costMicros); + } + return entryKey; }); } @@ -722,19 +799,27 @@ export class SemanticCache { const binaryFilter = binaryRefs.length > 0 && this._hasBinaryRefs - ? (binaryRefs.length === 1 - ? `@binary_refs:{${escapeTag(binaryRefs[0])}}` - : binaryRefs.map((r) => `@binary_refs:{${escapeTag(r)}}`).join(' ')) + ? binaryRefs.length === 1 + ? `@binary_refs:{${escapeTag(binaryRefs[0])}}` + : binaryRefs.map((r) => `@binary_refs:{${escapeTag(r)}}`).join(' ') : null; const combinedFilter = [userFilter, binaryFilter].filter(Boolean).join(' '); const filterExpr = combinedFilter ? `(${combinedFilter})` : '*'; const query = `${filterExpr}=>[KNN ${k} @embedding $vec AS __score]`; pipeline.call( - 'FT.SEARCH', this.indexName, query, - 'PARAMS', '2', 'vec', encodeFloat32(embedding), - 'LIMIT', '0', String(k), - 'DIALECT', '2', + 'FT.SEARCH', + this.indexName, + query, + 'PARAMS', + '2', + 'vec', + encodeFloat32(embedding), + 'LIMIT', + '0', + String(k), + 'DIALECT', + '2', ); } @@ -752,7 +837,8 @@ export class SemanticCache { if (err) { await this.recordStat('misses'); this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) + .inc(); results.push({ hit: false, confidence: 'miss' as const }); continue; } @@ -762,7 +848,8 @@ export class SemanticCache { if (parsed.length === 0) { await this.recordStat('misses'); this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) + .inc(); results.push({ hit: false, confidence: 'miss' as const }); continue; } @@ -772,11 +859,13 @@ export class SemanticCache { if (isNaN(score) || score > threshold) { if (!isNaN(score)) { - await this.recordSimilarityWindow(score, 'miss', category); + const missMember = await this.recordSimilarityWindow(score, 'miss', category, null); + await this.recordMissPending(resolved[i].text, missMember); } await this.recordStat('misses'); this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: 'miss', category: categoryLabel }) + .inc(); const result: CacheCheckResult = { hit: false, confidence: 'miss' as const }; if (!isNaN(score)) { result.similarity = score; @@ -786,13 +875,15 @@ export class SemanticCache { continue; } - await this.recordSimilarityWindow(score, 'hit', category); + const hitCostMicros = parseHitCostMicros(parsed[0].fields['cost_micros']); + await this.recordSimilarityWindow(score, 'hit', category, hitCostMicros); const confidence: CacheConfidence = score >= threshold - this.uncertaintyBand ? 'uncertain' : 'high'; await this.recordStat('hits'); const metricResult = confidence === 'uncertain' ? 'uncertain_hit' : 'hit'; this.telemetry.metrics.requestsTotal - .labels({ cache_name: this.name, result: metricResult, category: categoryLabel }).inc(); + .labels({ cache_name: this.name, result: metricResult, category: categoryLabel }) + .inc(); const matchedKey = parsed[0].key; if (this.defaultTtl !== undefined && matchedKey) { @@ -800,26 +891,30 @@ export class SemanticCache { } let costSaved: number | undefined; - const costMicrosStr = parsed[0].fields['cost_micros']; - if (costMicrosStr) { - const costMicros = parseInt(costMicrosStr, 10); - if (!isNaN(costMicros) && costMicros > 0) { - costSaved = costMicros / 1_000_000; - await this.client.hincrby(this.statsKey, 'cost_saved_micros', costMicros); - this.telemetry.metrics.costSavedTotal - .labels({ cache_name: this.name, category: categoryLabel }).inc(costSaved); - } + if (hitCostMicros !== null) { + costSaved = hitCostMicros / 1_000_000; + await this.client.hincrby(this.statsKey, 'cost_saved_micros', hitCostMicros); + this.telemetry.metrics.costSavedTotal + .labels({ cache_name: this.name, category: categoryLabel }) + .inc(costSaved); } let contentBlocks: import('./utils').ContentBlock[] | undefined; const contentBlocksStr = parsed[0].fields['content_blocks']; if (contentBlocksStr) { - try { contentBlocks = JSON.parse(contentBlocksStr); } catch { /* ignore */ } + try { + contentBlocks = JSON.parse(contentBlocksStr); + } catch { + /* ignore */ + } } const result: CacheCheckResult = { - hit: true, response: parsed[0].fields['response'], - similarity: score, confidence, matchedKey, + hit: true, + response: parsed[0].fields['response'], + similarity: score, + confidence, + matchedKey, }; if (costSaved !== undefined) result.costSaved = costSaved; if (contentBlocks) result.contentBlocks = contentBlocks; @@ -844,10 +939,16 @@ export class SemanticCache { let rawResult: unknown; try { rawResult = await this.client.call( - 'FT.SEARCH', this.indexName, filter, - 'RETURN', '0', - 'LIMIT', '0', String(INVALIDATE_BATCH_SIZE), - 'DIALECT', '2', + 'FT.SEARCH', + this.indexName, + filter, + 'RETURN', + '0', + 'LIMIT', + '0', + String(INVALIDATE_BATCH_SIZE), + 'DIALECT', + '2', ); } catch (err) { throw new ValkeyCommandError('FT.SEARCH', err); @@ -856,8 +957,10 @@ export class SemanticCache { const parsed = parseFtSearchResponse(rawResult); if (parsed.length === 0) { span.setAttributes({ - 'cache.name': this.name, 'cache.filter': filter, - 'cache.deleted_count': 0, 'cache.truncated': false, + 'cache.name': this.name, + 'cache.filter': filter, + 'cache.deleted_count': 0, + 'cache.truncated': false, }); return { deleted: 0, truncated: false }; } @@ -871,8 +974,10 @@ export class SemanticCache { } span.setAttributes({ - 'cache.name': this.name, 'cache.filter': filter, - 'cache.deleted_count': keys.length, 'cache.truncated': truncated, + 'cache.name': this.name, + 'cache.filter': filter, + 'cache.deleted_count': keys.length, + 'cache.truncated': truncated, }); return { deleted: keys.length, truncated }; }); @@ -942,9 +1047,10 @@ export class SemanticCache { const minSamples = options?.minSamples ?? 100; const category = options?.category; - const threshold = category && this.categoryThresholds[category] !== undefined - ? this.categoryThresholds[category] - : this.defaultThreshold; + const threshold = + category && this.categoryThresholds[category] !== undefined + ? this.categoryThresholds[category] + : this.defaultThreshold; // Read all window entries let rawEntries: string[]; @@ -967,7 +1073,9 @@ export class SemanticCache { entries.push(entry); } } - } catch { /* skip corrupt entries */ } + } catch { + /* skip corrupt entries */ + } } const sampleCount = entries.length; @@ -1067,14 +1175,18 @@ export class SemanticCache { try { const entry = JSON.parse(raw); if (entry.category) categories.add(entry.category); - } catch { /* skip */ } + } catch { + /* skip */ + } } const results = await Promise.all([ this.thresholdEffectiveness({ minSamples: options?.minSamples }), - ...[...categories].filter(Boolean).map((cat) => - this.thresholdEffectiveness({ category: cat, minSamples: options?.minSamples }) - ), + ...[...categories] + .filter(Boolean) + .map((cat) => + this.thresholdEffectiveness({ category: cat, minSamples: options?.minSamples }), + ), ]); return results; @@ -1132,7 +1244,9 @@ export class SemanticCache { // -- Internal helpers exposed to package adapters -- /** @internal Default similarity threshold. */ - get _defaultThreshold(): number { return this.defaultThreshold; } + get _defaultThreshold(): number { + return this.defaultThreshold; + } /** @internal Test-only getter. */ get _categoryThresholds(): Readonly> { @@ -1151,10 +1265,17 @@ export class SemanticCache { */ async _searchEntries(filterExpr: string, limit: number, offset: number): Promise { return this.client.call( - 'FT.SEARCH', this.indexName, filterExpr, - 'SORTBY', 'inserted_at', 'ASC', - 'LIMIT', String(offset), String(limit), - 'DIALECT', '2', + 'FT.SEARCH', + this.indexName, + filterExpr, + 'SORTBY', + 'inserted_at', + 'ASC', + 'LIMIT', + String(offset), + String(limit), + 'DIALECT', + '2', ); } @@ -1177,15 +1298,11 @@ export class SemanticCache { this.refreshConfig() .then((ok) => { if (!ok) { - this.telemetry.metrics.configRefreshFailed - .labels({ cache_name: this.name }) - .inc(); + this.telemetry.metrics.configRefreshFailed.labels({ cache_name: this.name }).inc(); } }) .catch(() => { - this.telemetry.metrics.configRefreshFailed - .labels({ cache_name: this.name }) - .inc(); + this.telemetry.metrics.configRefreshFailed.labels({ cache_name: this.name }).inc(); }); }; @@ -1245,9 +1362,7 @@ export class SemanticCache { metadata, heartbeatIntervalMs: this.discoveryOptions.heartbeatIntervalMs, onWriteFailed: () => { - this.telemetry.metrics.discoveryWriteFailed - .labels({ cache_name: this.name }) - .inc(); + this.telemetry.metrics.discoveryWriteFailed.labels({ cache_name: this.name }).inc(); }, }); await manager.register(); @@ -1259,7 +1374,10 @@ export class SemanticCache { this.analyticsInitiated = true; try { const a = await createAnalytics(this.analyticsOpts); - if (this.shutdownCalled) { await a.shutdown(); return; } + if (this.shutdownCalled) { + await a.shutdown(); + return; + } this.analytics = a; await a.init(this.client, this.name, { defaultThreshold: this.defaultThreshold, @@ -1315,20 +1433,45 @@ export class SemanticCache { const dim = (await this.embed('probe')).vector.length; try { await this.client.call( - 'FT.CREATE', this.indexName, 'ON', 'HASH', - 'PREFIX', '1', this.entryPrefix, + 'FT.CREATE', + this.indexName, + 'ON', + 'HASH', + 'PREFIX', + '1', + this.entryPrefix, 'SCHEMA', - 'prompt', 'TEXT', 'NOSTEM', - 'response', 'TEXT', 'NOSTEM', - 'model', 'TAG', - 'category', 'TAG', - 'binary_refs', 'TAG', - 'inserted_at', 'NUMERIC', 'SORTABLE', - 'temperature', 'NUMERIC', - 'top_p', 'NUMERIC', - 'seed', 'NUMERIC', - 'embedding', 'VECTOR', 'HNSW', '6', - 'TYPE', 'FLOAT32', 'DIM', String(dim), 'DISTANCE_METRIC', 'COSINE', + 'prompt', + 'TEXT', + 'NOSTEM', + 'response', + 'TEXT', + 'NOSTEM', + 'model', + 'TAG', + 'category', + 'TAG', + 'binary_refs', + 'TAG', + 'inserted_at', + 'NUMERIC', + 'SORTABLE', + 'temperature', + 'NUMERIC', + 'top_p', + 'NUMERIC', + 'seed', + 'NUMERIC', + 'embedding', + 'VECTOR', + 'HNSW', + '6', + 'TYPE', + 'FLOAT32', + 'DIM', + String(dim), + 'DISTANCE_METRIC', + 'COSINE', ); } catch (err) { throw new ValkeyCommandError('FT.CREATE', err); @@ -1356,9 +1499,7 @@ export class SemanticCache { } /** Resolve a prompt (string or ContentBlock[]) into text + binary refs. */ - private resolvePrompt( - prompt: string | ContentBlock[], - ): { text: string; binaryRefs: string[] } { + private resolvePrompt(prompt: string | ContentBlock[]): { text: string; binaryRefs: string[] } { if (typeof prompt === 'string') { return { text: prompt, binaryRefs: [] }; } @@ -1377,7 +1518,8 @@ export class SemanticCache { const cached = await this.client.getBuffer(embedKey); if (cached) { this.telemetry.metrics.embeddingCacheTotal - .labels({ cache_name: this.name, result: 'hit' }).inc(); + .labels({ cache_name: this.name, result: 'hit' }) + .inc(); // Decode Float32 buffer const vector: number[] = []; for (let i = 0; i < cached.length; i += 4) { @@ -1385,9 +1527,12 @@ export class SemanticCache { } return { vector, durationSec: 0 }; } - } catch { /* ignore cache read errors */ } + } catch { + /* ignore cache read errors */ + } this.telemetry.metrics.embeddingCacheTotal - .labels({ cache_name: this.name, result: 'miss' }).inc(); + .labels({ cache_name: this.name, result: 'miss' }) + .inc(); } const start = performance.now(); @@ -1398,9 +1543,7 @@ export class SemanticCache { throw new EmbeddingError(`embedFn failed: ${errMsg(err)}`, err); } const durationSec = (performance.now() - start) / 1000; - this.telemetry.metrics.embeddingDuration - .labels({ cache_name: this.name }) - .observe(durationSec); + this.telemetry.metrics.embeddingDuration.labels({ cache_name: this.name }).observe(durationSec); // Store in embedding cache if (this.embeddingCacheEnabled && text) { @@ -1409,7 +1552,9 @@ export class SemanticCache { try { const buf = encodeFloat32(vector); await this.client.set(embedKey, buf, 'EX', this.embeddingCacheTtl); - } catch { /* ignore cache write errors */ } + } catch { + /* ignore cache write errors */ + } } return { vector, durationSec }; @@ -1453,21 +1598,110 @@ export class SemanticCache { score: number, result: 'hit' | 'miss', category: string, - ): Promise { + costSavedMicros: number | null, + ): Promise { const now = Date.now(); - // Include a unique nonce so identical (score, result, category) tuples are - // each recorded as distinct ZADD members instead of overwriting each other. - const member = JSON.stringify({ score, result, category, _n: Math.random() }); + const member = JSON.stringify({ + score, + result, + category, + _n: Math.random(), + cost_saved_micros: costSavedMicros, + }); const sevenDaysAgo = now - 7 * 24 * 60 * 60 * 1000; try { const pipeline = this.client.pipeline(); pipeline.zadd(this.similarityWindowKey, now, member); - // Trim by time: remove entries older than 7 days pipeline.zremrangebyscore(this.similarityWindowKey, '-inf', sevenDaysAgo); - // Trim by count: keep at most 10,000 most recent pipeline.zremrangebyrank(this.similarityWindowKey, 0, -10001); await pipeline.exec(); - } catch { /* best effort - never fail on window writes */ } + } catch { + /* best effort - never fail on window writes */ + } + return member; + } + + /** + * Track a miss so a subsequent store() can backfill its cost into the + * similarity-window record. Bounded by a 5-minute TTL on the bookkeeping + * zset — entries beyond that are pruned on every record and backfill. + */ + private async recordMissPending(prompt: string, similarityMember: string): Promise { + const correlationId = correlationIdFor(prompt); + const now = Date.now(); + const fiveMinutesAgo = now - 5 * 60 * 1000; + const entry = JSON.stringify({ correlationId, similarityMember }); + try { + await this.client.zadd(this.missPendingKey, now, entry); + await this.client.zremrangebyscore(this.missPendingKey, '-inf', `(${fiveMinutesAgo}`); + } catch { + /* best effort */ + } + } + + /** + * After a successful store(), find the oldest pending miss for the same + * query and update its similarity-window record with the now-known cost. + * Best-effort — silently no-op if no pending miss exists or the bookkeeping + * entry has already been pruned. + */ + private async applyCostToPendingMiss(prompt: string, costMicros: number): Promise { + const correlationId = correlationIdFor(prompt); + const fiveMinutesAgo = Date.now() - 5 * 60 * 1000; + try { + await this.client.zremrangebyscore(this.missPendingKey, '-inf', `(${fiveMinutesAgo}`); + + const raw = (await this.client.zrange( + this.missPendingKey, + '0', + '-1', + 'WITHSCORES', + )) as Array; + let matchedEntry: string | null = null; + let matchedSimilarityMember: string | null = null; + for (let i = 0; i < raw.length; i += 2) { + const entryStr = raw[i]; + try { + const parsed = JSON.parse(entryStr) as { + correlationId: string; + similarityMember: string; + }; + if (parsed.correlationId === correlationId) { + matchedEntry = entryStr; + matchedSimilarityMember = parsed.similarityMember; + break; + } + } catch { + /* skip malformed */ + } + } + if (matchedEntry === null || matchedSimilarityMember === null) { + return; + } + + const rawScore = await this.client.zscore(this.similarityWindowKey, matchedSimilarityMember); + if (rawScore === null) { + await this.client.zrem(this.missPendingKey, matchedEntry); + return; + } + const similarityScore = Number(rawScore); + if (!Number.isFinite(similarityScore)) { + await this.client.zrem(this.missPendingKey, matchedEntry); + return; + } + + const parsedMember = JSON.parse(matchedSimilarityMember) as Record; + parsedMember.cost_saved_micros = costMicros; + const updatedMember = JSON.stringify(parsedMember); + + const updatePipeline = this.client.pipeline(); + updatePipeline.zrem(this.similarityWindowKey, matchedSimilarityMember); + updatePipeline.zadd(this.similarityWindowKey, similarityScore, updatedMember); + updatePipeline.zrem(this.missPendingKey, matchedEntry); + await updatePipeline.exec(); + } catch { + /* never fail store() because of bookkeeping */ + } } private assertInitialized(method: string): void { @@ -1495,7 +1729,6 @@ export class SemanticCache { ); } - private parseDimensionFromInfo(info: unknown[]): number { for (let i = 0; i < info.length - 1; i += 2) { const key = String(info[i]); diff --git a/packages/semantic-cache/src/__tests__/cost-instrumentation.test.ts b/packages/semantic-cache/src/__tests__/cost-instrumentation.test.ts new file mode 100644 index 00000000..a93a1d54 --- /dev/null +++ b/packages/semantic-cache/src/__tests__/cost-instrumentation.test.ts @@ -0,0 +1,208 @@ +import { describe, it, expect } from 'vitest'; +import { SemanticCache } from '../SemanticCache'; + +class StubValkey { + zsets = new Map>(); + + async zadd(key: string, score: number, member: string): Promise { + const list = this.zsets.get(key) ?? []; + list.push({ score, member }); + this.zsets.set(key, list); + return 1; + } + + async zrange( + key: string, + _start: string | number, + _stop: string | number, + mode?: string, + ): Promise { + const list = (this.zsets.get(key) ?? []).slice().sort((a, b) => a.score - b.score); + if (mode === 'WITHSCORES') { + const out: string[] = []; + for (const e of list) { + out.push(e.member, String(e.score)); + } + return out; + } + return list.map((e) => e.member); + } + + async zrem(key: string, member: string): Promise { + const list = this.zsets.get(key) ?? []; + const next = list.filter((e) => e.member !== member); + this.zsets.set(key, next); + return list.length - next.length; + } + + async zscore(key: string, member: string): Promise { + const entry = (this.zsets.get(key) ?? []).find((e) => e.member === member); + return entry ? String(entry.score) : null; + } + + async zremrangebyscore(key: string, _min: string, maxRaw: string | number): Promise { + const maxStr = typeof maxRaw === 'string' ? maxRaw : String(maxRaw); + const max = maxStr.startsWith('(') ? Number(maxStr.slice(1)) : Number(maxStr); + const list = this.zsets.get(key) ?? []; + const next = list.filter((e) => e.score > max); + this.zsets.set(key, next); + return list.length - next.length; + } + + pipeline() { + const ops: Array<[string, unknown[]]> = []; + const p = { + zadd: (key: string, score: number, member: string) => { + ops.push(['zadd', [key, score, member]]); + return p; + }, + zrem: (key: string, member: string) => { + ops.push(['zrem', [key, member]]); + return p; + }, + zremrangebyscore: () => p, + zremrangebyrank: () => p, + } as Record; + p['exec'] = async () => { + for (const [op, args] of ops) { + if (op === 'zadd') { + const [key, score, member] = args as [string, number, string]; + const list = this.zsets.get(key) ?? []; + list.push({ score, member }); + this.zsets.set(key, list); + } + if (op === 'zrem') { + const [key, member] = args as [string, string]; + const list = this.zsets.get(key) ?? []; + this.zsets.set(key, list.filter((e) => e.member !== member)); + } + } + return []; + }; + return p as { + zadd: (k: string, s: number, m: string) => unknown; + zrem: (k: string, m: string) => unknown; + zremrangebyscore: (k: string, a: string, b: string | number) => unknown; + zremrangebyrank: (k: string, a: number, b: number) => unknown; + exec: () => Promise; + }; + } +} + +describe('cost instrumentation on similarity-window writes', () => { + it('records cost_saved_micros on hit', async () => { + const client = new StubValkey(); + const cache = Object.create(SemanticCache.prototype) as { + similarityWindowKey: string; + client: StubValkey; + recordSimilarityWindow: ( + score: number, + result: 'hit' | 'miss', + category: string, + costSavedMicros: number | null, + ) => Promise; + }; + cache.similarityWindowKey = 'test:__similarity_window'; + cache.client = client; + await cache.recordSimilarityWindow(0.08, 'hit', 'all', 1500); + + const entries = client.zsets.get('test:__similarity_window') ?? []; + expect(entries).toHaveLength(1); + const parsed = JSON.parse(entries[0].member); + expect(parsed.cost_saved_micros).toBe(1500); + expect(parsed.result).toBe('hit'); + }); + + it('records cost_saved_micros: null on miss', async () => { + const client = new StubValkey(); + const cache = Object.create(SemanticCache.prototype) as { + similarityWindowKey: string; + client: StubValkey; + recordSimilarityWindow: ( + score: number, + result: 'hit' | 'miss', + category: string, + costSavedMicros: number | null, + ) => Promise; + }; + cache.similarityWindowKey = 'test:__similarity_window'; + cache.client = client; + await cache.recordSimilarityWindow(0.15, 'miss', 'all', null); + + const entries = client.zsets.get('test:__similarity_window') ?? []; + expect(entries).toHaveLength(1); + const parsed = JSON.parse(entries[0].member); + expect(parsed.cost_saved_micros).toBeNull(); + expect(parsed.result).toBe('miss'); + }); + + it('writes a __miss_pending entry on miss and applies cost on subsequent store', async () => { + const client = new StubValkey(); + type CachePrivates = { + similarityWindowKey: string; + missPendingKey: string; + client: StubValkey; + recordSimilarityWindow: ( + score: number, + result: 'hit' | 'miss', + category: string, + cost: number | null, + ) => Promise; + recordMissPending: (prompt: string, member: string) => Promise; + applyCostToPendingMiss: (prompt: string, costMicros: number) => Promise; + }; + const cache = Object.create(SemanticCache.prototype) as CachePrivates; + cache.similarityWindowKey = 'test:__similarity_window'; + cache.missPendingKey = 'test:__miss_pending'; + cache.client = client; + + const member = await cache.recordSimilarityWindow(0.18, 'miss', 'all', null); + await cache.recordMissPending('what is the capital of France', member); + await cache.applyCostToPendingMiss('what is the capital of France', 2500); + + const entries = client.zsets.get('test:__similarity_window') ?? []; + expect(entries).toHaveLength(1); + const parsed = JSON.parse(entries[0].member); + expect(parsed.cost_saved_micros).toBe(2500); + expect(parsed.result).toBe('miss'); + }); + + it('recordMissPending prunes entries older than the 5-minute bound', async () => { + const client = new StubValkey(); + type CachePrivates = { + missPendingKey: string; + client: StubValkey; + recordMissPending: (prompt: string, member: string) => Promise; + }; + const cache = Object.create(SemanticCache.prototype) as CachePrivates; + cache.missPendingKey = 'test:__miss_pending'; + cache.client = client; + + const tenMinutesAgo = Date.now() - 10 * 60 * 1000; + await client.zadd('test:__miss_pending', tenMinutesAgo, '{"stale":true}'); + + await cache.recordMissPending('fresh query', 'member-1'); + + const entries = client.zsets.get('test:__miss_pending') ?? []; + expect(entries).toHaveLength(1); + expect(entries[0].member).not.toContain('stale'); + }); + + it('applyCostToPendingMiss is a no-op when no pending miss exists', async () => { + const client = new StubValkey(); + type CachePrivates = { + similarityWindowKey: string; + missPendingKey: string; + client: StubValkey; + applyCostToPendingMiss: (prompt: string, costMicros: number) => Promise; + }; + const cache = Object.create(SemanticCache.prototype) as CachePrivates; + cache.similarityWindowKey = 'test:__similarity_window'; + cache.missPendingKey = 'test:__miss_pending'; + cache.client = client; + + await expect( + cache.applyCostToPendingMiss('never-seen-before query', 500), + ).resolves.toBeUndefined(); + }); +}); diff --git a/proprietary/cache-proposals/__tests__/cache-readonly.service.spec.ts b/proprietary/cache-proposals/__tests__/cache-readonly.service.spec.ts index 7df164ed..e1e27c56 100644 --- a/proprietary/cache-proposals/__tests__/cache-readonly.service.spec.ts +++ b/proprietary/cache-proposals/__tests__/cache-readonly.service.spec.ts @@ -65,11 +65,22 @@ class StubRegistry { } } +class StubLicenseService { + private tier: 'community' | 'pro' | 'enterprise' = 'pro'; + setTier(t: 'community' | 'pro' | 'enterprise'): void { + this.tier = t; + } + getLicenseTier(): string { + return this.tier; + } +} + const buildService = async (): Promise<{ service: CacheReadonlyService; client: StubValkey; resolver: StubResolver; storage: MemoryAdapter; + license: StubLicenseService; }> => { const client = new StubValkey(); const resolver = new StubResolver(); @@ -78,15 +89,20 @@ const buildService = async (): Promise<{ const storage = new MemoryAdapter(); await storage.initialize(); const registry = new StubRegistry(client); + const license = new StubLicenseService(); const service = new CacheReadonlyService( registry as unknown as ConnectionRegistry, resolver as unknown as CacheResolverService, storage, + license as unknown as import('@proprietary/licenses').LicenseService, ); - return { service, client, resolver, storage }; + return { service, client, resolver, storage, license }; }; -const seedRegistry = (client: StubValkey, entries: Record): void => { +const seedRegistry = ( + client: StubValkey, + entries: Record, +): void => { client.hashes[REGISTRY_KEY] = {}; for (const [name, marker] of Object.entries(entries)) { client.hashes[REGISTRY_KEY][name] = JSON.stringify({ @@ -110,6 +126,30 @@ const seedSimilarityWindow = ( })); }; +const seedSamplesWithCost = ( + client: StubValkey, + prefix: string, + samples: Array<{ + score: number; + result: 'hit' | 'miss'; + category: string; + ts?: number; + cost?: number | null; + }>, +): void => { + const baseTs = Date.now(); + client.zsets[`${prefix}:__similarity_window`] = samples.map((s, i) => ({ + member: JSON.stringify({ + score: s.score, + result: s.result, + category: s.category, + _n: i, + cost_saved_micros: s.cost === undefined ? null : s.cost, + }), + score: s.ts ?? baseTs + i, + })); +}; + describe('CacheReadonlyService', () => { describe('listCaches', () => { it('returns both cache types with the right discriminator and live/stale status', async () => { @@ -414,6 +454,108 @@ describe('CacheReadonlyService', () => { expect(result.confidence_breakdown!.signal).toBeGreaterThan(0); }); + it('Enterprise: single expensive uncertain hit drives TIGHTEN where count-based misses it', async () => { + const { service, client, license } = await buildService(); + license.setTier('enterprise'); + const samples: Array<{ + score: number; + result: 'hit' | 'miss'; + category: string; + cost: number | null; + }> = []; + for (let i = 0; i < 99; i++) { + samples.push({ score: 0.02, result: 'hit', category: 'all', cost: 1000 }); + } + samples.push({ score: 0.08, result: 'hit', category: 'all', cost: 50_000_000 }); + for (let i = 0; i < 50; i++) { + samples.push({ score: 0.5, result: 'miss', category: 'all', cost: null }); + } + seedSamplesWithCost(client, SEMANTIC_NAME, samples); + const result = await service.thresholdRecommendation(CONNECTION_ID, SEMANTIC_NAME, { + minSamples: 50, + }); + expect(result.recommendation).toBe('tighten_threshold'); + expect(result.signal).toBe('uncertain_hits'); + expect(result.cost_weighted_uncertain_hit_rate).toBeGreaterThan(0.2); + expect(result.uncertain_hit_cost_usd).toBeCloseTo(50, 1); + }); + + it('Enterprise with no costed samples falls back to unweighted', async () => { + const { service, client, license } = await buildService(); + license.setTier('enterprise'); + // threshold=0.10, band=0.05: uncertain = score >= 0.05 + // proposed tighten step = 0.03 → new threshold = 0.07 + // hits at 0.05 are uncertain (0.05 >= 0.05) but survive tightening (0.05 <= 0.07) + // hits at 0.06 are uncertain and are lost (0.06 > 0.07 is false) — also survive + // uncertainHitRate = 1.0, uncertainFractionOfAll = 1.0 * (85/90) > 0.15 + const samples = [ + ...Array.from({ length: 85 }, (_, i) => ({ + score: 0.05 + (i % 2) * 0.005, + result: 'hit' as const, + category: 'all', + cost: null as number | null, + ts: Date.now() + i, + })), + ...Array.from({ length: 5 }, (_, i) => ({ + score: 0.2, + result: 'miss' as const, + category: 'all', + cost: null as number | null, + ts: Date.now() + 100 + i, + })), + ]; + seedSamplesWithCost(client, SEMANTIC_NAME, samples); + const result = await service.thresholdRecommendation(CONNECTION_ID, SEMANTIC_NAME, { + minSamples: 50, + }); + expect(result.recommendation).toBe('tighten_threshold'); + expect(result.cost_weighted_uncertain_hit_rate).toBeUndefined(); + expect(result.total_hit_cost_usd).toBeUndefined(); + }); + + it('Pro+ with costed samples still uses count-based decision', async () => { + const { service, client, license } = await buildService(); + license.setTier('pro'); + const samples: Array<{ + score: number; + result: 'hit' | 'miss'; + category: string; + cost: number | null; + ts: number; + }> = []; + for (let i = 0; i < 99; i++) { + samples.push({ + score: 0.02, + result: 'hit', + category: 'all', + cost: 1000, + ts: Date.now() + i, + }); + } + samples.push({ + score: 0.08, + result: 'hit', + category: 'all', + cost: 50_000_000, + ts: Date.now() + 100, + }); + for (let i = 0; i < 50; i++) { + samples.push({ + score: 0.5, + result: 'miss', + category: 'all', + cost: null, + ts: Date.now() + 200 + i, + }); + } + seedSamplesWithCost(client, SEMANTIC_NAME, samples); + const result = await service.thresholdRecommendation(CONNECTION_ID, SEMANTIC_NAME, { + minSamples: 50, + }); + expect(result.recommendation).toBe('optimal'); + expect(result.cost_weighted_uncertain_hit_rate).toBeUndefined(); + }); + it('populates non-zero confidence on distant_hits TIGHTEN', async () => { const { service, client } = await buildService(); // Tighten the uncertainty_band so distant != uncertain (the engine's @@ -461,14 +603,39 @@ describe('CacheReadonlyService', () => { expect(result.confidence_score).toBeGreaterThan(0); expect(result.confidence_breakdown!.signal).toBeGreaterThan(0); }); + + it('Enterprise: reasoning includes dollars when cost-weighting fired', async () => { + const { service, client, license } = await buildService(); + license.setTier('enterprise'); + const samples: Array<{ + score: number; + result: 'hit' | 'miss'; + category: string; + cost: number | null; + }> = []; + for (let i = 0; i < 99; i++) { + samples.push({ score: 0.02, result: 'hit', category: 'all', cost: 1000 }); + } + samples.push({ score: 0.08, result: 'hit', category: 'all', cost: 50_000_000 }); + for (let i = 0; i < 50; i++) { + samples.push({ score: 0.5, result: 'miss', category: 'all', cost: null }); + } + seedSamplesWithCost(client, SEMANTIC_NAME, samples); + const result = await service.thresholdRecommendation(CONNECTION_ID, SEMANTIC_NAME, { + minSamples: 50, + }); + expect(result.recommendation).toBe('tighten_threshold'); + expect(result.reasoning).toMatch(/\$50\.00/); + expect(result.reasoning).toMatch(/saved cost/); + }); }); describe('toolEffectiveness', () => { it('errors INVALID_CACHE_TYPE on semantic_cache', async () => { const { service } = await buildService(); - await expect( - service.toolEffectiveness(CONNECTION_ID, SEMANTIC_NAME), - ).rejects.toBeInstanceOf(InvalidCacheTypeError); + await expect(service.toolEffectiveness(CONNECTION_ID, SEMANTIC_NAME)).rejects.toBeInstanceOf( + InvalidCacheTypeError, + ); }); it('returns per-tool entries sorted by cost_saved_usd desc', async () => { diff --git a/proprietary/cache-proposals/cache-readonly.service.ts b/proprietary/cache-proposals/cache-readonly.service.ts index 57b9a0a0..94556be6 100644 --- a/proprietary/cache-proposals/cache-readonly.service.ts +++ b/proprietary/cache-proposals/cache-readonly.service.ts @@ -4,6 +4,7 @@ import type { CacheType, StoredCacheProposal } from '@betterdb/shared'; import { AGENT_CACHE, REGISTRY_KEY, SEMANTIC_CACHE, heartbeatKeyFor } from '@betterdb/shared'; import type { StoragePort } from '@app/common/interfaces/storage-port.interface'; import { ConnectionRegistry } from '@app/connections/connection-registry.service'; +import { LicenseService, Tier } from '@proprietary/licenses'; import { CacheResolverService, type ResolvedCache } from './cache-resolver.service'; import { CacheNotFoundError, InvalidCacheTypeError } from './errors'; import { readIntField } from '@app/common/utils/record-fields'; @@ -88,6 +89,7 @@ export class CacheReadonlyService { private readonly registry: ConnectionRegistry, private readonly resolver: CacheResolverService, @Inject('STORAGE_CLIENT') private readonly storage: StoragePort, + private readonly license: LicenseService, ) {} async listCaches(connectionId: string): Promise { @@ -242,6 +244,37 @@ export class CacheReadonlyService { ); const nearMissRate = misses.length === 0 ? 0 : nearMisses.length / misses.length; + const hitsCosted = hits.filter((s) => s.cost_saved_micros !== null); + const missesCosted = misses.filter((s) => s.cost_saved_micros !== null); + const uncertainHitsCost = uncertainHits.filter((s) => s.cost_saved_micros !== null); + const nearMissesCost = nearMisses.filter((s) => s.cost_saved_micros !== null); + + const sumCost = (xs: ReadonlyArray<{ cost_saved_micros: number | null }>): number => { + return xs.reduce((acc, s) => acc + (s.cost_saved_micros ?? 0), 0); + }; + + const totalHitCostMicros = sumCost(hitsCosted); + const uncertainHitCostMicros = sumCost(uncertainHitsCost); + const totalMissCostMicros = sumCost(missesCosted); + const nearMissCostMicros = sumCost(nearMissesCost); + + const haveCostedHits = hitsCosted.length >= minSamples; + const haveCostedMisses = missesCosted.length >= minSamples; + + const costWeightedUncertainHitRate = + totalHitCostMicros > 0 ? uncertainHitCostMicros / totalHitCostMicros : 0; + const costWeightedNearMissRate = + totalMissCostMicros > 0 ? nearMissCostMicros / totalMissCostMicros : 0; + + const isEnterprise = this.license.getLicenseTier() === Tier.enterprise; + const useCostWeightedTighten = isEnterprise && haveCostedHits; + const useCostWeightedLoosen = isEnterprise && haveCostedMisses; + + const effectiveUncertainHitRate = useCostWeightedTighten + ? costWeightedUncertainHitRate + : uncertainHitRate; + const effectiveNearMissRate = useCostWeightedLoosen ? costWeightedNearMissRate : nearMissRate; + const avgHitSimilarity = hits.length === 0 ? 0 : hits.reduce((acc, s) => acc + s.score, 0) / hits.length; const avgMissSimilarity = @@ -264,19 +297,37 @@ export class CacheReadonlyService { near_miss_rate: nearMissRate, }; - if (uncertainHitRate > 0.2) { + if (effectiveUncertainHitRate > 0.2) { // Many hits at the threshold boundary — but weigh against overall hit rate. // A 20% uncertain-hit rate with 70% overall hits means 14% of all ops are // uncertain — that's noise, not a strong signal. Only tighten when the // uncertain fraction of ALL operations (not just hits) is meaningful. - const uncertainFractionOfAll = uncertainHitRate * hitRate; + // For cost-weighted mode, the effective rate already captures cost importance. + const totalCostAcrossSides = totalHitCostMicros + totalMissCostMicros; + // Cost-weighted denominator is total cost across costed hits + costed + // misses, not "total operations". In early rollout (most miss costs still + // null pending the retroactive update from store()) this can be slightly + // more aggressive than the count-based form; it converges to the same + // ratio once miss costs are populated. + const uncertainFractionOfAll = useCostWeightedTighten + ? totalCostAcrossSides > 0 + ? uncertainHitCostMicros / totalCostAcrossSides + : 0 + : uncertainHitRate * hitRate; if (uncertainFractionOfAll > 0.15) { recommendation = THRESHOLD_RECOMMENDATIONS.TIGHTEN; signal = 'uncertain_hits'; - signalRate = uncertainHitRate; + signalRate = effectiveUncertainHitRate; const step = config.uncertainty_band * 0.6; recommendedThreshold = Math.max(0, threshold - step); - reasoning = THRESHOLD_REASONINGS.tighten(uncertainHitRate); + if (useCostWeightedTighten) { + reasoning = THRESHOLD_REASONINGS.tightenCost( + costWeightedUncertainHitRate, + uncertainHitCostMicros / 1_000_000, + ); + } else { + reasoning = THRESHOLD_REASONINGS.tighten(uncertainHitRate); + } } else { recommendation = THRESHOLD_RECOMMENDATIONS.OPTIMAL; reasoning = THRESHOLD_REASONINGS.optimal(hitRate, uncertainHitRate); @@ -298,19 +349,29 @@ export class CacheReadonlyService { const p75 = sortedHitScores[Math.floor(sortedHitScores.length * 0.75)]; const target = p75 + config.uncertainty_band * 0.3; const maxStep = config.uncertainty_band * 2; - recommendedThreshold = Math.min(threshold, Math.max(threshold - maxStep, Math.max(0, target))); + recommendedThreshold = Math.min( + threshold, + Math.max(threshold - maxStep, Math.max(0, target)), + ); reasoning = THRESHOLD_REASONINGS.tightenDistantHits(distantHitRate, hitRate); } else { recommendation = THRESHOLD_RECOMMENDATIONS.OPTIMAL; reasoning = THRESHOLD_REASONINGS.optimal(hitRate, uncertainHitRate); } - } else if (nearMissRate > 0.25) { + } else if (effectiveNearMissRate > 0.25) { // Many near-misses just above the threshold — probably too strict. recommendation = THRESHOLD_RECOMMENDATIONS.LOOSEN; signal = 'near_misses'; - signalRate = nearMissRate; + signalRate = effectiveNearMissRate; recommendedThreshold = threshold + avgNearMissDelta; - reasoning = THRESHOLD_REASONINGS.loosen(nearMissRate); + if (useCostWeightedLoosen) { + reasoning = THRESHOLD_REASONINGS.loosenCost( + costWeightedNearMissRate, + nearMissCostMicros / 1_000_000, + ); + } else { + reasoning = THRESHOLD_REASONINGS.loosen(nearMissRate); + } } else if (hitRate < 0.05 && misses.length >= 20) { // Very few hits with enough data — threshold may be too strict. // Check if there are misses close to the threshold that would become hits. @@ -344,7 +405,9 @@ export class CacheReadonlyService { recommendedThreshold !== undefined && hits.length > 0 ) { - const hitsLost = hits.filter((s) => s.score > recommendedThreshold! && s.score <= threshold).length; + const hitsLost = hits.filter( + (s) => s.score > recommendedThreshold! && s.score <= threshold, + ).length; const recallCost = hitsLost / hits.length; if (recallCost > RECALL_COST_MAX) { recommendation = THRESHOLD_RECOMMENDATIONS.OPTIMAL; @@ -425,6 +488,18 @@ export class CacheReadonlyService { ); } + const costFields: Partial = {}; + if (useCostWeightedTighten) { + costFields.cost_weighted_uncertain_hit_rate = costWeightedUncertainHitRate; + costFields.total_hit_cost_usd = totalHitCostMicros / 1_000_000; + costFields.uncertain_hit_cost_usd = uncertainHitCostMicros / 1_000_000; + } + if (useCostWeightedLoosen) { + costFields.cost_weighted_near_miss_rate = costWeightedNearMissRate; + costFields.total_miss_cost_usd = totalMissCostMicros / 1_000_000; + costFields.near_miss_cost_usd = nearMissCostMicros / 1_000_000; + } + return { category: categoryLabel, sample_count: sampleCount, @@ -443,6 +518,7 @@ export class CacheReadonlyService { consecutive_same_direction: consecutiveSameDirection, confidence_score, confidence_breakdown, + ...costFields, }; } @@ -596,7 +672,9 @@ export class CacheReadonlyService { const hits = readIntField(raw, 'hits') + readIntField(raw, 'llm:hits') + readIntField(raw, 'tool:hits'); const misses = - readIntField(raw, 'misses') + readIntField(raw, 'llm:misses') + readIntField(raw, 'tool:misses'); + readIntField(raw, 'misses') + + readIntField(raw, 'llm:misses') + + readIntField(raw, 'tool:misses'); const explicitTotal = readIntField(raw, 'total'); return { hits, misses, total: explicitTotal === 0 ? hits + misses : explicitTotal }; } @@ -605,7 +683,13 @@ export class CacheReadonlyService { client: Valkey, prefix: string, ): Promise< - Array<{ score: number; result: 'hit' | 'miss'; category: string; recordedAt: number }> + Array<{ + score: number; + result: 'hit' | 'miss'; + category: string; + recordedAt: number; + cost_saved_micros: number | null; + }> > { let raw: Array; try { @@ -623,6 +707,7 @@ export class CacheReadonlyService { result: 'hit' | 'miss'; category: string; recordedAt: number; + cost_saved_micros: number | null; }> = []; for (let i = 0; i < raw.length; i += 2) { const member = raw[i]; @@ -641,7 +726,14 @@ export class CacheReadonlyService { if (result !== 'hit' && result !== 'miss') { continue; } - out.push({ score, result, category, recordedAt }); + const rawCost = entry.cost_saved_micros; + let cost_saved_micros: number | null; + if (typeof rawCost === 'number' && Number.isFinite(rawCost) && rawCost >= 0) { + cost_saved_micros = rawCost; + } else { + cost_saved_micros = null; + } + out.push({ score, result, category, recordedAt, cost_saved_micros }); } catch { // ignore malformed entries } @@ -674,7 +766,13 @@ export class CacheReadonlyService { private computeDampening( history: TuningHistoryEntry[], currentDirection: 'tighten' | 'loosen', - ): { factor: number; consecutiveSameDirection: number; directionFlips: number; capped: boolean; reason?: string } { + ): { + factor: number; + consecutiveSameDirection: number; + directionFlips: number; + capped: boolean; + reason?: string; + } { if (history.length === 0) { return { factor: 1, consecutiveSameDirection: 0, directionFlips: 0, capped: false }; } diff --git a/proprietary/cache-proposals/cache-readonly.types.ts b/proprietary/cache-proposals/cache-readonly.types.ts index e7751029..0cbec87a 100644 --- a/proprietary/cache-proposals/cache-readonly.types.ts +++ b/proprietary/cache-proposals/cache-readonly.types.ts @@ -65,6 +65,10 @@ export const THRESHOLD_REASONINGS = { `${formatPct(nearMissRate)} of misses are very close to the threshold — consider loosening.`, loosenLowHitRate: (hitRate: number): string => `Hit rate is only ${formatPct(hitRate)} with near-misses just above the threshold — consider loosening.`, + tightenCost: (rate: number, atRiskUsd: number): string => + `${formatPct(rate)} of saved cost ($${atRiskUsd.toFixed(2)}) on uncertain hits — tighten the threshold.`, + loosenCost: (rate: number, missedUsd: number): string => + `${formatPct(rate)} of missed savings ($${missedUsd.toFixed(2)}) on near-misses — consider loosening.`, optimal: (hitRate: number, uncertainHitRate: number): string => `Hit rate ${formatPct(hitRate)} with ${formatPct(uncertainHitRate)} uncertain hits — threshold appears well-calibrated.`, recallCostTooHigh: (recallCost: number): string => @@ -108,6 +112,12 @@ export interface ThresholdRecommendation { consecutive_same_direction?: number; confidence_score: number | null; confidence_breakdown: ThresholdRecommendationConfidenceBreakdown | null; + cost_weighted_uncertain_hit_rate?: number; + cost_weighted_near_miss_rate?: number; + total_hit_cost_usd?: number; + uncertain_hit_cost_usd?: number; + total_miss_cost_usd?: number; + near_miss_cost_usd?: number; } export interface TuningHistoryEntry {