diff --git a/packages/semantic-cache/CHANGELOG.md b/packages/semantic-cache/CHANGELOG.md index a8e76a56..b286b717 100644 --- a/packages/semantic-cache/CHANGELOG.md +++ b/packages/semantic-cache/CHANGELOG.md @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Per-entry hit analytics** — every entry now tracks `hit_count` and + `last_accessed_at`, incremented atomically on each cache hit (batched into the + existing TTL-refresh pipeline — no extra round trip). New `entryAnalytics()` + method reports total / never-hit / cold entry counts and the hottest entries. + When the index includes usage fields, counts use server-side `FT.SEARCH` with + `LIMIT 0 0` (exact, no materialization); `topEntries` uses `SORTBY hit_count + DESC` with `LIMIT 0 topN`. Older indexes fall back to `SCAN` + pipelined + `HMGET` (5 fields only) over a sample of up to 10,000 entries. New + `entry_analytics` capability on the discovery marker. `EntryAnalyticsOptions`, + `EntryAnalyticsResult`, and `EntrySummary` exported from the package root. - **LLM-as-judge for borderline hits** — `CacheCheckOptions.judge` accepts a `judgeFn` that adjudicates hits whose cosine distance lands in the uncertainty band (`threshold - uncertaintyBand < score <= threshold`). The judge promotes accepted hits to `confidence: 'high'` and demotes rejected hits to a miss with `nearestMiss` populated. Configurable `timeoutMs` (default 2000) and `onError` (default `'accept'`, fail-open). Direct response to user feedback that single-threshold matching produced too many uncertain borderline returns on chat.betterdb.com. - New Prometheus metrics `{prefix}_judge_decisions_total{decision}` and `{prefix}_judge_duration_seconds{decision}` with decision labels `accept | reject | error_accept | error_reject | timeout_accept | timeout_reject`. - New OTel span attributes `cache.judge.invoked`, `cache.judge.decision`, `cache.judge.latency_ms`. @@ -16,6 +26,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Cache hits now incur one Valkey round trip for usage tracking even when + `defaultTtl` is not configured. Previously a hit was a pure read. +- The FT index schema gains `hit_count NUMERIC SORTABLE` and `last_accessed_at + NUMERIC SORTABLE`. Existing indexes keep working; run `flush()` + + `initialize()` to rebuild the schema and enable the fast analytics path. + `HINCRBY` auto-creates the counter, so pre-existing entries begin tracking + correctly on their first hit after upgrade with no migration. - `nearestMiss.deltaToThreshold` may be `<= 0` when a miss originates from a judge rejection (the score did clear the threshold but the judge said no). Existing miss paths still produce `> 0`. Documented on the type. - `checkBatch()` throws `SemanticCacheUsageError` when `judge` is supplied, matching the existing handling of `rerank` and `staleAfterModelChange`. diff --git a/packages/semantic-cache/README.md b/packages/semantic-cache/README.md index 63a3c2a5..2a3fcbe1 100644 --- a/packages/semantic-cache/README.md +++ b/packages/semantic-cache/README.md @@ -291,6 +291,29 @@ Stops the analytics client, cancels the stats snapshot timer, and disposes the d Graceful shutdown of the discovery layer for in-process caches without destroying data. Stops the discovery heartbeat and deletes the heartbeat key; does not touch the index or entries. +### `cache.entryAnalytics(options?)` + +Per-entry usage analytics: how many stored entries have ever been returned as a hit, which entries are hottest, and how many are cold (never hit, or not accessed within `coldAfterDays`). Use this to find dead-weight entries when sizing TTLs and keeping the HNSW index lean. + +```typescript +const a = await cache.entryAnalytics({ topN: 5, coldAfterDays: 14 }); +console.log(`${a.neverHitCount} of ${a.totalEntries} entries have never been hit`); +console.log('hottest:', a.topEntries.map((e) => `${e.key} (${e.hitCount})`)); +``` + +Returns `totalEntries`, `neverHitCount`, `hitAtLeastOnceCount`, `coldEntryCount`, +`topEntries` (sorted by `hitCount` descending, capped at `topN`), and the +`coldAfterDays` value applied. + +When the FT index was created with the `hit_count` / `last_accessed_at` sortable +fields, counts use server-side `FT.SEARCH` with `LIMIT 0 0` (exact, no +materialization) and `topEntries` uses `SORTBY hit_count DESC` with `LIMIT 0 topN`. + +Older indexes fall back to `SCAN` + pipelined `HMGET` (fetching only the 5 +analytics fields) over a sample of up to 10,000 entries in implementation-defined +order (`totalEntries` is the sample size). Run `flush()` + `initialize()` to +rebuild the index and enable exact counts. + ### `cache.thresholdEffectiveness(options?)` Analyzes the rolling similarity score window (last 10,000 entries, up to 7 days) and returns: diff --git a/packages/semantic-cache/src/SemanticCache.ts b/packages/semantic-cache/src/SemanticCache.ts index 279f03c3..e8f29fd8 100644 --- a/packages/semantic-cache/src/SemanticCache.ts +++ b/packages/semantic-cache/src/SemanticCache.ts @@ -14,6 +14,9 @@ import type { EmbedFn, ModelCost, ConfigRefreshOptions, + EntryAnalyticsOptions, + EntryAnalyticsResult, + EntrySummary, } from './types'; import { SemanticCacheUsageError, @@ -40,6 +43,7 @@ import { } from './discovery'; const INVALIDATE_BATCH_SIZE = 1000; +const ENTRY_ANALYTICS_LIMIT = 10000; const PACKAGE_VERSION = (require('../package.json') as { version: string }).version; @@ -75,6 +79,7 @@ export class SemanticCache { private _initialized = false; private _dimension = 0; private _hasBinaryRefs = false; + private _hasUsageFields = false; private _initPromise: Promise | null = null; private _initGeneration = 0; @@ -479,8 +484,8 @@ export class SemanticCache { this.telemetry.metrics.requestsTotal .labels({ cache_name: this.name, result: metricResult, category: categoryLabel }).inc(); - if (this.defaultTtl !== undefined && matchedKey) { - await this.client.expire(matchedKey, this.defaultTtl); + if (matchedKey) { + await this.recordEntryUsage(matchedKey); } // Cost saved @@ -557,6 +562,8 @@ export class SemanticCache { model, category, inserted_at: Date.now().toString(), + hit_count: '0', + last_accessed_at: '0', metadata: JSON.stringify(options?.metadata ?? {}), embedding: encodeFloat32(embedding), }; @@ -638,6 +645,8 @@ export class SemanticCache { model, category, inserted_at: Date.now().toString(), + hit_count: '0', + last_accessed_at: '0', metadata: JSON.stringify(options?.metadata ?? {}), embedding: encodeFloat32(embedding), content_blocks: JSON.stringify(blocks), @@ -743,6 +752,7 @@ export class SemanticCache { const results: CacheCheckResult[] = []; const categoryLabel = category || 'none'; + const hitKeys: string[] = []; for (let i = 0; i < prompts.length; i++) { const pipelineEntry = pipelineResults?.[i]; @@ -795,8 +805,8 @@ export class SemanticCache { .labels({ cache_name: this.name, result: metricResult, category: categoryLabel }).inc(); const matchedKey = parsed[0].key; - if (this.defaultTtl !== undefined && matchedKey) { - await this.client.expire(matchedKey, this.defaultTtl); + if (matchedKey) { + hitKeys.push(matchedKey); } let costSaved: number | undefined; @@ -826,6 +836,8 @@ export class SemanticCache { results.push(result); } + await this.recordEntryUsageBatch(hitKeys); + return results; }); } @@ -1080,6 +1092,69 @@ export class SemanticCache { return results; } + /** + * Per-entry usage analytics: how many entries have ever been hit, which are + * hottest, and how many are cold (never hit, or not accessed recently). + * + * When the FT index includes the hit_count / last_accessed_at sortable + * fields (created at or after the version that introduced them), counts use + * server-side FT.SEARCH with LIMIT 0 0 (no materialization); top entries use + * SORTBY hit_count DESC with LIMIT 0 topN. For an older index it falls back + * to a SCAN + HGETALL sweep — correct but slower. Run flush() + + * initialize() to rebuild the index and enable the fast path. + * + * On a legacy index (created before this version), stats reflect a sample of + * up to 10,000 entries in implementation-defined scan order — totalEntries is + * the sample size, not the absolute entry count. Run flush() + initialize() + * to rebuild the index and get exact server-side counts. + */ + async entryAnalytics( + options?: EntryAnalyticsOptions, + ): Promise { + this.assertInitialized('entryAnalytics'); + return this.traced('entryAnalytics', async (span) => { + const topN = options?.topN ?? 10; + const coldAfterDays = options?.coldAfterDays ?? 7; + const coldCutoff = Date.now() - coldAfterDays * 24 * 60 * 60 * 1000; + + let totalEntries: number; + let neverHitCount: number; + let coldEntryCount: number; + let topEntries: EntrySummary[]; + + if (this._hasUsageFields) { + try { + ({ totalEntries, neverHitCount, coldEntryCount, topEntries } = + await this.collectAnalyticsViaSearch(coldCutoff, topN)); + } catch { + ({ totalEntries, neverHitCount, coldEntryCount, topEntries } = + await this.collectAnalyticsViaScan(coldCutoff, topN)); + } + } else { + ({ totalEntries, neverHitCount, coldEntryCount, topEntries } = + await this.collectAnalyticsViaScan(coldCutoff, topN)); + } + + const hitAtLeastOnceCount = Math.max(0, totalEntries - neverHitCount); + + span.setAttributes({ + 'cache.name': this.name, + 'cache.entry_total': totalEntries, + 'cache.entry_never_hit': neverHitCount, + 'cache.entry_cold': coldEntryCount, + }); + + return { + totalEntries, + neverHitCount, + hitAtLeastOnceCount, + coldEntryCount, + topEntries, + coldAfterDays, + }; + }); + } + /** * Refresh threshold config from Valkey. Returns true on a successful HGETALL, * false if the call threw. @@ -1202,12 +1277,13 @@ export class SemanticCache { private async _doInitialize(): Promise { const gen = this._initGeneration; return this.traced('initialize', async () => { - const { dim, hasBinaryRefs } = await this.ensureIndexAndGetDimension(); + const { dim, hasBinaryRefs, hasUsageFields } = await this.ensureIndexAndGetDimension(); if (this._initGeneration !== gen) { return; } this._dimension = dim; this._hasBinaryRefs = hasBinaryRefs; + this._hasUsageFields = hasUsageFields; // registerDiscovery() may throw SemanticCacheUsageError on a name // collision. Mark the cache initialized only after discovery succeeds // so a colliding caller cannot subsequently call check()/store() @@ -1294,16 +1370,21 @@ export class SemanticCache { .catch(() => {}); } - private async ensureIndexAndGetDimension(): Promise<{ dim: number; hasBinaryRefs: boolean }> { + private async ensureIndexAndGetDimension(): Promise<{ + dim: number; + hasBinaryRefs: boolean; + hasUsageFields: boolean; + }> { // Try reading an existing index try { const info = (await this.client.call('FT.INFO', this.indexName)) as unknown[]; const dim = this.parseDimensionFromInfo(info); - const hasBinaryRefs = this.parseHasBinaryRefsFromInfo(info); - if (dim > 0) return { dim, hasBinaryRefs }; + const hasBinaryRefs = this.parseHasFieldFromInfo(info, 'binary_refs'); + const hasUsageFields = this.parseHasFieldFromInfo(info, 'hit_count'); + if (dim > 0) return { dim, hasBinaryRefs, hasUsageFields }; // Couldn't parse dimension from FT.INFO - fall back to probe const probeDim = (await this.embed('probe')).vector.length; - return { dim: probeDim, hasBinaryRefs }; + return { dim: probeDim, hasBinaryRefs, hasUsageFields }; } catch (err) { if (err instanceof EmbeddingError) throw err; if (!this.isIndexNotFoundError(err)) { @@ -1324,6 +1405,8 @@ export class SemanticCache { 'category', 'TAG', 'binary_refs', 'TAG', 'inserted_at', 'NUMERIC', 'SORTABLE', + 'hit_count', 'NUMERIC', 'SORTABLE', + 'last_accessed_at', 'NUMERIC', 'SORTABLE', 'temperature', 'NUMERIC', 'top_p', 'NUMERIC', 'seed', 'NUMERIC', @@ -1333,11 +1416,11 @@ export class SemanticCache { } catch (err) { throw new ValkeyCommandError('FT.CREATE', err); } - return { dim, hasBinaryRefs: true }; + return { dim, hasBinaryRefs: true, hasUsageFields: true }; } - /** Check if the index schema has a binary_refs field. */ - private parseHasBinaryRefsFromInfo(info: unknown[]): boolean { + /** Check if the index schema includes a field by identifier name. */ + private parseHasFieldFromInfo(info: unknown[], fieldName: string): boolean { for (let i = 0; i < info.length - 1; i += 2) { const key = String(info[i]); if (key !== 'attributes' && key !== 'fields') continue; @@ -1346,7 +1429,7 @@ export class SemanticCache { for (const attr of attributes) { if (!Array.isArray(attr)) continue; for (let j = 0; j < attr.length - 1; j++) { - if (String(attr[j]) === 'identifier' && String(attr[j + 1]) === 'binary_refs') { + if (String(attr[j]) === 'identifier' && String(attr[j + 1]) === fieldName) { return true; } } @@ -1355,6 +1438,148 @@ export class SemanticCache { return false; } + private rowToEntrySummary(key: string, fields: Record): EntrySummary { + return { + key, + hitCount: Number.parseInt(fields['hit_count'] ?? '0', 10) || 0, + lastAccessedAt: Number.parseInt(fields['last_accessed_at'] ?? '0', 10) || 0, + insertedAt: Number.parseInt(fields['inserted_at'] ?? '0', 10) || 0, + category: fields['category'] ?? '', + model: fields['model'] ?? '', + }; + } + + private async collectAnalyticsViaSearch( + coldCutoff: number, + topN: number, + ): Promise<{ + totalEntries: number; + neverHitCount: number; + coldEntryCount: number; + topEntries: EntrySummary[]; + }> { + const countOf = async (filter: string): Promise => { + const resp = (await this.client.call( + 'FT.SEARCH', this.indexName, filter, 'LIMIT', '0', '0', + )) as unknown[]; + return Number(resp?.[0] ?? 0); + }; + + const [totalEntries, neverHitCount, coldEntryCount] = await Promise.all([ + countOf('*'), + countOf('@hit_count:[0 0]'), + countOf(`@last_accessed_at:[0 ${coldCutoff}]`), + ]); + + let topResp: unknown; + try { + topResp = await this.client.call( + 'FT.SEARCH', this.indexName, '*', + 'RETURN', '5', 'hit_count', 'last_accessed_at', 'inserted_at', 'category', 'model', + 'SORTBY', 'hit_count', 'DESC', + 'LIMIT', '0', String(topN), + 'DIALECT', '2', + ); + } catch { + throw new Error('search-path-failed'); + } + const topEntries = parseFtSearchResponse(topResp).map((row) => + this.rowToEntrySummary(row.key, row.fields), + ); + + return { totalEntries, neverHitCount, coldEntryCount, topEntries }; + } + + private async collectAnalyticsViaScan( + coldCutoff: number, + topN: number, + ): Promise<{ + totalEntries: number; + neverHitCount: number; + coldEntryCount: number; + topEntries: EntrySummary[]; + }> { + const NEEDED_FIELDS = ['hit_count', 'last_accessed_at', 'inserted_at', 'category', 'model'] as const; + const summaries: EntrySummary[] = []; + const pattern = `${this.entryPrefix}*`; + let limitReached = false; + await clusterScan(this.client, pattern, async (keys, nodeClient) => { + if (limitReached) return; + + // Clamp batch to the remaining capacity so we never pipeline more than needed + const remaining = ENTRY_ANALYTICS_LIMIT - summaries.length; + const batch = keys.length <= remaining ? keys : keys.slice(0, remaining); + if (batch.length < keys.length) limitReached = true; + + // One pipeline round trip per SCAN batch. + // HMGET fetches only the 5 fields rowToEntrySummary reads — avoids + // pulling embedding vectors (~6 KB each), response text, and prompt. + const pipeline = nodeClient.pipeline(); + for (const key of batch) { + pipeline.hmget(key, ...NEEDED_FIELDS); + } + const results = await pipeline.exec() as Array<[Error | null, (string | null)[]]>; + + for (let i = 0; i < batch.length; i++) { + const [err, values] = results[i] ?? [new Error('no result'), null]; + if (err || !values) continue; + const fields: Record = {}; + NEEDED_FIELDS.forEach((f, j) => { + if (values[j] != null) fields[f] = values[j]!; + }); + summaries.push(this.rowToEntrySummary(batch[i], fields)); + } + }); + + const totalEntries = summaries.length; + const neverHitCount = summaries.filter((e) => e.hitCount === 0).length; + const coldEntryCount = summaries.filter( + (e) => e.hitCount === 0 || e.lastAccessedAt < coldCutoff, + ).length; + const topEntries = [...summaries] + .sort((a, b) => b.hitCount - a.hitCount) + .slice(0, topN); + + return { totalEntries, neverHitCount, coldEntryCount, topEntries }; + } + + /** Atomically bump per-entry usage counters and optionally refresh TTL. */ + private async recordEntryUsage(matchedKey: string): Promise { + try { + const pipeline = this.client.pipeline(); + pipeline.hincrby(matchedKey, 'hit_count', 1); + pipeline.hset(matchedKey, 'last_accessed_at', Date.now().toString()); + if (this.defaultTtl !== undefined) { + pipeline.expire(matchedKey, this.defaultTtl); + } + await pipeline.exec(); + } catch { + // best-effort: usage tracking and TTL refresh are non-critical on a hit. + // A pipeline failure means hit_count / last_accessed_at may not update + // and TTL may not refresh — the entry will still expire on its original + // schedule. Previously expire() was standalone and would propagate errors; + // this is an intentional behavior change to avoid failing a cache hit. + } + } + + private async recordEntryUsageBatch(matchedKeys: string[]): Promise { + if (matchedKeys.length === 0) return; + try { + const pipeline = this.client.pipeline(); + const now = Date.now().toString(); + for (const key of matchedKeys) { + pipeline.hincrby(key, 'hit_count', 1); + pipeline.hset(key, 'last_accessed_at', now); + if (this.defaultTtl !== undefined) { + pipeline.expire(key, this.defaultTtl); + } + } + await pipeline.exec(); + } catch { + // best-effort + } + } + /** Resolve a prompt (string or ContentBlock[]) into text + binary refs. */ private resolvePrompt( prompt: string | ContentBlock[], diff --git a/packages/semantic-cache/src/__tests__/discovery.test.ts b/packages/semantic-cache/src/__tests__/discovery.test.ts index aead5acb..11e7dcfd 100644 --- a/packages/semantic-cache/src/__tests__/discovery.test.ts +++ b/packages/semantic-cache/src/__tests__/discovery.test.ts @@ -130,6 +130,7 @@ describe('buildSemanticMetadata', () => { 'invalidate', 'similarity_distribution', 'threshold_adjust', + 'entry_analytics', ]); }); diff --git a/packages/semantic-cache/src/__tests__/entry-analytics.test.ts b/packages/semantic-cache/src/__tests__/entry-analytics.test.ts new file mode 100644 index 00000000..2feb5ad2 --- /dev/null +++ b/packages/semantic-cache/src/__tests__/entry-analytics.test.ts @@ -0,0 +1,192 @@ +/** + * entryAnalytics() — server-side count queries must not sample via SORTBY LIMIT. + */ +import { describe, it, expect, vi } from 'vitest'; +import { SemanticCache } from '../SemanticCache'; +import type { Valkey } from '../types'; + +const TOTAL_ENTRIES = 15_000; +const NEVER_HIT_COUNT = 12_000; +const COLD_ENTRY_COUNT = 13_000; + +function makeAnalyticsMockClient() { + const ftInfo = [ + 'attributes', + [ + ['identifier', 'embedding', 'type', 'VECTOR', 'index', ['dimensions', '2']], + ['identifier', 'hit_count', 'type', 'NUMERIC'], + ['identifier', 'last_accessed_at', 'type', 'NUMERIC'], + ], + ]; + + const call = vi.fn(async (...args: unknown[]) => { + const cmd = args[0] as string; + if (cmd === 'FT.INFO') return ftInfo; + if (cmd === 'FT.CREATE') return 'OK'; + if (cmd === 'FT.DROPINDEX') return 'OK'; + if (cmd === 'FT.SEARCH') { + const filter = String(args[2] ?? '*'); + const limitIdx = args.indexOf('LIMIT'); + const limitCount = + limitIdx >= 0 ? Number(args[limitIdx + 2]) : Number.POSITIVE_INFINITY; + const isCountOnly = limitCount === 0; + + if (isCountOnly) { + if (filter === '*') return [String(TOTAL_ENTRIES)]; + if (filter === '@hit_count:[0 0]') return [String(NEVER_HIT_COUNT)]; + if (filter.startsWith('@last_accessed_at:')) return [String(COLD_ENTRY_COUNT)]; + return ['0']; + } + + // Top-N materialization — only hot entries (would hide never-hit in a 10k sample) + return [ + '2', + 'cache:entry:hot-1', + ['hit_count', '500', 'last_accessed_at', '1', 'inserted_at', '1', 'category', '', 'model', ''], + 'cache:entry:hot-2', + ['hit_count', '400', 'last_accessed_at', '2', 'inserted_at', '2', 'category', '', 'model', ''], + ]; + } + return null; + }); + + return { + call, + hset: vi.fn(async () => 1), + hgetall: vi.fn(async () => ({})), + hincrby: vi.fn(async () => 0), + expire: vi.fn(async () => 1), + del: vi.fn(async () => 1), + scan: vi.fn(async () => ['0', []]), + get: vi.fn(async () => null), + getBuffer: vi.fn(async () => null), + set: vi.fn(async () => 'OK'), + pipeline: vi.fn(() => ({ + hincrby: vi.fn().mockReturnThis(), + hset: vi.fn().mockReturnThis(), + exec: vi.fn(async () => [[null, 1], [null, 1]]), + call: vi.fn().mockReturnThis(), + zadd: vi.fn().mockReturnThis(), + zremrangebyscore: vi.fn().mockReturnThis(), + zremrangebyrank: vi.fn().mockReturnThis(), + })), + zadd: vi.fn(async () => 1), + zrange: vi.fn(async () => []), + nodes: vi.fn(() => null), + }; +} + +describe('entryAnalytics', () => { + it('uses FT.SEARCH LIMIT 0 0 for counts so never-hit is correct when total > 10k', async () => { + const client = makeAnalyticsMockClient(); + const cache = new SemanticCache({ + client: client as unknown as Valkey, + embedFn: vi.fn(async () => [0.5, 0.5]), + name: 'test_entry_analytics', + embeddingCache: { enabled: false }, + discovery: { enabled: false }, + configRefresh: { enabled: false }, + }); + await cache.initialize(); + + const result = await cache.entryAnalytics({ topN: 2, coldAfterDays: 7 }); + + expect(result.totalEntries).toBe(TOTAL_ENTRIES); + expect(result.neverHitCount).toBe(NEVER_HIT_COUNT); + expect(result.coldEntryCount).toBe(COLD_ENTRY_COUNT); + expect(result.hitAtLeastOnceCount).toBe(TOTAL_ENTRIES - NEVER_HIT_COUNT); + expect(result.topEntries).toHaveLength(2); + expect(result.topEntries[0].hitCount).toBe(500); + + const countCalls = client.call.mock.calls.filter( + (c) => c[0] === 'FT.SEARCH' && c[c.indexOf('LIMIT') + 2] === '0', + ); + expect(countCalls.length).toBeGreaterThanOrEqual(3); + expect(countCalls.some((c) => c[2] === '*')).toBe(true); + expect(countCalls.some((c) => c[2] === '@hit_count:[0 0]')).toBe(true); + expect(countCalls.some((c) => String(c[2]).startsWith('@last_accessed_at:'))).toBe( + true, + ); + + const topCalls = client.call.mock.calls.filter( + (c) => c[0] === 'FT.SEARCH' && c.includes('SORTBY'), + ); + expect(topCalls).toHaveLength(1); + expect(topCalls[0]).toContain('LIMIT'); + expect(topCalls[0][topCalls[0].indexOf('LIMIT') + 2]).toBe('2'); + }); +}); + +describe('entryAnalytics (SCAN fallback — cap at 10k)', () => { + it('stops at ENTRY_ANALYTICS_LIMIT and does not call hgetall beyond it', async () => { + const keys = Array.from({ length: 12_000 }, (_, i) => `scan_test:entry:${i}`); + const client = { + call: vi.fn(async (...args: unknown[]) => { + const cmd = args[0] as string; + // FT.INFO without hit_count → _hasUsageFields = false → scan path + if (cmd === 'FT.INFO') { + return [ + 'attributes', + [ + [ + 'identifier', + 'embedding', + 'type', + 'VECTOR', + 'index', + ['dimensions', '2'], + ], + ], + ]; + } + if (cmd === 'FT.CREATE') return 'OK'; + return null; + }), + hgetall: vi.fn(async () => ({})), // must NOT be called — hmget pipeline is used instead + hset: vi.fn(async () => 1), + hincrby: vi.fn(async () => 0), + expire: vi.fn(async () => 1), + del: vi.fn(async () => 1), + scan: vi.fn(async () => ['0', keys] as [string, string[]]), + get: vi.fn(async () => null), + getBuffer: vi.fn(async () => null), + set: vi.fn(async () => 'OK'), + pipeline: vi.fn(() => ({ + hmget: vi.fn().mockReturnThis(), + exec: vi.fn(async () => + // Return [null, values] tuples — one per key in the batch + Array.from({ length: Math.min(keys.length, 10_000) }, () => [ + null, + ['0', '0', '0', '', ''], // hit_count, last_accessed_at, inserted_at, category, model + ]), + ), + hincrby: vi.fn().mockReturnThis(), + hset: vi.fn().mockReturnThis(), + })), + zadd: vi.fn(async () => 1), + zrange: vi.fn(async () => []), + nodes: vi.fn(() => null), + }; + + const cache = new SemanticCache({ + client: client as unknown as Valkey, + embedFn: vi.fn(async () => [0.1, 0.2]), + name: 'scan_test', + embeddingCache: { enabled: false }, + discovery: { enabled: false }, + configRefresh: { enabled: false }, + }); + await cache.initialize(); + + const result = await cache.entryAnalytics({ topN: 5 }); + expect(result.totalEntries).toBe(10_000); + + // hgetall must never be called — we use pipelined hmget for only the 5 needed fields + expect(client.hgetall).not.toHaveBeenCalled(); + + // pipeline was used (one call per SCAN batch, not per key) + expect(client.pipeline).toHaveBeenCalled(); + const pipelineCallCount = (client.pipeline as ReturnType).mock.calls.length; + expect(pipelineCallCount).toBeLessThan(10_000); + }); +}); diff --git a/packages/semantic-cache/src/discovery.ts b/packages/semantic-cache/src/discovery.ts index f779cb06..cb964256 100644 --- a/packages/semantic-cache/src/discovery.ts +++ b/packages/semantic-cache/src/discovery.ts @@ -49,7 +49,12 @@ export function buildSemanticMetadata(input: BuildSemanticMetadataInput): Marker prefix: input.name, version: input.version, protocol_version: PROTOCOL_VERSION, - capabilities: ['invalidate', 'similarity_distribution', 'threshold_adjust'], + capabilities: [ + 'invalidate', + 'similarity_distribution', + 'threshold_adjust', + 'entry_analytics', + ], index_name: `${input.name}:idx`, stats_key: `${input.name}:__stats`, config_key: `${input.name}:__config`, diff --git a/packages/semantic-cache/src/index.ts b/packages/semantic-cache/src/index.ts index 4192c15b..b2314dae 100644 --- a/packages/semantic-cache/src/index.ts +++ b/packages/semantic-cache/src/index.ts @@ -15,6 +15,9 @@ export type { RerankOptions, JudgeOptions, ConfigRefreshOptions, + EntryAnalyticsOptions, + EntryAnalyticsResult, + EntrySummary, } from './types'; export { SemanticCacheUsageError, diff --git a/packages/semantic-cache/src/types.ts b/packages/semantic-cache/src/types.ts index 1fc18baf..a9e06759 100644 --- a/packages/semantic-cache/src/types.ts +++ b/packages/semantic-cache/src/types.ts @@ -347,3 +347,47 @@ export interface IndexInfo { dimension: number; indexingState: string; } + +export interface EntryAnalyticsOptions { + /** Number of hottest entries to return. Default: 10. */ + topN?: number; + /** + * An entry is "cold" if it has not been accessed within this many days. + * A never-hit entry is always counted as cold. Default: 7. + */ + coldAfterDays?: number; +} + +/** Lightweight summary of a single cache entry for analytics output. */ +export interface EntrySummary { + /** Valkey key of the entry. */ + key: string; + /** Number of times this entry has been returned as a hit. */ + hitCount: number; + /** Epoch ms of the last hit, or 0 if never hit. */ + lastAccessedAt: number; + /** Epoch ms the entry was stored. */ + insertedAt: number; + /** Category tag, or '' if none. */ + category: string; + /** Model tag, or '' if none. */ + model: string; +} + +export interface EntryAnalyticsResult { + /** Total number of entries in the index. */ + totalEntries: number; + /** Entries with hitCount === 0. */ + neverHitCount: number; + /** Entries with hitCount >= 1. */ + hitAtLeastOnceCount: number; + /** + * Entries considered cold: never hit, OR last accessed longer ago than + * coldAfterDays. Includes neverHitCount. + */ + coldEntryCount: number; + /** The topN hottest entries, descending by hitCount. */ + topEntries: EntrySummary[]; + /** The coldAfterDays cutoff that was applied. */ + coldAfterDays: number; +}