From 8b42fdb0c3aed9d6907e563010ebbb37b0c5a3b4 Mon Sep 17 00:00:00 2001 From: phernandez Date: Thu, 11 Jun 2026 23:54:41 -0500 Subject: [PATCH] fix(core): use (type, id) keys in vector search hydration to prevent id collisions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: entity, observation, and relation rows in search_index carry ids from independent auto-increment sequences, so rows of different types routinely share the same numeric id (guaranteed in young databases). _search_vector_only parsed each vector hit's chunk_key (e.g. 'entity:4:0') but discarded the type, and _fetch_search_index_rows_by_ids keyed its result dict by bare row.id with no type discrimination. Whichever row the database returned last clobbered the other in the dict; the clobbered hit then hydrated against the wrong row or found None and was silently dropped from results. The FTS-filter branch already guarded this with (id, type) tuples, but the primary vector lookup path and the hybrid fusion maps missed the same treatment. Fix: introduce a SearchIndexKey = tuple[str, int] alias and key every map in the vector/hybrid retrieval path by (type, id) — the similarity and chunk maps in _search_vector_only, the _fetch_search_index_rows_by_ids result, the FTS-filter allowed keys, and the rows/fts/vec/fused score maps in _search_hybrid. The SQL stays unchanged; bare ids are deduped before the IN query and rows are discriminated by row.type when building dict keys. Tests: end-to-end SQLite regression test indexes an entity row and a relation row sharing id 7, syncs vectors for both, and asserts vector search returns both rows (and that the entity survives a search_item_types filter); a hybrid fusion unit test asserts an entity and relation sharing id 1 stay distinct with single-source scores. Both fail without the fix. Existing mocked vector tests updated for tuple keys. Fixes #982 Co-Authored-By: Claude Fable 5 Signed-off-by: phernandez --- .../repository/search_repository_base.py | 95 +++++++++++-------- tests/repository/test_hybrid_fusion.py | 28 ++++++ .../test_sqlite_vector_search_repository.py | 91 ++++++++++++++++++ tests/repository/test_vector_pagination.py | 2 +- tests/repository/test_vector_threshold.py | 16 ++-- 5 files changed, 181 insertions(+), 51 deletions(-) diff --git a/src/basic_memory/repository/search_repository_base.py b/src/basic_memory/repository/search_repository_base.py index 39c032273..2e4e687a3 100644 --- a/src/basic_memory/repository/search_repository_base.py +++ b/src/basic_memory/repository/search_repository_base.py @@ -40,6 +40,11 @@ OVERSIZED_ENTITY_VECTOR_SHARD_SIZE = 256 _SQLITE_MAX_PREPARE_WINDOW = 8 +# Entity, observation, and relation rows in search_index carry ids from independent +# auto-increment sequences, so a bare id is ambiguous across row types. Every map in +# the vector/hybrid retrieval path must key rows by (type, id) to avoid collisions. +type SearchIndexKey = tuple[str, int] + @dataclass class VectorSyncBatchResult: @@ -1857,7 +1862,7 @@ async def _dispatch_retrieval_mode( # ------------------------------------------------------------------ @staticmethod - def _parse_chunk_key(chunk_key: str) -> tuple[str, int]: + def _parse_chunk_key(chunk_key: str) -> SearchIndexKey: """Parse a chunk_key like 'observation:5:0' into (type, search_index_id).""" parts = chunk_key.split(":") return parts[0], int(parts[1]) @@ -1932,26 +1937,27 @@ def _log_vector_summary() -> None: hydrate_start = time.perf_counter() # Build per-search_index_row similarity scores from chunk-level results. - # Each chunk_key encodes the search_index row type and id. + # Each chunk_key encodes the search_index row type and id; keep both as the + # key because different row types can share the same numeric id (#982). # Track the best similarity per row (for ranking) and all chunks (for context). - similarity_by_si_id: dict[int, float] = {} - chunks_by_si_id: dict[int, list[tuple[float, str]]] = {} + similarity_by_si_key: dict[SearchIndexKey, float] = {} + chunks_by_si_key: dict[SearchIndexKey, list[tuple[float, str]]] = {} for row in vector_rows: chunk_key = row.get("chunk_key", "") distance = float(row["best_distance"]) similarity = self._distance_to_similarity(distance) chunk_text = row.get("chunk_text", "") try: - _, si_id = self._parse_chunk_key(chunk_key) + si_key = self._parse_chunk_key(chunk_key) except (ValueError, IndexError): # Fallback: group by entity_id for chunks without parseable keys continue - current = similarity_by_si_id.get(si_id) + current = similarity_by_si_key.get(si_key) if current is None or similarity > current: - similarity_by_si_id[si_id] = similarity - chunks_by_si_id.setdefault(si_id, []).append((similarity, chunk_text)) + similarity_by_si_key[si_key] = similarity + chunks_by_si_key.setdefault(si_key, []).append((similarity, chunk_text)) - if not similarity_by_si_id: + if not similarity_by_si_key: hydrate_ms = (time.perf_counter() - hydrate_start) * 1000 _log_vector_summary() return [] @@ -1962,16 +1968,17 @@ def _log_vector_summary() -> None: min_similarity if min_similarity is not None else self._semantic_min_similarity ) if effective_min_similarity > 0.0: - similarity_by_si_id = { - k: v for k, v in similarity_by_si_id.items() if v >= effective_min_similarity + similarity_by_si_key = { + k: v for k, v in similarity_by_si_key.items() if v >= effective_min_similarity } - if not similarity_by_si_id: + if not similarity_by_si_key: hydrate_ms = (time.perf_counter() - hydrate_start) * 1000 _log_vector_summary() return [] - # Fetch the actual search_index rows - si_ids = list(similarity_by_si_id.keys()) + # Fetch the actual search_index rows. Colliding (type, id) keys share one + # bare id, so deduplicate while preserving first-seen order. + si_ids = list(dict.fromkeys(si_id for _, si_id in similarity_by_si_key)) search_index_rows = await self._fetch_search_index_rows_by_ids(si_ids) # Apply optional filters if requested @@ -2003,16 +2010,14 @@ def _log_vector_summary() -> None: limit=VECTOR_FILTER_SCAN_LIMIT, offset=0, ) - # Use (id, type) tuples to avoid collisions between different + # Use (type, id) tuples to avoid collisions between different # search_index row types that share the same auto-increment id. - allowed_keys = {(row.id, row.type) for row in filtered_rows if row.id is not None} - search_index_rows = { - k: v for k, v in search_index_rows.items() if (v.id, v.type) in allowed_keys - } + allowed_keys = {(row.type, row.id) for row in filtered_rows if row.id is not None} + search_index_rows = {k: v for k, v in search_index_rows.items() if k in allowed_keys} ranked_rows: list[SearchIndexRow] = [] - for si_id, similarity in similarity_by_si_id.items(): - row = search_index_rows.get(si_id) + for si_key, similarity in similarity_by_si_key.items(): + row = search_index_rows.get(si_key) if row is None: continue @@ -2022,7 +2027,7 @@ def _log_vector_summary() -> None: if content_snippet and len(content_snippet) <= SMALL_NOTE_CONTENT_LIMIT: matched_chunk_text = content_snippet else: - si_chunks = chunks_by_si_id.get(si_id, []) + si_chunks = chunks_by_si_key.get(si_key, []) si_chunks.sort(key=lambda c: c[0], reverse=True) top_texts = [text for _, text in si_chunks[:TOP_CHUNKS_PER_RESULT]] matched_chunk_text = "\n---\n".join(top_texts) if top_texts else None @@ -2088,8 +2093,12 @@ async def _fetch_entity_rows_by_ids(self, entity_ids: list[int]) -> dict[int, Se async def _fetch_search_index_rows_by_ids( self, row_ids: list[int] - ) -> dict[int, SearchIndexRow]: - """Fetch search_index rows by their primary key (id), any type.""" + ) -> dict[SearchIndexKey, SearchIndexRow]: + """Fetch search_index rows by id, keyed by (type, id) to disambiguate types. + + A bare id can match one row per type (independent id sequences), so the + result must carry every matching row rather than letting one clobber another. + """ if not row_ids: return {} placeholders = ",".join(f":id_{idx}" for idx in range(len(row_ids))) @@ -2106,11 +2115,11 @@ async def _fetch_search_index_rows_by_ids( WHERE project_id = :project_id AND id IN ({placeholders}) """ - result: dict[int, SearchIndexRow] = {} + result: dict[SearchIndexKey, SearchIndexRow] = {} async with db.scoped_session(self.session_maker) as session: row_result = await session.execute(text(sql), params) for row in row_result.fetchall(): - result[row.id] = SearchIndexRow( + result[(row.type, row.id)] = SearchIndexRow( project_id=self.project_id, id=row.id, title=row.title, @@ -2156,7 +2165,7 @@ async def _search_hybrid( ) -> List[SearchIndexRow]: """Fuse FTS and vector results using score-based fusion. - Uses search_index row id as the fusion key. The formula + Uses the search_index (type, id) pair as the fusion key. The formula ``max(vec, fts) + FUSION_BONUS * min(vec, fts)`` preserves the dominant signal and rewards dual-source agreement. """ @@ -2199,17 +2208,19 @@ async def _search_hybrid( vector_ms = (time.perf_counter() - vector_start) * 1000 fusion_start = time.perf_counter() - # --- Score-based fusion keyed on search_index row id --- + # --- Score-based fusion keyed on (type, id) --- + # A bare row id collides across row types (independent id sequences), so + # fusion must key on (type, id) or distinct rows would merge (#982). # FTS scores are normalized to [0, 1] (BM25 is unbounded). # Vector scores are used raw — already calibrated [0, 1] by _distance_to_similarity(). - rows_by_id: dict[int, SearchIndexRow] = {} + rows_by_key: dict[SearchIndexKey, SearchIndexRow] = {} # Normalize FTS scores to [0, 1] — handles both SQLite (negative bm25) # and Postgres (positive ts_rank) by using absolute values fts_abs = [abs(row.score or 0.0) for row in fts_results] fts_max = max(fts_abs) if fts_abs else 1.0 - fts_scores: dict[int, float] = {} + fts_scores: dict[SearchIndexKey, float] = {} for row in fts_results: if row.id is None: continue @@ -2217,32 +2228,32 @@ async def _search_hybrid( # Gate: FTS scores below threshold contribute zero if norm < FTS_GATE_THRESHOLD: norm = 0.0 - fts_scores[row.id] = norm - rows_by_id[row.id] = row + fts_scores[(row.type, row.id)] = norm + rows_by_key[(row.type, row.id)] = row - vec_scores: dict[int, float] = {} + vec_scores: dict[SearchIndexKey, float] = {} for row in vector_results: if row.id is None: continue # Trigger: no re-normalization by vec_max # Why: vector similarity is already calibrated [0, 1]; re-normalizing # inflates weak matches when the entire result set is mediocre - vec_scores[row.id] = row.score or 0.0 - rows_by_id[row.id] = row + vec_scores[(row.type, row.id)] = row.score or 0.0 + rows_by_key[(row.type, row.id)] = row # Fuse: max(v, f) + FUSION_BONUS * min(v, f) # Preserves the dominant signal; bonus rewards dual-source agreement. # Output range: [0, 1.3] for dual-source, [0, 1.0] for single-source. - fused_scores: dict[int, float] = {} - for row_id in fts_scores.keys() | vec_scores.keys(): - v = vec_scores.get(row_id, 0.0) - f = fts_scores.get(row_id, 0.0) - fused_scores[row_id] = max(v, f) + FUSION_BONUS * min(v, f) + fused_scores: dict[SearchIndexKey, float] = {} + for row_key in fts_scores.keys() | vec_scores.keys(): + v = vec_scores.get(row_key, 0.0) + f = fts_scores.get(row_key, 0.0) + fused_scores[row_key] = max(v, f) + FUSION_BONUS * min(v, f) ranked = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True) output: list[SearchIndexRow] = [] - for row_id, fused_score in ranked[offset : offset + limit]: - row = rows_by_id[row_id] + for row_key, fused_score in ranked[offset : offset + limit]: + row = rows_by_key[row_key] # Trigger: FTS-only results have no matched_chunk_text from vector search. # Why: without chunk text, API falls back to truncated content, losing answer text. # Outcome: FTS-only results get full content_snippet as matched_chunk. diff --git a/tests/repository/test_hybrid_fusion.py b/tests/repository/test_hybrid_fusion.py index 7ae8a9e01..0a4a2c290 100644 --- a/tests/repository/test_hybrid_fusion.py +++ b/tests/repository/test_hybrid_fusion.py @@ -228,6 +228,34 @@ async def test_zero_score_produces_zero_fused(): assert results[0].score == pytest.approx(0.0, rel=1e-6) +@pytest.mark.asyncio +async def test_cross_type_id_collision_keeps_both_results(): + """An entity and a relation sharing the same numeric id stay distinct (#982). + + search_index row types have independent id sequences, so fusing on a bare + row id merged unrelated rows into one result and dropped the other. + """ + repo = ConcreteSearchRepo() + + fts_results = [FakeRow(id=1, type="entity", score=5.0, title="entity-row")] + vector_results = [FakeRow(id=1, type="relation", score=0.8, title="relation-row")] + + with ( + patch.object(repo, "search", new_callable=AsyncMock, return_value=fts_results), + patch.object( + repo, "_search_vector_only", new_callable=AsyncMock, return_value=vector_results + ), + ): + results = await repo._search_hybrid(**HYBRID_KWARGS) + + assert {(r.type, r.id) for r in results} == {("entity", 1), ("relation", 1)} + # Single-source scores must not earn the dual-source fusion bonus across types. + entity_result = next(r for r in results if r.type == "entity") + relation_result = next(r for r in results if r.type == "relation") + assert entity_result.score == pytest.approx(1.0, rel=1e-6) + assert relation_result.score == pytest.approx(0.8, rel=1e-6) + + @pytest.mark.asyncio async def test_fts_only_result_gets_matched_chunk_from_content_snippet(): """FTS-only results should have matched_chunk_text populated from content_snippet.""" diff --git a/tests/repository/test_sqlite_vector_search_repository.py b/tests/repository/test_sqlite_vector_search_repository.py index 42df8cddd..9bcd72da7 100644 --- a/tests/repository/test_sqlite_vector_search_repository.py +++ b/tests/repository/test_sqlite_vector_search_repository.py @@ -76,6 +76,32 @@ def _entity_row( ) +def _relation_row( + *, + project_id: int, + row_id: int, + entity_id: int, + title: str, + permalink: str, + relation_type: str, +) -> SearchIndexRow: + now = datetime.now(timezone.utc) + return SearchIndexRow( + project_id=project_id, + id=row_id, + type=SearchItemType.RELATION.value, + title=title, + permalink=permalink, + file_path=f"{permalink}.md", + metadata=None, + entity_id=entity_id, + from_id=entity_id, + relation_type=relation_type, + created_at=now, + updated_at=now, + ) + + def _enable_semantic( search_repository: SQLiteSearchRepository, embedding_provider: StubEmbeddingProvider | None = None, @@ -498,6 +524,71 @@ async def test_sqlite_vector_search_returns_ranked_entities(search_repository): assert all(result.type == SearchItemType.ENTITY.value for result in results) +@pytest.mark.asyncio +async def test_sqlite_vector_search_survives_cross_type_id_collision(search_repository): + """Entity and relation rows sharing one numeric id must both hydrate (#982). + + Entity, observation, and relation rows carry ids from independent + auto-increment sequences, so search_index rows of different types routinely + share the same numeric id. Keying vector hydration by bare id collapsed + colliding hits into one dict slot and silently dropped the other result. + """ + if not isinstance(search_repository, SQLiteSearchRepository): + pytest.skip("sqlite-vec repository behavior is local SQLite-only.") + + _enable_semantic(search_repository) + await search_repository.init_search_index() + await search_repository.bulk_index_items( + [ + _entity_row( + project_id=search_repository.project_id, + row_id=7, + entity_id=701, + title="Auth Token Design", + permalink="specs/auth-token-design", + content_stems="auth token session login design", + ), + # Same numeric id as the entity row above, different row type. + _relation_row( + project_id=search_repository.project_id, + row_id=7, + entity_id=702, + title="login flow relates to auth token design", + permalink="specs/login-flow/relates-to/auth-token-design", + relation_type="relates_to", + ), + ] + ) + await search_repository.sync_entity_vectors(701) + await search_repository.sync_entity_vectors(702) + + results = await search_repository.search( + search_text="session token auth", + retrieval_mode=SearchRetrievalMode.VECTOR, + limit=5, + offset=0, + ) + + # Both rows match the query; both share id=7 and must survive hydration. + assert len(results) == 2 + assert {result.type for result in results} == { + SearchItemType.ENTITY.value, + SearchItemType.RELATION.value, + } + entity_result = next(r for r in results if r.type == SearchItemType.ENTITY.value) + assert entity_result.permalink == "specs/auth-token-design" + + # The type filter must keep the entity even though a relation shares its id. + filtered = await search_repository.search( + search_text="session token auth", + search_item_types=[SearchItemType.ENTITY], + retrieval_mode=SearchRetrievalMode.VECTOR, + limit=5, + offset=0, + ) + assert [r.permalink for r in filtered] == ["specs/auth-token-design"] + + @pytest.mark.asyncio async def test_sqlite_hybrid_search_combines_fts_and_vector(search_repository): """Hybrid mode fuses FTS and vector results with score-based fusion.""" diff --git a/tests/repository/test_vector_pagination.py b/tests/repository/test_vector_pagination.py index 21bc98a8d..09a001e3e 100644 --- a/tests/repository/test_vector_pagination.py +++ b/tests/repository/test_vector_pagination.py @@ -133,7 +133,7 @@ async def test_page1_scores_gte_page2_scores(): repo._embedding_provider = _EmbeddingProvider() - fake_index_rows = {i: FakeRow(id=i) for i in range(20)} + fake_index_rows = {("entity", i): FakeRow(id=i) for i in range(20)} async def run_page(offset, limit): with ( diff --git a/tests/repository/test_vector_threshold.py b/tests/repository/test_vector_threshold.py index 266ee9ae3..34a66d7d3 100644 --- a/tests/repository/test_vector_threshold.py +++ b/tests/repository/test_vector_threshold.py @@ -160,7 +160,7 @@ async def test_threshold_zero_returns_all(): repo, "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, - return_value={i: FakeRow(id=i) for i in range(3)}, + return_value={("entity", i): FakeRow(id=i) for i in range(3)}, ), ): results = await repo._search_vector_only(**COMMON_SEARCH_KWARGS) @@ -192,7 +192,7 @@ async def test_threshold_filters_low_scores(): "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, # Only entity_0 (score=0.9) passes the threshold; the fetch only gets id 0 - return_value={0: FakeRow(id=0)}, + return_value={("entity", 0): FakeRow(id=0)}, ), ): results = await repo._search_vector_only(**COMMON_SEARCH_KWARGS) @@ -255,7 +255,7 @@ async def test_per_query_min_similarity_overrides_instance_default(): repo, "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, - return_value={i: FakeRow(id=i) for i in range(3)}, + return_value={("entity", i): FakeRow(id=i) for i in range(3)}, ), ): # Override to 0.0 → all results pass through despite instance default of 0.6 @@ -289,7 +289,7 @@ async def test_per_query_min_similarity_tightens_threshold(): "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, # Only id=0 (score=0.9) will be fetched after filtering - return_value={0: FakeRow(id=0)}, + return_value={("entity", 0): FakeRow(id=0)}, ), ): # Override to 0.8 → only score=0.9 passes @@ -321,7 +321,7 @@ async def test_matched_chunk_text_populated_on_vector_results(): repo, "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, - return_value={i: FakeRow(id=i) for i in range(2)}, + return_value={("entity", i): FakeRow(id=i) for i in range(2)}, ), ): results = await repo._search_vector_only(**COMMON_SEARCH_KWARGS) @@ -379,7 +379,7 @@ async def test_top_n_chunks_joined_in_matched_chunk_text(): repo, "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, - return_value={0: FakeRow(id=0, content_snippet=large_content)}, + return_value={("entity", 0): FakeRow(id=0, content_snippet=large_content)}, ), ): results = await repo._search_vector_only(**COMMON_SEARCH_KWARGS) @@ -423,7 +423,7 @@ async def test_small_note_returns_full_content_as_matched_chunk(): repo, "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, - return_value={0: FakeRow(id=0, content_snippet=small_content)}, + return_value={("entity", 0): FakeRow(id=0, content_snippet=small_content)}, ), ): results = await repo._search_vector_only(**COMMON_SEARCH_KWARGS) @@ -457,7 +457,7 @@ async def test_large_note_returns_chunks_not_full_content(): repo, "_fetch_search_index_rows_by_ids", new_callable=AsyncMock, - return_value={0: FakeRow(id=0, content_snippet=large_content)}, + return_value={("entity", 0): FakeRow(id=0, content_snippet=large_content)}, ), ): results = await repo._search_vector_only(**COMMON_SEARCH_KWARGS)