diff --git a/CLAUDE.md b/CLAUDE.md index 548d2b94..b16062a4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -116,7 +116,7 @@ poetry run ruff check slayer/ tests/ **Embedding storage**: `SQLiteStorage` writes embeddings into the main `.db`; `YAMLStorage` uses a sidecar `/embeddings.db` so the YAML store stays git-diffable. Both go through `slayer/storage/sidecar_embedding_store.py`. Cascade-delete on a `canonical_id` matches exactly or as a strict dotted-path descendant — never as a character prefix. - **Sample-value snapshots** are cached on `Column.sampled` (text), `Column.sampled_values` (structured top-50 list for categorical columns, DEV-1480), and `Column.distinct_count` (true cardinality for categorical columns, DEV-1480). Refreshed on `slayer ingest` (table-backed models only), on `slayer search refresh-samples`, on `edit_model` (column edits → that column; model-level changes to `filters` / `sql` / `source_queries` → every column), and lazily on `inspect_model` cache miss (best-effort write-back). Categorical columns are ordered by count desc with alphabetical tie-break; the structured list is the consumer-facing way to compare predicate literals against actual stored values (text-split on `sampled` is ambiguous for values containing commas, e.g. `"R$ 1,000–3,000"`). Cache validity for categorical columns requires `sampled_values is not None` (v6 → v7 upgrades re-profile on next `inspect_model`). sql-mode and query-backed models do not yet have sample-value coverage. + **Sample-value snapshots** are cached on `Column.sampled` (text), `Column.sampled_values` (structured top-50 list for categorical columns, DEV-1480), and `Column.distinct_count` (true cardinality for categorical columns, DEV-1480). Refreshed on `slayer ingest` (table-backed models only), on `slayer search refresh-samples`, on `edit_model` (column edits → that column; model-level changes to `filters` / `sql` / `source_queries` → every column), lazily on `inspect_model` cache miss (best-effort write-back), and — DEV-1516 — lazily inside `search()` for any column hit whose persisted `sampled_values` is stale (pre-DEV-1480 legacy data or count_distinct-failed retry). Categorical columns are ordered by count desc with alphabetical tie-break; the structured list is the consumer-facing way to compare predicate literals against actual stored values (text-split on `sampled` is ambiguous for values containing commas, e.g. `"R$ 1,000–3,000"`). Cache validity for categorical columns requires `sampled_values is not None` (v6 → v7 upgrades re-profile on next `inspect_model` or next `search()` column hit). sql-mode and query-backed models do not yet have sample-value coverage. **DEV-1516**: the shared refresh helper `slayer/engine/profiling.py::ensure_column_sample_fresh(model, column, engine, storage)` is the single source of truth for the categorical cache-miss + persist pattern — both `inspect_model` (categorical loop) and `SearchService` (post-fusion column-hit hook) delegate to it. `SearchService.__init__` accepts an optional `engine: Optional[SlayerQueryEngine] = None`; when supplied (MCP `create_mcp_server` + REST `create_app` both wire it), the post-fusion hook walks column hits, groups by `(data_source, model_name)` (serialised within group, parallelised across via `asyncio.gather`), refreshes via the helper, and re-renders `EntityHit.text` via `render_column_text`. When `engine=None` the hook is a silent no-op (storage-only test contexts keep working). **DEV-1516 rendering**: `render_column_text` surfaces the **full** `sampled_values` list (typically 50) when populated — emitted as a JSON-encoded array (`["paid", "refunded", ...]`) so values containing commas (e.g. `"R$ 1,000–3,000"`) survive intact to the consumer, plus a `Distinct count: N` follow-up line when `distinct_count > len(sampled_values)`; falls back to the persisted `sampled` text only when `sampled_values is None`. Empty list (`[]`) is authoritative — skips the line entirely (no fallback to stale `sampled`). `inspect_model` markdown table continues to show the 20-truncated `sampled` text per column (readability on wide models). Known limitation: ranking (BM25 + tantivy + embeddings) still operates on stale corpus text — only the returned `EntityHit.text` is refreshed; sample-value-token search recall past position 20 stays bounded by indexed text until next ingest pass content-hashes the column and re-embeds it. `inspect_model` auto-renders a `Learnings` section showing only learning-only memories (`query is None`); query-bearing memories surface only via `search` in the `example_queries` bucket. diff --git a/docs/concepts/search.md b/docs/concepts/search.md index ad9e2768..1a23401b 100644 --- a/docs/concepts/search.md +++ b/docs/concepts/search.md @@ -317,12 +317,65 @@ All three are populated: - on `edit_model` (column edits → that column; model-level filter / sql / source-query body change → every column); - lazily on `inspect_model` when the cached value is missing (write-back - best-effort). Cache validity for categorical columns requires - `sampled_values is not None` — v6 (legacy `sampled` only) models - re-profile on next call so the structured field gets populated. + best-effort); +- lazily inside `search()` itself for any column hit whose persisted + `sampled_values` is stale (DEV-1516). The post-fusion column-hit hook + groups hits by `(data_source, model_name)` — refreshes within a model + serialise (the storage write is a model-level read-modify-write); + refreshes across different models run concurrently via + `asyncio.gather`. When `search()` is constructed without an engine + (storage-only contexts), the hook is a silent no-op. + +Cache validity for categorical columns requires `sampled_values is not None` — +v6 (legacy `sampled` only) models re-profile on the next `inspect_model` +or `search()` column hit so the structured field gets populated. sql-mode and query-backed models are silently skipped in v1. +### How sample values surface in search results + +The per-column doc rendered by `slayer/search/render.py:render_column_text` +prefers the structured `sampled_values` list (full top-50) over the +20-truncated `sampled` text. When `sampled_values` is populated: + +```text +Column: warehouse.orders.status +Type: TEXT +Description: Order status. +Sample values: ["paid", "refunded", "cancelled", "pending", …] ← JSON-encoded, all 50 +Distinct count: 12345 ← only when distinct_count > len(sampled_values) +``` + +The list is rendered as a JSON array (not comma-joined) so values that +themselves contain commas — `"R$ 1,000–3,000"`, locale-formatted numbers, +multi-clause labels — survive unambiguously to the consumer. This is why +DEV-1480 introduced the structured `sampled_values` field in the first +place; comma-joining it back to a flat string would re-introduce the +exact ambiguity it was meant to solve. + +When `sampled_values` is `None` (numeric / temporal columns, or legacy +v6 data, or rare overflow-with-failed-count_distinct rows), the renderer +falls back to the persisted `sampled` text — which already carries the +`... (N distinct)` suffix for the legacy overflow case, so no extra +`Distinct count` line is emitted. An empty `sampled_values=[]` list is +authoritative-empty: the line is skipped entirely (no fallback to stale +`sampled`). + +This same text feeds both the per-column search index doc AND +`EntityHit.text` returned by `search()` — single renderer, single +source of truth. `inspect_model`'s markdown `## Columns` table is the +**all-columns-at-once** surface and continues to show the 20-truncated +`sampled` text per column for readability on wide models. JSON +`inspect_model` output already carries the full `sampled_values` list. + +**Known limitation.** The refresh hook runs **after** RRF fusion, on the +top-K hits being returned. Ranking (BM25 / tantivy / embeddings) still +operates on whatever the corpus held at index-build time. A query whose +only match against a column is a newly-revealed value in positions 21-50 +may still fail to surface that column. The text the agent sees IS +refreshed; tantivy / embeddings will catch up on the next +`slayer ingest` content-hash pass. + ## Index design notes - The tantivy index is built **fresh on every search call** in v1 (no diff --git a/slayer/api/server.py b/slayer/api/server.py index ab544d9e..d707f758 100644 --- a/slayer/api/server.py +++ b/slayer/api/server.py @@ -650,7 +650,9 @@ async def delete_memory(memory_id: str) -> Dict[str, Any]: # ---------- DEV-1375: semantic search ----------------------------- - search_service = SearchService(storage=storage) + # DEV-1516: pass the engine so the search service's post-fusion + # column-hit hook can auto-refresh stale categorical columns. + search_service = SearchService(storage=storage, engine=engine) @app.post( "/search", diff --git a/slayer/engine/profiling.py b/slayer/engine/profiling.py index cdf6321e..9fb73c97 100644 --- a/slayer/engine/profiling.py +++ b/slayer/engine/profiling.py @@ -27,8 +27,13 @@ - Categorical query orders by per-value count desc (alphabetical tie-break in SQL) so the persisted top-N is "most common values first". - New ``Column.sampled_values: Optional[List[str]]`` carries the top-50 - list verbatim (no ambiguous text split). Stays ``None`` for overflow >50 - and for numeric/temporal columns. + list verbatim (no ambiguous text split). For categorical columns it is + populated on ≤50 distinct AND on overflow when the secondary + ``count_distinct`` query succeeds. It stays ``None`` only for + numeric/temporal columns AND for the rare overflow branch where the + secondary ``count_distinct`` query fails (intentional cache-miss retry + signal — ``_is_sample_cached`` flags it stale so the next + ``inspect_model`` / ``refresh-samples`` call retries). - New ``Column.distinct_count: Optional[int]`` carries the true total cardinality; the overflow branch fires a second ``count_distinct`` query via a transient ``ModelExtension`` (bypassing ``Column.allowed_aggregations`` @@ -40,10 +45,18 @@ ``values=None, distinct_count=None`` to signal "data omitted from the legacy entry". The richer DEV-1480 data only lives on ``ColumnSample`` produced by ``profile_column``. + +DEV-1516 additions: +- :func:`ensure_column_sample_fresh` — shared cache-aware refresh helper + used by both ``inspect_model``'s categorical loop and the search + service's post-fusion column-hit hook. Returns the input column on cache + hit / non-categorical / failure, and an in-memory refreshed copy on + success (after persisting via storage). """ from __future__ import annotations +import logging from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple from slayer.core.enums import DataType @@ -53,6 +66,9 @@ from slayer.storage.base import StorageBackend +logger = logging.getLogger(__name__) + + # --------------------------------------------------------------------------- # DEV-1480: categorical cap and public-ish result type # --------------------------------------------------------------------------- @@ -629,3 +645,105 @@ async def handle_edit_refresh( ) ) return warnings + + +# --------------------------------------------------------------------------- +# DEV-1516: shared cache-aware refresh helper +# --------------------------------------------------------------------------- + + +async def ensure_column_sample_fresh( + *, + model: SlayerModel, + column: Column, + engine: SlayerQueryEngine, + storage: StorageBackend, +) -> Column: + """Best-effort refresh of a stale categorical column's persisted sample. + + Used by both :func:`slayer.mcp.server.inspect_model` (categorical cache + miss path) and :class:`slayer.search.service.SearchService` (post-fusion + column-hit hook) so DEV-1516's "stale columns auto-refresh on the spot" + contract has a single source of truth. + + Returns the **input column unchanged** when: + + - ``_is_sample_cached(column)`` is True (cache hit; includes hidden / + primary-key columns by convention), + - the column is not categorical (numeric / temporal are handled by + ``inspect_model``'s batched min/max path; the search hook never + refreshes them since their ``sampled`` text has no 20-vs-50 issue), + - :func:`profile_column` returns ``None`` (e.g. transient query failure + or no rows), + - :func:`profile_column` raises (logged + swallowed), + - ``storage.update_column_sampled`` raises (logged + swallowed; the + in-memory refresh is still returned so the caller can render fresh + data this call). + + Returns a Pydantic ``model_copy``'d column with refreshed + ``sampled`` / ``sampled_values`` / ``distinct_count`` fields on + success (after persisting via storage). + + Logs ``WARNING`` on profile + persist failures with + ``(data_source, model_name, column_name)`` context so observability + matches the pre-DEV-1516 inline implementation in ``inspect_model``. + """ + if _is_sample_cached(column): + return column + if column.type not in _CATEGORICAL_TYPES: + # Numeric / temporal: inspect_model handles them via the batched + # min/max query. The search refresh hook intentionally skips them + # because their ``sampled`` text is a min/max range, not a value + # list — the 20-vs-50 distinction does not apply. + return column + try: + sample = await profile_column( + model=model, column=column, engine=engine, + ) + except Exception as exc: # NOSONAR(S112) — best-effort: see module docstring + logger.warning( + "ensure_column_sample_fresh: failed to profile %s.%s.%s: %s", + model.data_source, model.name, column.name, exc, + ) + return column + if sample is None: + # No data to persist (e.g. PK / hidden / no rows). Helper short- + # circuits without writing — keeps cache predicate from flipping. + return column + if ( + sample.sampled_values is None + and sample.distinct_count is None + and column.sampled + ): + # Overflow-retry path failed to recover structured data: the + # ``ColumnSample`` carries only the generic ``"> 50 distinct"`` + # marker. The column already has a richer ``sampled`` text + # (e.g. v6 legacy ``"a, b, c ... (1234 distinct)"`` or a + # previous successful-overflow run). Skip the persist + return + # the input so the rich text survives — cache predicate still + # flags the column stale so the next call retries. + return column + try: + await storage.update_column_sampled( + data_source=model.data_source, + model_name=model.name, + column_name=column.name, + sampled=sample.sampled, + sampled_values=sample.sampled_values, + distinct_count=sample.distinct_count, + ) + except Exception as exc: # NOSONAR(S112) — best-effort: see module docstring + logger.warning( + "ensure_column_sample_fresh: failed to persist sample for " + "%s.%s.%s via update_column_sampled: %s", + model.data_source, model.name, column.name, exc, + ) + # Fall through: surface the in-memory refresh so the caller can + # still render fresh data this call. Next call will retry — the + # cache predicate still flags the column stale because the persist + # never landed. + return column.model_copy(update={ + "sampled": sample.sampled, + "sampled_values": sample.sampled_values, + "distinct_count": sample.distinct_count, + }) diff --git a/slayer/mcp/server.py b/slayer/mcp/server.py index 9e33fa8a..f2d31cdb 100644 --- a/slayer/mcp/server.py +++ b/slayer/mcp/server.py @@ -25,8 +25,8 @@ from slayer.engine.profiling import ( _is_sample_cached, _profile_numeric_temporal_columns, + ensure_column_sample_fresh, handle_edit_refresh, - profile_column, ) from slayer.engine.query_engine import SlayerQueryEngine, SlayerResponse from slayer.help import TOPIC_SUMMARY_LINE, render_help @@ -1395,30 +1395,28 @@ async def _persist_sample( ) # Categorical: one top-values query per column (+ optional - # count_distinct on overflow). + # count_distinct on overflow). DEV-1516: delegates to the + # shared ``ensure_column_sample_fresh`` helper so the + # cache-miss + persist + render-dict-population pattern is + # owned by exactly one place (also used by the search + # service's post-fusion column-hit hook). for col in cat_uncached: - try: - sample = await profile_column( - model=model, column=col, engine=engine, - ) - except Exception as exc: - logger.warning( - "inspect_model: failed to profile %s.%s.%s: %s", - model.data_source, model.name, col.name, exc, - ) - sample = None - if sample is None: - continue - if sample.sampled is not None: - profile_by_name[col.name] = sample.sampled - profile_values_by_name[col.name] = sample.sampled_values - distinct_count_by_name[col.name] = sample.distinct_count - await _persist_sample( - col_name=col.name, - sampled=sample.sampled, - sampled_values=sample.sampled_values, - distinct_count=sample.distinct_count, + refreshed = await ensure_column_sample_fresh( + model=model, column=col, + engine=engine, storage=storage, ) + # On any failure (profile raise / None / persist raise) + # the helper returns the INPUT column. Legacy ``sampled`` + # text on the input still feeds the markdown cell — the + # pre-pass at lines 1347-1354 has already populated + # ``profile_by_name[col.name]`` from ``col.sampled``, + # so we only overwrite when we actually have something + # fresher (avoids clobbering the legacy fallback with + # ``None`` and producing an empty cell). + if refreshed.sampled is not None: + profile_by_name[col.name] = refreshed.sampled + profile_values_by_name[col.name] = refreshed.sampled_values + distinct_count_by_name[col.name] = refreshed.distinct_count # Numeric/temporal: one batched min/max query for all of # them at once (restores the pre-DEV-1480 batching for @@ -2825,7 +2823,9 @@ async def forget_memory(id: Any) -> str: # noqa: A002 — MCP arg name # ---------- DEV-1375: semantic search ----------------------------- - search_service = SearchService(storage=storage) + # DEV-1516: pass the engine so the search service's post-fusion + # column-hit hook can auto-refresh stale categorical columns. + search_service = SearchService(storage=storage, engine=engine) @mcp.tool() async def search( diff --git a/slayer/search/render.py b/slayer/search/render.py index f3ed1028..6ea8ec96 100644 --- a/slayer/search/render.py +++ b/slayer/search/render.py @@ -26,6 +26,7 @@ from __future__ import annotations +import json from typing import List from pydantic import BaseModel @@ -181,11 +182,36 @@ def render_column_text(*, model: SlayerModel, column: Column) -> str: lines.append(f"SQL: {column.sql}") if column.filter: lines.append(f"Filter: {column.filter}") - # DEV-1480: skip the line when ``sampled`` is empty (all-NULL profiled - # categorical column). Avoids a bare ``Sample values: `` trailer in the - # embedded doc text, and keeps the content_hash stable for columns whose - # only DEV-1480 change is the new structured ``sampled_values`` field. - if column.sampled: + # DEV-1516: prefer the structured ``sampled_values`` list (full top-50) + # over the 20-truncated ``sampled`` text. ``is None`` gates the fallback + # so an authoritative empty list (``[]``) does not re-surface a stale + # ``sampled`` text; an empty list simply skips the line (avoids a bare + # ``Sample values: `` trailer in the indexed text). + if column.sampled_values is not None: + if column.sampled_values: + # JSON-encode the list to preserve values that contain commas + # (e.g. ``"R$ 1,000–3,000"``) — comma-joining would re-introduce + # the exact ambiguity that the structured ``sampled_values`` field + # was meant to solve. + lines.append( + "Sample values: " + + json.dumps(column.sampled_values, ensure_ascii=False) + ) + # Overflow signal: render true cardinality on a follow-up line + # only when STRICTLY greater than the values we returned. Equal + # means we returned the entire set; emitting a hint would be + # noise. Gated on ``sampled_values is not None`` so the legacy + # ``"... (N distinct)"`` suffix in ``sampled`` text does not get + # duplicated by an extra line. + if ( + column.distinct_count is not None + and column.distinct_count > len(column.sampled_values) + ): + lines.append(f"Distinct count: {column.distinct_count}") + elif column.sampled: + # Fallback for numeric/temporal columns (``sampled`` is a min/max + # range, not a list) and pre-DEV-1480 legacy data where the + # structured field was never populated. lines.append(f"Sample values: {column.sampled}") if column.primary_key: lines.append("Primary key: yes") diff --git a/slayer/search/service.py b/slayer/search/service.py index 20c0b6ef..22dcc85b 100644 --- a/slayer/search/service.py +++ b/slayer/search/service.py @@ -43,6 +43,7 @@ from __future__ import annotations import asyncio +import logging from typing import Dict, List, Optional, Set, Tuple, Union from pydantic import BaseModel, Field @@ -50,6 +51,8 @@ from slayer.core.errors import AmbiguousModelError, EntityResolutionError from slayer.core.models import SlayerModel from slayer.core.query import SlayerQuery +from slayer.engine.profiling import ensure_column_sample_fresh +from slayer.engine.query_engine import SlayerQueryEngine from slayer.memories.models import MEMORY_CANONICAL_PREFIX as _MEMORY_PREFIX from slayer.memories.models import Memory from slayer.memories.resolver import ( @@ -60,6 +63,7 @@ from slayer.search.index import Corpus, build_in_memory_corpus from slayer.search.render import ( collect_model_entity_pairs, + render_column_text, render_datasource_pair, ) from slayer.search.retriever import RetrievalResult, Retriever @@ -72,6 +76,9 @@ from slayer.storage.base import StorageBackend +logger = logging.getLogger(__name__) + + _RRF_K = 60 @@ -499,6 +506,35 @@ async def _lookup_named_entity( # --------------------------------------------------------------------------- +def _group_column_hits( + entity_hits: List["EntityHit"], +) -> Dict[Tuple[str, str], List[Tuple[int, "EntityHit", str]]]: + """DEV-1516 helper: split a fused entity-hit list into per-model + buckets for the search-side sample-refresh hook. + + Walks ``entity_hits``, keeps only ``kind == "column"`` hits whose + canonical id parses as ``..`` (3 + segments), and groups them by ``(data_source, model_name)`` so the + caller can serialise writes within a model and parallelise across + models. Each member tuple is ``(original_hit_index, hit, + column_name)`` — the index is preserved so caller can splice + refreshed text back into the original list in place.""" + groups: Dict[Tuple[str, str], List[Tuple[int, EntityHit, str]]] = {} + for idx, hit in enumerate(entity_hits): + if hit.kind != "column": + continue + segments = hit.id.split(".") + if len(segments) != 3: + # Bare datasource / model canonicals are handled by other + # kinds; columns are always 3-segment. + continue + data_source, model_name, column_name = segments + groups.setdefault((data_source, model_name), []).append( + (idx, hit, column_name) + ) + return groups + + class SearchService: """Orchestrates the registered retrievers + RRF fusion.""" @@ -506,9 +542,16 @@ def __init__( self, *, storage: StorageBackend, + engine: Optional[SlayerQueryEngine] = None, retrievers: Optional[List[Retriever]] = None, ) -> None: + """DEV-1516: ``engine`` is optional so storage-only test contexts + keep working unchanged. When supplied, the post-fusion column-hit + hook auto-refreshes stale categorical columns via + :func:`ensure_column_sample_fresh` before rendering ``EntityHit.text``. + Without an engine the hook is a silent no-op.""" self._storage = storage + self._engine = engine self._retrievers: List[Retriever] = ( list(retrievers) if retrievers is not None else self._default_retrievers(storage) @@ -526,6 +569,82 @@ def _default_retrievers(storage: StorageBackend) -> List[Retriever]: def retrievers(self) -> List[Retriever]: return self._retrievers + async def _refresh_stale_column_hits( + self, + *, + entity_hits: List[EntityHit], + ) -> List[EntityHit]: + """DEV-1516 post-fusion column-hit refresh. + + Groups column hits by ``(data_source, model_name)`` and dispatches + each group to :meth:`_refresh_group_worker`. Per-model writes + serialise (storage's ``update_column_sampled`` is a model-level + read-modify-write); cross-model writes parallelise via + ``asyncio.gather``. Returns ``entity_hits`` with refreshed text + spliced in for each column hit whose helper call returned a + materially-updated column.""" + assert self._engine is not None # caller-guarded + groups = _group_column_hits(entity_hits) + if not groups: + return entity_hits + refreshed_by_idx: Dict[int, EntityHit] = {} + await asyncio.gather(*[ + self._refresh_group_worker( + ds_name=ds, model_name=model_name, + members=members, refreshed_by_idx=refreshed_by_idx, + ) + for (ds, model_name), members in groups.items() + ]) + if not refreshed_by_idx: + return entity_hits + return [ + refreshed_by_idx.get(i, h) for i, h in enumerate(entity_hits) + ] + + async def _refresh_group_worker( + self, + *, + ds_name: str, + model_name: str, + members: List[Tuple[int, EntityHit, str]], + refreshed_by_idx: Dict[int, EntityHit], + ) -> None: + """Refresh every column hit on one ``(data_source, model_name)`` + group sequentially (per-model serialisation). Loads the model + once, walks members, and writes refreshed hits into the shared + ``refreshed_by_idx`` buffer keyed by original hit index.""" + try: + model = await self._storage.get_model( + model_name, data_source=ds_name, + ) + except Exception as exc: # NOSONAR(S112) — best-effort + logger.warning( + "search refresh: failed to load model %s.%s: %s", + ds_name, model_name, exc, + ) + return + if model is None: + return + for idx, hit, column_name in members: + col = model.get_column(column_name) + if col is None: + continue + refreshed_col = await ensure_column_sample_fresh( + model=model, + column=col, + engine=self._engine, # type: ignore[arg-type] + storage=self._storage, + ) + if refreshed_col is col: + # Helper returned the input — cache hit, ineligible, or + # any failure. Leave the hit text as-is. + continue + refreshed_by_idx[idx] = hit.model_copy(update={ + "text": render_column_text( + model=model, column=refreshed_col, + ), + }) + # ------------------------------------------------------------------ # Read side — search() # ------------------------------------------------------------------ @@ -689,6 +808,17 @@ async def search( # NOSONAR(S3776) — single orchestrator entry point; stages named_kind_text=named_kind_text, max_entities=max_entities, ) + # DEV-1516: refresh stale categorical column hits in-place before + # returning. Group by (data_source, model_name) so persists for + # different columns of the SAME model are serialized (the storage + # write is a model-level read-modify-write — concurrent updates + # would lose data); persists across DIFFERENT models run + # concurrently via ``asyncio.gather``. Silently no-op when engine + # is None. + if self._engine is not None: + entity_hits = await self._refresh_stale_column_hits( + entity_hits=entity_hits, + ) return SearchResponse( memories=memory_hits, example_queries=example_query_hits, diff --git a/tests/integration/test_mcp_inspect.py b/tests/integration/test_mcp_inspect.py index 355130ed..9e4c382d 100644 --- a/tests/integration/test_mcp_inspect.py +++ b/tests/integration/test_mcp_inspect.py @@ -715,6 +715,122 @@ async def test_all_null_categorical_outputs_empty_string_not_all_null( # Text sampled is empty string, NOT the numeric-fallback "all NULL". assert notes["sampled"] == "" + async def test_markdown_table_caps_sampled_at_20_values( + self, tmp_path, + ) -> None: + """DEV-1516: per-column markdown rendering must still show at most + 20 values per column (via the persisted ``Column.sampled`` text) + even when ``sampled_values`` holds the full 50. The markdown table + is the all-columns-at-once surface and stays readable.""" + db_path = tmp_path / "wide_values.db" + conn = sqlite3.connect(str(db_path)) + conn.cursor().execute( + "CREATE TABLE many_values (id INTEGER PRIMARY KEY, kind TEXT)" + ) + # Insert 30 distinct categorical values. <= 50 so no overflow, but + # > 20 so the markdown text cap kicks in. + conn.executemany( + "INSERT INTO many_values VALUES (?, ?)", + [(i, f"k_{i:02d}") for i in range(1, 31)], + ) + conn.commit() + conn.close() + storage = YAMLStorage(base_dir=str(tmp_path / "storage")) + await storage.save_datasource(DatasourceConfig( + name="mv_ds", type="sqlite", database=str(db_path), + )) + await storage.save_model(SlayerModel( + name="many_values", sql_table="many_values", data_source="mv_ds", + columns=[ + Column(name="id", type=DataType.INT, primary_key=True), + Column(name="kind", type=DataType.TEXT), + ], + )) + server = create_mcp_server(storage=storage) + content, _ = await server.call_tool( + name="inspect_model", + arguments={"model_name": "many_values"}, # markdown + ) + text = content[0].text + col_section = text.split("## Columns")[1] + kind_row_start = col_section.find("| kind ") + if kind_row_start < 0: + pytest.fail("``kind`` row not found in markdown ## Columns table") + # Scan only the kind row (terminated by newline). + kind_row = col_section[kind_row_start:col_section.find("\n", kind_row_start)] + # Codex round-3 finding #9: positive AND negative assertions. + # Values within the cap MUST appear (the cell isn't empty). + assert "k_01" in kind_row, ( + "markdown table is missing in-cap sample values — sampled cell " + "appears empty" + ) + assert "k_20" in kind_row, ( + "markdown table dropped a top-20 sample value" + ) + # Values past the 20-cap MUST NOT appear. + assert "k_21" not in kind_row, ( + "DEV-1516 regression: markdown table leaked the 21st sample value " + "for one column — the 20-cap is broken." + ) + assert "k_29" not in kind_row, ( + "DEV-1516 regression: markdown table leaked the 29th sample value " + "for one column. The full 50 belongs only on per-column search " + "hits, not the all-columns inspect_model markdown surface." + ) + # And the persisted ``sampled_values`` still carries the full set. + reloaded = await storage.get_model("many_values", data_source="mv_ds") + assert reloaded is not None + kind_col = reloaded.get_column("kind") + assert kind_col is not None + assert kind_col.sampled_values is not None + assert len(kind_col.sampled_values) == 30 + + async def test_legacy_sampled_text_preserved_in_markdown_on_profile_failure( + self, env, monkeypatch, + ) -> None: + """Codex finding #6: when ``inspect_model``'s refactor delegates to + ``ensure_column_sample_fresh`` and ``profile_column`` raises, the + markdown column for ``sampled`` must still show the LEGACY persisted + text. The helper returns the input column on failure (which still + carries the legacy ``sampled`` set), so the markdown rendering + reads the legacy value from there. Don't surface an empty cell.""" + storage = env["storage"] + # Pre-populate a v6-style legacy state: ``sampled`` set, no list. + await storage.update_column_sampled( + data_source="test_sqlite", model_name="orders", + column_name="status", + sampled="legacy text from v6", + sampled_values=None, + distinct_count=None, + ) + + # Force every profile_column call to fail. + async def explodes(**_kwargs): + raise RuntimeError("simulated profile failure") + + monkeypatch.setattr( + "slayer.engine.profiling.profile_column", explodes, + ) + + server = create_mcp_server(storage=storage) + content, _ = await server.call_tool( + name="inspect_model", + arguments={"model_name": "orders"}, # markdown by default + ) + text = content[0].text + col_section = text.split("## Columns")[1] + status_row_start = col_section.find("| status ") + assert status_row_start >= 0 + status_row = col_section[status_row_start:col_section.find("\n", status_row_start)] + # Legacy text MUST appear in the cell so the agent still sees data. + assert "legacy text from v6" in status_row, ( + "Codex finding #6: legacy ``Column.sampled`` text must survive " + "a profile_column failure in inspect_model. The refactored " + "helper returns the INPUT column on failure (which carries " + "the legacy text); the markdown rendering must read from " + "there. An empty cell is a regression." + ) + async def test_profiles_more_than_10_categorical_columns( self, tmp_path, ) -> None: diff --git a/tests/test_engine_profiling.py b/tests/test_engine_profiling.py index 441560f6..2a1673c5 100644 --- a/tests/test_engine_profiling.py +++ b/tests/test_engine_profiling.py @@ -31,6 +31,7 @@ from slayer.engine.query_engine import SlayerQueryEngine from slayer.engine.profiling import ( ColumnSample, + ensure_column_sample_fresh, profile_column, refresh_table_backed_model_sampled, ) @@ -823,3 +824,382 @@ async def capturing(**kwargs): assert "sampled" in kw assert "sampled_values" in kw assert "distinct_count" in kw + + +# --------------------------------------------------------------------------- +# ensure_column_sample_fresh — DEV-1516 shared cache-aware refresh helper +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_ensure_fresh_returns_input_when_cache_hit( + sqlite_setup, monkeypatch, +) -> None: + """When ``_is_sample_cached(column)`` returns True (categorical with + ``sampled_values`` populated), the helper must short-circuit — no + profile call, no persist call, returns the same column object.""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + col = model.get_column("status") + assert col is not None + # Pre-populate as if already cached. + col.sampled = "paid, refunded, cancelled" + col.sampled_values = ["paid", "refunded", "cancelled"] + col.distinct_count = 3 + + profile_calls = {"n": 0} + + async def boom_profile(*_args, **_kwargs): # NOSONAR(S7503) — required async signature: monkeypatches profile_column (async) + profile_calls["n"] += 1 + raise AssertionError("profile_column should not be called on cache hit") + + persist_calls = {"n": 0} + original_persist = storage.update_column_sampled + + async def counting_persist(**kwargs): + persist_calls["n"] += 1 + return await original_persist(**kwargs) + + monkeypatch.setattr("slayer.engine.profiling.profile_column", boom_profile) + monkeypatch.setattr(storage, "update_column_sampled", counting_persist) + + result = await ensure_column_sample_fresh( + model=model, column=col, engine=engine, storage=storage, + ) + assert result is col, "cache hit should return the same column object" + assert profile_calls["n"] == 0 + assert persist_calls["n"] == 0 + + +@pytest.mark.asyncio +async def test_ensure_fresh_categorical_miss_profiles_and_persists( + sqlite_setup, +) -> None: + """DEV-1516: a categorical column with stale ``sampled_values=None`` + triggers a live profile, persists via storage, and returns a refreshed + column model with the populated structured fields.""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + col = model.get_column("status") + assert col is not None + # Force the stale state. + col.sampled = None + col.sampled_values = None + col.distinct_count = None + + refreshed = await ensure_column_sample_fresh( + model=model, column=col, engine=engine, storage=storage, + ) + assert refreshed.sampled is not None + assert refreshed.sampled_values is not None + assert refreshed.distinct_count is not None + # And persistence happened. + reloaded = await storage.get_model("orders", data_source="ds") + reloaded_col = reloaded.get_column("status") + assert reloaded_col is not None + assert reloaded_col.sampled_values is not None + assert reloaded_col.distinct_count is not None + + +@pytest.mark.asyncio +async def test_ensure_fresh_does_not_clobber_rich_sampled_on_overflow_retry_failure( + sqlite_setup, monkeypatch, +) -> None: + """CodeRabbit thread 1: a legacy v6 column has rich ``sampled`` + text (e.g. ``"a, b, c ... (1234 distinct)"``) but no + ``sampled_values``. If the secondary count_distinct query fails on + re-profile, ``profile_column`` returns + ``ColumnSample(sampled="> 50 distinct", sampled_values=None, + distinct_count=None)``. The helper must NOT clobber the richer + cached text with the generic fallback marker.""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + assert model is not None + col = model.get_column("status") + assert col is not None + # Simulate legacy v6 state. + col.sampled = "paid, refunded, cancelled ... (1234 distinct)" + col.sampled_values = None + col.distinct_count = None + + # Fake profile_column returns the overflow-retry-failed marker. + async def overflow_retry_fail(**_kwargs): # NOSONAR(S7503) — required async signature: monkeypatches profile_column (async) + return ColumnSample( + sampled="> 50 distinct", + sampled_values=None, + distinct_count=None, + ) + + persist_calls: list = [] + original_persist = storage.update_column_sampled + + async def tracking_persist(**kwargs): + persist_calls.append(kwargs) + return await original_persist(**kwargs) + + monkeypatch.setattr( + "slayer.engine.profiling.profile_column", overflow_retry_fail, + ) + monkeypatch.setattr(storage, "update_column_sampled", tracking_persist) + + result = await ensure_column_sample_fresh( + model=model, column=col, engine=engine, storage=storage, + ) + # Returned column retains the rich legacy text. + assert result.sampled == "paid, refunded, cancelled ... (1234 distinct)" + # No persist attempted (would clobber the rich text in storage). + assert persist_calls == [], ( + "overflow-retry-failed sample must NOT be persisted when the " + "column already has a richer sampled text" + ) + + +@pytest.mark.asyncio +async def test_ensure_fresh_returns_input_when_profile_returns_none( + sqlite_setup, monkeypatch, +) -> None: + """When ``profile_column`` returns None (PK / hidden / failed query), + the helper returns the INPUT column unchanged and skips persistence.""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + col = model.get_column("status") + assert col is not None + col.sampled = None + col.sampled_values = None + col.distinct_count = None + + async def returns_none(**_kwargs): # NOSONAR(S7503) — required async signature: monkeypatches profile_column (async) + return None + + persist_calls = {"n": 0} + + async def counting_persist(**_kwargs): # NOSONAR(S7503) — required async signature: monkeypatches update_column_sampled (async) + persist_calls["n"] += 1 + + monkeypatch.setattr("slayer.engine.profiling.profile_column", returns_none) + monkeypatch.setattr(storage, "update_column_sampled", counting_persist) + + result = await ensure_column_sample_fresh( + model=model, column=col, engine=engine, storage=storage, + ) + assert result is col + assert persist_calls["n"] == 0 + + +@pytest.mark.asyncio +async def test_ensure_fresh_returns_input_when_profile_raises( + sqlite_setup, monkeypatch, caplog, +) -> None: + """Best-effort: a raised ``profile_column`` exception is logged and + swallowed; the helper returns the INPUT column. Caller renders with + whatever was cached (or no sample-values line at all).""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + col = model.get_column("status") + assert col is not None + col.sampled = None + col.sampled_values = None + col.distinct_count = None + + async def explodes(**_kwargs): # NOSONAR(S7503) — required async signature: monkeypatches profile_column (async) + raise RuntimeError("simulated profile failure") + + persist_calls: list = [] + original_persist = storage.update_column_sampled + + async def tracking_persist(**kwargs): + persist_calls.append(kwargs) + return await original_persist(**kwargs) + + monkeypatch.setattr("slayer.engine.profiling.profile_column", explodes) + monkeypatch.setattr(storage, "update_column_sampled", tracking_persist) + + import logging + with caplog.at_level(logging.WARNING, logger="slayer.engine.profiling"): + result = await ensure_column_sample_fresh( + model=model, column=col, engine=engine, storage=storage, + ) + assert result is col + # Codex round-3 finding #7: persist must NOT be attempted on profile + # failure. A wrong helper that swallows the failure but still calls + # ``update_column_sampled(sampled_values=None, ...)`` would clobber any + # stale cache with permanent None and pass a "returns input" assertion. + assert persist_calls == [], ( + "profile failure must NOT trigger update_column_sampled; " + "a wrong implementation that writes None on failure would clobber" + ) + # Logged with model/datasource/column context for observability. + assert any( + "status" in rec.getMessage() and "orders" in rec.getMessage() + for rec in caplog.records + ), "helper must log profile failure with context" + + +@pytest.mark.asyncio +async def test_ensure_fresh_swallows_persist_failure_returns_refreshed( + sqlite_setup, monkeypatch, caplog, +) -> None: + """If profile succeeds but ``storage.update_column_sampled`` raises, the + helper returns the IN-MEMORY refreshed column (so the caller can still + render fresh data this call) and logs the persist failure. Subsequent + calls will retry — the cache predicate still flags it stale because the + persist never landed.""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + col = model.get_column("status") + assert col is not None + col.sampled = None + col.sampled_values = None + col.distinct_count = None + + async def boom_persist(**_kwargs): # NOSONAR(S7503) — required async signature: monkeypatches update_column_sampled (async) + raise RuntimeError("simulated persist failure") + + monkeypatch.setattr(storage, "update_column_sampled", boom_persist) + + import logging + with caplog.at_level(logging.WARNING, logger="slayer.engine.profiling"): + result = await ensure_column_sample_fresh( + model=model, column=col, engine=engine, storage=storage, + ) + # Fresh data is in-memory even though persist failed. + assert result.sampled_values is not None + assert result.distinct_count is not None + # And the persist failure was logged. + assert any( + "persist" in rec.getMessage().lower() or "update_column" in rec.getMessage() + for rec in caplog.records + ), "helper must log persist failure with context" + + +@pytest.mark.asyncio +async def test_ensure_fresh_skips_numeric_temporal( + sqlite_setup, monkeypatch, +) -> None: + """Numeric/temporal columns are out of scope for the helper. inspect_model + handles them via the batched min/max path. The helper returns the input + column unchanged regardless of cache state.""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + col = model.get_column("amount") # DOUBLE + assert col is not None + col.sampled = None # stale numeric + + profile_calls = {"n": 0} + + async def counting_profile(**kwargs): # noqa: ARG001 # NOSONAR(S7503) — required async signature: monkeypatches profile_column (async) + profile_calls["n"] += 1 + return None + + monkeypatch.setattr("slayer.engine.profiling.profile_column", counting_profile) + + result = await ensure_column_sample_fresh( + model=model, column=col, engine=engine, storage=storage, + ) + assert result is col + assert profile_calls["n"] == 0, ( + "numeric/temporal columns must not trigger the helper's profile call" + ) + + +@pytest.mark.asyncio +async def test_ensure_fresh_skips_hidden_and_primary_key( + sqlite_setup, monkeypatch, +) -> None: + """Hidden / PK columns are never profiled. ``_is_sample_cached`` returns + True for them by convention; the helper short-circuits via the cache check. + + Codex round-3 finding #8: assert profile_column and update_column_sampled + are never called. The previous shape (input-equals-output) would pass for + a wrong helper that calls profile_column → gets None → returns input.""" + engine, storage = sqlite_setup + model = await storage.get_model("orders", data_source="ds") + + profile_calls: list = [] + + async def counting_profile(**kwargs): # NOSONAR(S7503) — required async signature: monkeypatches profile_column (async) + col_kw = kwargs.get("column") + if col_kw is not None: + profile_calls.append(col_kw.name) + return None + + persist_calls: list = [] + original_persist = storage.update_column_sampled + + async def counting_persist(**kwargs): # NOSONAR(S7503) — required async signature: monkeypatches update_column_sampled (async) + persist_calls.append(kwargs.get("column_name")) + return await original_persist(**kwargs) + + monkeypatch.setattr( + "slayer.engine.profiling.profile_column", counting_profile, + ) + monkeypatch.setattr(storage, "update_column_sampled", counting_persist) + + pk_col = model.get_column("id") + assert pk_col is not None + assert pk_col.primary_key is True + + result = await ensure_column_sample_fresh( + model=model, column=pk_col, engine=engine, storage=storage, + ) + assert result is pk_col + assert "id" not in profile_calls, ( + "PK columns must short-circuit BEFORE profile_column is called" + ) + assert "id" not in persist_calls + + # Hidden column. + hidden = Column(name="hidden_one", type=DataType.TEXT, hidden=True) + model.columns.append(hidden) + await storage.save_model(model) + refreshed_model = await storage.get_model("orders", data_source="ds") + hidden_col = refreshed_model.get_column("hidden_one") + assert hidden_col is not None + result_hidden = await ensure_column_sample_fresh( + model=refreshed_model, column=hidden_col, + engine=engine, storage=storage, + ) + assert result_hidden is hidden_col + assert "hidden_one" not in profile_calls, ( + "hidden columns must short-circuit BEFORE profile_column is called" + ) + assert "hidden_one" not in persist_calls + + +@pytest.mark.asyncio +async def test_ensure_fresh_does_not_hard_gate_sql_mode( + sqlite_setup, monkeypatch, +) -> None: + """Codex finding #3: helper must NOT early-return on sql-mode / + query-backed models — inspect_model historically calls ``profile_column`` + on those without a ``_is_table_backed`` gate. Let ``profile_column`` + decide; it returns None for unsupported shapes naturally.""" + engine, storage = sqlite_setup + # Construct a sql-mode model directly (not table-backed). + sql_model = SlayerModel( + name="sql_orders", + sql="SELECT * FROM orders", + data_source="ds", + columns=[Column(name="status", type=DataType.TEXT)], + ) + col = sql_model.get_column("status") + assert col is not None + col.sampled = None + col.sampled_values = None + col.distinct_count = None + + profile_calls = {"n": 0} + + async def counting_profile(*, model, column, engine): # noqa: ARG001 # NOSONAR(S7503) — required async signature: monkeypatches profile_column (async) + profile_calls["n"] += 1 + return None # simulate "profile didn't find anything" + + monkeypatch.setattr("slayer.engine.profiling.profile_column", counting_profile) + + await ensure_column_sample_fresh( + model=sql_model, column=col, engine=engine, storage=storage, + ) + assert profile_calls["n"] == 1, ( + "helper must reach profile_column even for sql-mode models; " + "the gate is at profile_column, not at the helper" + ) diff --git a/tests/test_search_render.py b/tests/test_search_render.py index b736749f..e797d7da 100644 --- a/tests/test_search_render.py +++ b/tests/test_search_render.py @@ -16,6 +16,8 @@ from __future__ import annotations +import json + from slayer.core.enums import DataType from slayer.core.models import ( Aggregation, @@ -191,8 +193,14 @@ def test_column_text_includes_per_field_metadata() -> None: def test_column_text_includes_sampled_when_present() -> None: + """Legacy ``sampled``-only fallback path: when ``sampled_values is None`` + the renderer falls back to the persisted text. Used by numeric columns + and pre-DEV-1480 legacy data.""" m = _make_orders_model() col = m.get_column("status") + assert col is not None + # Wipe sampled_values to force the fallback path. + col.sampled_values = None text = render_column_text(model=m, column=col) assert "paid, refunded, cancelled" in text @@ -202,69 +210,201 @@ def test_column_text_omits_sampled_when_absent() -> None: col = m.get_column("id") text = render_column_text(model=m, column=col) assert "Sample values" not in text + assert "Distinct count" not in text assert "None" not in text -def test_column_text_omits_sampled_when_empty_string() -> None: - """DEV-1480: an all-NULL profiled categorical column has ``sampled=""``. - The render must skip the ``Sample values:`` line for empty strings so - the embedded text doesn't get a bare ``Sample values: `` trailer.""" +def test_column_text_omits_sampled_when_empty_string_and_no_list() -> None: + """DEV-1480: an all-NULL profiled categorical column persisted pre-DEV-1516 + has ``sampled=""`` and ``sampled_values=None``. Renderer must skip the + ``Sample values:`` line for the empty-string fallback so the embedded text + doesn't get a bare ``Sample values: `` trailer.""" m = _make_orders_model() col = m.get_column("status") + assert col is not None col.sampled = "" + col.sampled_values = None + col.distinct_count = None + text = render_column_text(model=m, column=col) + assert "Sample values" not in text + + +def test_column_text_renders_full_sampled_values_when_present() -> None: + """DEV-1516: when ``sampled_values`` is populated, the renderer surfaces + the full list (typically 50) instead of falling back to the 20-truncated + ``sampled`` text. This is the per-column-data-pull contract. + + CodeRabbit thread 2: the list is JSON-encoded so values containing + commas survive unambiguously to the consumer.""" + m = _make_orders_model() + col = m.get_column("status") + assert col is not None + fifty = [f"v{i:02d}" for i in range(50)] + # Set ``sampled`` to the legacy 20-truncated text to prove the list wins. + col.sampled = ", ".join(fifty[:20]) + col.sampled_values = fifty + col.distinct_count = 50 + text = render_column_text(model=m, column=col) + expected_line = f"Sample values: {json.dumps(fifty, ensure_ascii=False)}" + assert expected_line in text + # Last value must appear — proves it's not the 20-truncated fallback. + assert "v49" in text + # Order is preserved. + v00_pos = text.index('"v00"') + v49_pos = text.index('"v49"') + assert v00_pos < v49_pos + + +def test_column_text_overflow_appends_distinct_count_line() -> None: + """DEV-1516: on overflow (distinct_count > len(sampled_values)) the + renderer emits a follow-up ``Distinct count: N`` line so the agent can + tell the column has more values than the top-50 we returned.""" + m = _make_orders_model() + col = m.get_column("status") + assert col is not None + fifty = [f"v{i:02d}" for i in range(50)] + col.sampled = "v00, v01, v02 ... (12345 distinct)" + col.sampled_values = fifty + col.distinct_count = 12345 + text = render_column_text(model=m, column=col) + # All 50 values rendered in the JSON-encoded list form. + assert json.dumps(fifty, ensure_ascii=False) in text + # Distinct-count line present. + assert "Distinct count: 12345" in text + # Order: sample-values line comes first, distinct-count line after. + assert text.index("Sample values:") < text.index("Distinct count:") + + +def test_column_text_no_distinct_line_when_count_equals_len_sampled_values() -> None: + """DEV-1516: emit the ``Distinct count`` line only when STRICTLY greater + than ``len(sampled_values)``. Equal means we returned every value; no + need to hint at more.""" + m = _make_orders_model() + col = m.get_column("status") + assert col is not None + col.sampled_values = ["paid", "refunded", "cancelled"] + col.distinct_count = 3 + col.sampled = "paid, refunded, cancelled" + text = render_column_text(model=m, column=col) + assert 'Sample values: ["paid", "refunded", "cancelled"]' in text + assert "Distinct count" not in text + + +def test_column_text_no_distinct_line_when_distinct_count_is_none() -> None: + """DEV-1516: distinct_count line only fires when distinct_count is set.""" + m = _make_orders_model() + col = m.get_column("status") + assert col is not None + col.sampled_values = ["paid", "refunded"] + col.distinct_count = None + col.sampled = "paid, refunded" + text = render_column_text(model=m, column=col) + assert 'Sample values: ["paid", "refunded"]' in text + assert "Distinct count" not in text + + +def test_column_text_sampled_values_with_commas_survive_intact() -> None: + """CodeRabbit thread 2: values containing commas (e.g. ``"R$ 1,000–3,000"``) + must survive intact in the rendered text — that's why we switched from + comma-join to JSON encoding. The structured ``sampled_values`` field is + the unambiguous channel; the rendered EntityHit.text must preserve it.""" + m = _make_orders_model() + col = m.get_column("status") + assert col is not None + col.sampled_values = ["R$ 1,000–3,000", "R$ 3,001–5,000", "Other"] + col.distinct_count = 3 + text = render_column_text(model=m, column=col) + # The literal commas inside each value must appear inside quotes — + # JSON encoding makes them unambiguous from the list separator. + assert '"R$ 1,000–3,000"' in text + assert '"R$ 3,001–5,000"' in text + assert '"Other"' in text + + +def test_column_text_authoritative_empty_list_wins_over_stale_sampled() -> None: + """DEV-1516 / codex finding #3: ``sampled_values=[]`` is authoritative + (we ran the profile and there were no values). It must NOT fall back to + a stale ``sampled`` text — that would surface ghost values the agent + would mistake for real data. The empty list also means the renderer + must SKIP the ``Sample values:`` line entirely — emitting a bare + ``Sample values: `` trailer would leak into the search index and tantivy + would parse the trailing colon as noise.""" + m = _make_orders_model() + col = m.get_column("status") + assert col is not None col.sampled_values = [] + col.sampled = "stale_value, leftover" # would-be fallback; must NOT win col.distinct_count = 0 text = render_column_text(model=m, column=col) + # Stale text MUST NOT leak. + assert "stale_value" not in text + assert "leftover" not in text + # Empty-list contract: NO ``Sample values:`` line at all (no bare trailer). assert "Sample values" not in text + # No distinct-count line (0 == len([])). + assert "Distinct count" not in text -def test_column_text_includes_overflow_sampled_with_total() -> None: - """DEV-1480 overflow case: ``sampled`` carries the top-20 + total suffix. - The text rendering must include the suffix so the embedded doc surfaces - the true cardinality at search time.""" +def test_column_text_overflow_legacy_fallback_no_duplicate_distinct_line() -> None: + """DEV-1516 / codex finding #4: a legacy categorical column persisted + pre-DEV-1480 has ``sampled_values=None`` and the overflow suffix baked + into the ``sampled`` text (``"... (N distinct)"``). The new + distinct-count line must NOT fire on that fallback path — it would + duplicate the suffix.""" m = _make_orders_model() col = m.get_column("status") + assert col is not None + col.sampled_values = None col.sampled = "a, b, c ... (1234 distinct)" + col.distinct_count = 1234 text = render_column_text(model=m, column=col) + # Fallback line present. assert "Sample values: a, b, c ... (1234 distinct)" in text + # No extra "Distinct count:" line that would double-report 1234. + assert "Distinct count" not in text -def test_column_text_unchanged_by_sampled_values_field() -> None: - """DEV-1480: only ``sampled`` (the text string) is embedded. Adding the - structured ``sampled_values`` list must NOT change the rendered text and - therefore must NOT bump the content_hash.""" +def test_column_text_preserves_value_order() -> None: + """The renderer must NOT sort or shuffle ``sampled_values`` — they're + already ordered by frequency (most common first) and the agent relies + on that ordering to identify dominant values.""" m = _make_orders_model() - col_with_list = m.get_column("status") - col_with_list.sampled_values = ["paid", "refunded", "cancelled"] - col_with_list.distinct_count = 3 - text_with_list = render_column_text(model=m, column=col_with_list) - - # Build a sibling column identical except for the structured field. - plain = Column( - name=col_with_list.name, - type=col_with_list.type, - description=col_with_list.description, - sampled=col_with_list.sampled, - ) - text_plain = render_column_text(model=m, column=plain) - assert text_with_list == text_plain + col = m.get_column("status") + assert col is not None + col.sampled_values = ["z_dominant", "a_minor", "m_middle"] + col.distinct_count = 3 + text = render_column_text(model=m, column=col) + expected = 'Sample values: ["z_dominant", "a_minor", "m_middle"]' + assert expected in text -def test_column_text_unchanged_by_distinct_count_field() -> None: - """DEV-1480: ``distinct_count`` is metadata; not embedded.""" +def test_column_text_both_none_emits_no_sample_lines() -> None: + """Defensive: when both ``sampled`` and ``sampled_values`` are None + (unprofiled column), the renderer must emit neither the sample-values + line nor the distinct-count line.""" m = _make_orders_model() - col_with_count = m.get_column("status") - col_with_count.distinct_count = 999 - text_with = render_column_text(model=m, column=col_with_count) - - plain = Column( - name=col_with_count.name, - type=col_with_count.type, - description=col_with_count.description, - sampled=col_with_count.sampled, - ) - text_plain = render_column_text(model=m, column=plain) - assert text_with == text_plain + col = m.get_column("status") + assert col is not None + col.sampled = None + col.sampled_values = None + col.distinct_count = None + text = render_column_text(model=m, column=col) + assert "Sample values" not in text + assert "Distinct count" not in text + + +def test_column_text_includes_overflow_sampled_with_total_legacy_path() -> None: + """DEV-1480 overflow legacy path: when ``sampled_values is None`` the + renderer falls back to the persisted ``sampled`` text which already + carries the ``... (N distinct)`` suffix. Kept to pin the fallback shape + for pre-DEV-1480 / count_distinct-failed retry rows.""" + m = _make_orders_model() + col = m.get_column("status") + assert col is not None + col.sampled_values = None + col.sampled = "a, b, c ... (1234 distinct)" + text = render_column_text(model=m, column=col) + assert "Sample values: a, b, c ... (1234 distinct)" in text def test_column_text_excludes_meta() -> None: diff --git a/tests/test_search_service.py b/tests/test_search_service.py index 01bbf661..4c108fc9 100644 --- a/tests/test_search_service.py +++ b/tests/test_search_service.py @@ -20,8 +20,10 @@ from __future__ import annotations +import asyncio +import sqlite3 import tempfile -from typing import AsyncIterator +from typing import AsyncIterator, Tuple import pytest import pytest_asyncio @@ -29,6 +31,8 @@ from slayer.core.enums import DataType from slayer.core.models import Column, DatasourceConfig, ModelMeasure, SlayerModel from slayer.core.query import SlayerQuery +from slayer.engine import profiling as _profiling_mod +from slayer.engine.query_engine import SlayerQueryEngine from slayer.search.service import ( EntityHit, ExampleQueryHit, @@ -491,3 +495,471 @@ async def test_memory_hit_no_longer_carries_query_field() -> None: field has moved to `ExampleQueryHit`.""" assert "query" not in MemoryHit.model_fields assert "query" in ExampleQueryHit.model_fields + + +# --------------------------------------------------------------------------- +# DEV-1516: stale-sample auto-refresh on column hits +# --------------------------------------------------------------------------- + + +@pytest_asyncio.fixture +async def stale_setup() -> AsyncIterator[Tuple[StorageBackend, SlayerQueryEngine]]: + """SQLite-backed storage + engine with a populated ``orders`` table + AND a stored ``orders`` model whose ``status`` column has stale + sample-value data (``sampled_values=None``).""" + with tempfile.TemporaryDirectory() as tmpdir: + db_file = f"{tmpdir}/data.db" + conn = sqlite3.connect(db_file) + conn.execute( + "CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL, status TEXT)" + ) + # Populate with two distinct statuses so distinct_count is meaningful. + rows = [ + (i, float(i), "paid" if i % 2 == 0 else "refunded") + for i in range(1, 21) + ] + conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows) + conn.commit() + conn.close() + + storage = resolve_storage(f"{tmpdir}/storage") + await storage.save_datasource(DatasourceConfig( + name="warehouse", type="sqlite", database=db_file, + )) + await storage.save_model(SlayerModel( + name="orders", + sql_table="orders", + data_source="warehouse", + description="Checkout orders fixture.", + columns=[ + Column(name="id", type=DataType.INT, primary_key=True), + Column(name="amount", type=DataType.DOUBLE), + # status: stale — sampled_values intentionally None. + Column(name="status", type=DataType.TEXT, + description="Order status."), + ], + )) + # Add a memory so search() recency/channel-1 corpora are non-empty. + await storage.save_memory( + learning="status='paid' captures completed orders.", + entities=["warehouse.orders.status"], + ) + engine = SlayerQueryEngine(storage=storage) + yield storage, engine + + +@pytest.mark.asyncio +async def test_search_service_accepts_engine_kwarg( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], +) -> None: + """DEV-1516: SearchService now accepts an optional ``engine=`` kwarg so + the search path can run the sample-refresh helper. Codex finding #1. + Also pins that the supplied engine actually lands on the instance — + a bare ``is not None`` check on the constructor result was tautological.""" + storage, engine = stale_setup + svc = SearchService(storage=storage, engine=engine) + assert svc._engine is engine + + +@pytest.mark.asyncio +async def test_search_service_engine_default_none_works( + storage_with_corpus: StorageBackend, +) -> None: + """The ``engine`` kwarg defaults to None so existing callers (and the + storage-only test fixtures) keep working unchanged.""" + svc = SearchService(storage=storage_with_corpus) + # And search() must still succeed when engine is None — refresh is just + # skipped. + response = await svc.search( + entities=["warehouse.orders.status"], + max_entities=5, + ) + assert response is not None + + +@pytest.mark.asyncio +async def test_search_refreshes_stale_categorical_column_hit( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], +) -> None: + """DEV-1516 core contract: when search returns a stale categorical + column EntityHit, the helper fires, persists, and the EntityHit.text + surfaces the freshly-profiled sample values.""" + storage, engine = stale_setup + svc = SearchService(storage=storage, engine=engine) + response = await svc.search( + entities=["warehouse.orders.status"], + max_entities=5, + ) + column_hits = [h for h in response.entities if h.kind == "column"] + assert column_hits, "expected the status column to surface as an entity hit" + status_hit = next( + h for h in column_hits if h.id == "warehouse.orders.status" + ) + # The stale text would not have any sample values. The refreshed text + # must surface "paid" and "refunded" (the only two values in the DB). + assert "paid" in status_hit.text + assert "refunded" in status_hit.text + # And persistence happened — reloading the model shows populated cache. + reloaded = await storage.get_model("orders", data_source="warehouse") + assert reloaded is not None + status_col = reloaded.get_column("status") + assert status_col is not None + assert status_col.sampled_values is not None + assert set(status_col.sampled_values) == {"paid", "refunded"} + + +@pytest.mark.asyncio +async def test_search_refreshes_stale_column_hit_via_question_corpus( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], +) -> None: + """Codex round-3 finding #2: refresh must also apply to column hits that + surface via the question/corpus path (channels 2 & 3), not only the + named-entity (channel-1) path. A wrong implementation that only refreshes + inside the named-lookup branch would pass the entities-only test but + miss this one. + + Uses a question that the tantivy corpus matches against the stored + ``status`` column (description ``"Order status."``). The column has + stale ``sampled_values=None`` from the fixture.""" + storage, engine = stale_setup + svc = SearchService(storage=storage, engine=engine) + response = await svc.search( + question="order status", + max_entities=5, + ) + column_hits = [ + h for h in response.entities + if h.kind == "column" and h.id == "warehouse.orders.status" + ] + assert column_hits, ( + "expected the status column to surface via the question/corpus path" + ) + text = column_hits[0].text + # Refreshed text must include the live values. + assert "paid" in text and "refunded" in text, ( + "question-path column hit must also receive the post-fusion refresh" + ) + # And the refresh persisted to storage. + reloaded = await storage.get_model("orders", data_source="warehouse") + assert reloaded is not None + status_col = reloaded.get_column("status") + assert status_col is not None + assert status_col.sampled_values is not None + + +@pytest.mark.asyncio +async def test_search_no_refresh_when_engine_is_none( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], +) -> None: + """Without an engine the refresh hook is a no-op. The hit still + surfaces (with stale text); storage stays untouched.""" + storage, _ = stale_setup + svc = SearchService(storage=storage) # no engine + response = await svc.search( + entities=["warehouse.orders.status"], + max_entities=5, + ) + # The hit may or may not include "paid" depending on what the stale + # corpus rendered, but storage MUST NOT have been written to. + reloaded = await storage.get_model("orders", data_source="warehouse") + assert reloaded is not None + status_col = reloaded.get_column("status") + assert status_col is not None + assert status_col.sampled_values is None, ( + "engine=None must skip the refresh; storage stays stale" + ) + # And no crash — response is returned normally. + assert response is not None + + +@pytest.mark.asyncio +async def test_search_stale_text_preserved_when_profile_raises( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], monkeypatch, +) -> None: + """If the helper's profile_column raises, the search hit falls back to + the original (stale) rendered text. No crash. Storage stays untouched. + + Codex round-3 finding #6: assert the hit text itself survives (not just + that storage is untouched), and finding #7: assert ``update_column_sampled`` + is never called when profile fails.""" + storage, engine = stale_setup + svc = SearchService(storage=storage, engine=engine) + + async def explodes(**_kwargs): + raise RuntimeError("simulated profile failure") + + persist_calls: list = [] + original_persist = storage.update_column_sampled + + async def tracking_persist(**kwargs): + persist_calls.append(kwargs) + return await original_persist(**kwargs) + + monkeypatch.setattr("slayer.engine.profiling.profile_column", explodes) + monkeypatch.setattr(storage, "update_column_sampled", tracking_persist) + + response = await svc.search( + entities=["warehouse.orders.status"], + max_entities=5, + ) + # Search still returned a response and the column hit still exists. + assert response is not None + status_hits = [ + h for h in response.entities if h.id == "warehouse.orders.status" + ] + assert status_hits, ( + "even on profile failure, the column hit must still surface in " + "the search response (with stale text)" + ) + # Hit text retains its baseline content — at minimum the canonical id + + # type line + description that the renderer always emits for a column. + text = status_hits[0].text + assert "warehouse.orders.status" in text + assert "Order status" in text # column description + # Storage stays stale. + reloaded = await storage.get_model("orders", data_source="warehouse") + assert reloaded is not None + status_col = reloaded.get_column("status") + assert status_col is not None + assert status_col.sampled_values is None + # And no persist call was attempted. + assert persist_calls == [], ( + "profile failure must NOT trigger update_column_sampled; " + "a wrong implementation that catches the failure and writes " + "None/empty would silently clobber the stale cache" + ) + + +@pytest.mark.asyncio +async def test_search_numeric_column_hit_not_refreshed( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], monkeypatch, +) -> None: + """The helper is categorical-only. A numeric column hit must NOT + trigger profile_column from the search refresh path. + + Codex round-3 finding #5: assert the profile-call recording explicitly + rather than relying only on "storage stays untouched". A wrong + implementation that calls ``profile_column`` for numeric and then + skips persistence would otherwise slip through.""" + storage, engine = stale_setup + svc = SearchService(storage=storage, engine=engine) + + profile_call_columns: list = [] + real_profile = _profiling_mod.profile_column + + async def counting_profile(*, model, column, engine): + profile_call_columns.append(column.name) + return await real_profile(model=model, column=column, engine=engine) + + monkeypatch.setattr( + "slayer.engine.profiling.profile_column", counting_profile, + ) + + response = await svc.search( + entities=["warehouse.orders.amount"], + max_entities=5, + ) + # The helper short-circuits for numeric/temporal columns; profile_column + # must NEVER be invoked for the ``amount`` column from the search hook. + assert "amount" not in profile_call_columns, ( + f"numeric column must NOT trigger profile_column from search refresh; " + f"got calls for columns: {profile_call_columns}" + ) + # And storage for ``amount`` stays untouched. + reloaded = await storage.get_model("orders", data_source="warehouse") + assert reloaded is not None + amount_col = reloaded.get_column("amount") + assert amount_col is not None + assert amount_col.sampled is None, ( + "numeric column must NOT be refreshed by the search hook" + ) + assert response is not None + + +@pytest.mark.asyncio +async def test_search_per_model_serialization_concurrent_hits( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], monkeypatch, +) -> None: + """Codex finding #2: storage.update_column_sampled does a model-level + read-modify-write. Two stale-column hits on the SAME model must NOT + run in parallel — that would race and one column's update would lose. + Per-model serialization is required; cross-model parallelism is fine.""" + storage, engine = stale_setup + # Add another stale categorical column to the SAME model so two + # hits on the same model would trigger parallel persist if not + # serialized. + model = await storage.get_model("orders", data_source="warehouse") + assert model is not None + model.columns.append( + Column(name="region", type=DataType.TEXT, description="Region of sale."), + ) + await storage.save_model(model) + # Add a region column to the underlying table. (Without it, the + # profile query will return no rows but the column will still be + # in the refresh path.) + ds = await storage.get_datasource("warehouse") + assert ds is not None + assert ds.database is not None + conn = sqlite3.connect(ds.database) + try: + conn.execute("ALTER TABLE orders ADD COLUMN region TEXT DEFAULT 'EMEA'") + conn.commit() + finally: + conn.close() + # Track the order of update_column_sampled calls per model. Capture + # the column name and a start/stop marker so we can detect overlap. + persist_events: list = [] + original = storage.update_column_sampled + + async def tracking_persist(**kwargs): + persist_events.append(("start", kwargs.get("model_name"), kwargs.get("column_name"))) + # tiny sleep to widen any race window + await asyncio.sleep(0.01) + result = await original(**kwargs) + persist_events.append(("stop", kwargs.get("model_name"), kwargs.get("column_name"))) + return result + + monkeypatch.setattr(storage, "update_column_sampled", tracking_persist) + + svc = SearchService(storage=storage, engine=engine) + response = await svc.search( + entities=[ + "warehouse.orders.status", + "warehouse.orders.region", + ], + max_entities=5, + ) + assert response is not None + + # Codex round-3 finding #3: assert BOTH columns produced persist + # events. Otherwise the test could pass with only one column refreshed + # (no overlap by accident, not by design). + started_columns = { + col_name for ev, _mn, col_name in persist_events if ev == "start" + } + assert "status" in started_columns, ( + "expected status to be refreshed and persisted" + ) + assert "region" in started_columns, ( + "expected region to be refreshed and persisted" + ) + + # And BOTH columns reload with populated sampled_values — proving both + # writes landed (no last-write-wins overwrite). + reloaded2 = await storage.get_model("orders", data_source="warehouse") + assert reloaded2 is not None + status_col = reloaded2.get_column("status") + region_col = reloaded2.get_column("region") + assert status_col is not None + assert region_col is not None + assert status_col.sampled_values is not None, ( + "status refresh lost — codex finding #2 regression " + "(per-model serialization broken; one column's write overwrote the other)" + ) + assert region_col.sampled_values is not None, ( + "region refresh lost — codex finding #2 regression" + ) + + # Verify no two persist calls for the SAME model overlap. + open_for_model: dict = {} + for event, model_name, col_name in persist_events: + if event == "start": + assert open_for_model.get(model_name) is None, ( + f"persist for {model_name}.{col_name} overlaps with another " + f"persist on the same model " + f"(open={open_for_model[model_name]}). " + "Codex finding #2: per-model writes must serialize." + ) + open_for_model[model_name] = col_name + else: + open_for_model[model_name] = None + + +@pytest.mark.asyncio +async def test_search_cross_model_concurrency_is_allowed( + stale_setup: Tuple[StorageBackend, SlayerQueryEngine], monkeypatch, +) -> None: + """Codex round-3 finding #4: per the plan, refresh serializes within a + ``(data_source, model_name)`` group AND parallelizes across groups via + ``asyncio.gather``. Pin the cross-model parallelism — two stale columns + on DIFFERENT models must be allowed to refresh concurrently.""" + storage, engine = stale_setup + # Add a second table + stored model so we have two stale categorical + # columns on different models. + ds = await storage.get_datasource("warehouse") + assert ds is not None + assert ds.database is not None + conn = sqlite3.connect(ds.database) + try: + conn.execute( + "CREATE TABLE customers (id INTEGER PRIMARY KEY, region TEXT)" + ) + conn.executemany( + "INSERT INTO customers VALUES (?, ?)", + [(i, "EMEA" if i % 2 == 0 else "APAC") for i in range(1, 11)], + ) + conn.commit() + finally: + conn.close() + await storage.save_model(SlayerModel( + name="customers", + sql_table="customers", + data_source="warehouse", + description="Customers.", + columns=[ + Column(name="id", type=DataType.INT, primary_key=True), + Column(name="region", type=DataType.TEXT, + description="Customer region."), + ], + )) + + persist_events: list = [] + original = storage.update_column_sampled + + async def tracking_persist(**kwargs): + persist_events.append( + ("start", kwargs.get("model_name"), kwargs.get("column_name")) + ) + # Force overlap: hold the write open. If implementation does NOT + # parallelize across models, both writes still complete sequentially + # — which would pass the "no overlap" test but fail this one's + # "must overlap" check. + await asyncio.sleep(0.05) + result = await original(**kwargs) + persist_events.append( + ("stop", kwargs.get("model_name"), kwargs.get("column_name")) + ) + return result + + monkeypatch.setattr(storage, "update_column_sampled", tracking_persist) + + svc = SearchService(storage=storage, engine=engine) + response = await svc.search( + entities=[ + "warehouse.orders.status", + "warehouse.customers.region", + ], + max_entities=5, + ) + assert response is not None + # Detect overlap across DIFFERENT models. + open_models: dict = {} + cross_model_overlap_seen = False + for event, model_name, _col_name in persist_events: + if event == "start": + # Any other model already open? + others_open = [ + k for k, v in open_models.items() if v and k != model_name + ] + if others_open: + cross_model_overlap_seen = True + open_models[model_name] = True + else: + open_models[model_name] = False + + assert cross_model_overlap_seen, ( + "Cross-model refresh must run concurrently (codex round-3 #4). " + "Two columns on DIFFERENT models had their persist calls fully " + "sequenced — the plan's asyncio.gather across (data_source, model_name) " + "groups is not being honored." + ) diff --git a/tests/test_search_surfaces.py b/tests/test_search_surfaces.py index b71b0111..5726960a 100644 --- a/tests/test_search_surfaces.py +++ b/tests/test_search_surfaces.py @@ -98,6 +98,77 @@ async def test_mcp_search_tool_returns_json_with_three_lists( assert "warehouse.orders.amount_paid" in payload["resolved_input_entities"] +@pytest.mark.asyncio +async def test_mcp_search_path_wires_engine_into_search_service( + storage_with_corpus: StorageBackend, +) -> None: + """DEV-1516 codex finding #1: ``create_mcp_server`` constructs a + ``SearchService`` with an engine kwarg so the search-side refresh + actually fires in the MCP product path. This test catches a regression + where the engine wiring is dropped (the helper would silently no-op).""" + from slayer.mcp.server import create_mcp_server + from slayer.search.service import SearchService + + constructed: list = [] + + real_init = SearchService.__init__ + + def capturing_init(self, *args, **kwargs): + constructed.append(kwargs) + return real_init(self, *args, **kwargs) + + # Patch on the class so the MCP factory's `SearchService(storage=..., engine=...)` + # call funnels through us. + original = SearchService.__init__ + SearchService.__init__ = capturing_init # type: ignore[assignment] + try: + create_mcp_server(storage=storage_with_corpus) + finally: + SearchService.__init__ = original # type: ignore[assignment] + + assert constructed, "create_mcp_server should construct SearchService" + # At least one construction must include a non-None engine kwarg. + kw_lists = constructed + assert any( + kw.get("engine") is not None for kw in kw_lists + ), ( + "MCP wiring regression: SearchService constructed without engine. " + "Search-side sample-refresh would silently no-op." + ) + + +@pytest.mark.asyncio +async def test_rest_search_path_wires_engine_into_search_service( + storage_with_corpus: StorageBackend, +) -> None: + """REST counterpart of the MCP wiring test. ``create_app`` must also + pass the engine to SearchService.""" + from slayer.api.server import create_app + from slayer.search.service import SearchService + + constructed: list = [] + real_init = SearchService.__init__ + + def capturing_init(self, *args, **kwargs): + constructed.append(kwargs) + return real_init(self, *args, **kwargs) + + original = SearchService.__init__ + SearchService.__init__ = capturing_init # type: ignore[assignment] + try: + create_app(storage=storage_with_corpus) + finally: + SearchService.__init__ = original # type: ignore[assignment] + + assert constructed, "create_app should construct SearchService" + assert any( + kw.get("engine") is not None for kw in constructed + ), ( + "REST wiring regression: SearchService constructed without engine. " + "Search-side sample-refresh would silently no-op on REST calls." + ) + + @pytest.mark.asyncio async def test_mcp_search_friendly_warning_on_unknown_entity( storage_with_corpus: StorageBackend,