From f57f43ca3e655ec594944d3563ce0402452c5ae6 Mon Sep 17 00:00:00 2001 From: Ptah-CT <221234802+Ptah-CT@users.noreply.github.com> Date: Thu, 14 May 2026 22:10:51 +0000 Subject: [PATCH 1/2] fix(qdrant): route service-level vector repo lookups through env-aware factory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #5 made the service start with VECTOR_STORE_BACKEND=qdrant by gating the Milvus lifespan and lazifying BaseMilvusRepository's collection property. But the service-layer write/read path was still hard-coded to the Milvus repositories: 5 service classes either called ``EpisodicMemoryMilvusRepository()`` directly or asked the DI container for ``get_bean_by_type(EpisodicMemoryMilvusRepository)``. With the Milvus collections never loaded, every insert / search through that path raised ``ValueError("Collection instance not created, please call ensure_loaded() first")`` — which the calling services swallowed as ``ERROR`` logs while returning success with ``indexed=0``. Net effect: data continued to land in MongoDB, but never reached Qdrant, and search returned empty results. This commit introduces ``core/oxm/vector_backend_router.py`` — a thin factory with one ``get__repo()`` per memory type. Each factory reads ``VECTOR_STORE_BACKEND`` (case-insensitive, normalised) and resolves the appropriate Qdrant or Milvus repo via the DI container, preserving singleton bean scope. Direct instantiation is the fallback for unit-test contexts without a DI scan. Service-layer sites updated: - ``agentic_layer/search_mem_service.py`` — 4 hard-coded ``*MilvusRepository()`` direct instantiations now call the factory; field names keep their historical ``_milvus_`` token for backward compatibility with the rest of the service. - ``agentic_layer/memory_manager.py`` — the ``match mem_type`` switch that picked one of 5 ``MilvusRepository`` beans now routes to the factory. - ``agentic_layer/profile_search_service.py`` — the lazy ``milvus_repo`` property now resolves through ``get_user_profile_repo()``. - ``memory_layer/profile_indexer/profile_indexer.py`` — same lazy property, same factory call. - ``biz_layer/mem_sync.py`` — constructor defaults for ``foresight_milvus_repo`` and ``atomic_fact_milvus_repo`` now come from the factory. - ``biz_layer/mem_memorize.py`` — the inline ``agent_skill_milvus_repo`` lookup inside ``upsert_records`` sync now uses ``get_agent_skill_repo()``. Both backends already expose the same public surface (``vector_search``, ``create_and_save_*``, ``delete_by_*``), so callers need no further changes. --- .../src/agentic_layer/memory_manager.py | 21 +++- .../agentic_layer/profile_search_service.py | 10 +- .../src/agentic_layer/search_mem_service.py | 19 ++- .../EverCore/src/biz_layer/mem_memorize.py | 8 +- methods/EverCore/src/biz_layer/mem_sync.py | 13 +- .../src/core/oxm/vector_backend_router.py | 117 ++++++++++++++++++ .../profile_indexer/profile_indexer.py | 10 +- 7 files changed, 176 insertions(+), 22 deletions(-) create mode 100644 methods/EverCore/src/core/oxm/vector_backend_router.py diff --git a/methods/EverCore/src/agentic_layer/memory_manager.py b/methods/EverCore/src/agentic_layer/memory_manager.py index e6e93005..ccd134c2 100644 --- a/methods/EverCore/src/agentic_layer/memory_manager.py +++ b/methods/EverCore/src/agentic_layer/memory_manager.py @@ -671,18 +671,27 @@ async def get_vector_search_results( f"Query text vectorization completed, vector dimension: {len(query_vector_list)}" ) - # Select Milvus repository based on memory type + # Select vector repository based on memory type. Routed by + # VECTOR_STORE_BACKEND env via vector_backend_router; the local + # variable keeps the ``milvus_repo`` name only for legacy clarity. + from core.oxm.vector_backend_router import ( + get_agent_case_repo, + get_agent_skill_repo, + get_atomic_fact_repo, + get_episodic_repo, + get_foresight_repo, + ) match mem_type: case MemoryType.FORESIGHT: - milvus_repo = get_bean_by_type(ForesightMilvusRepository) + milvus_repo = get_foresight_repo() case MemoryType.ATOMIC_FACT: - milvus_repo = get_bean_by_type(AtomicFactMilvusRepository) + milvus_repo = get_atomic_fact_repo() case MemoryType.EPISODIC_MEMORY: - milvus_repo = get_bean_by_type(EpisodicMemoryMilvusRepository) + milvus_repo = get_episodic_repo() case MemoryType.AGENT_CASE: - milvus_repo = get_bean_by_type(AgentCaseMilvusRepository) + milvus_repo = get_agent_case_repo() case MemoryType.AGENT_SKILL: - milvus_repo = get_bean_by_type(AgentSkillMilvusRepository) + milvus_repo = get_agent_skill_repo() case _: raise ValueError(f"Unsupported memory type: {mem_type}") diff --git a/methods/EverCore/src/agentic_layer/profile_search_service.py b/methods/EverCore/src/agentic_layer/profile_search_service.py index a86f5de6..0d8b2621 100644 --- a/methods/EverCore/src/agentic_layer/profile_search_service.py +++ b/methods/EverCore/src/agentic_layer/profile_search_service.py @@ -97,9 +97,15 @@ def __init__( @property def milvus_repo(self) -> UserProfileMilvusRepository: - """Lazy load Milvus repository""" + """Lazy load the vector repository for the active backend. + + Named ``milvus_repo`` for caller-compatibility; the actual instance + is routed by ``VECTOR_STORE_BACKEND`` via + ``core.oxm.vector_backend_router.get_user_profile_repo()``. + """ if self._milvus_repo is None: - self._milvus_repo = get_bean_by_type(UserProfileMilvusRepository) + from core.oxm.vector_backend_router import get_user_profile_repo + self._milvus_repo = get_user_profile_repo() return self._milvus_repo async def search_profiles( diff --git a/methods/EverCore/src/agentic_layer/search_mem_service.py b/methods/EverCore/src/agentic_layer/search_mem_service.py index 9c25f919..e420faba 100644 --- a/methods/EverCore/src/agentic_layer/search_mem_service.py +++ b/methods/EverCore/src/agentic_layer/search_mem_service.py @@ -141,15 +141,24 @@ def __init__(self): # ES Repositories self.episodic_es_repo = EpisodicMemoryEsRepository() - # Milvus Repositories - self.episodic_milvus_repo = EpisodicMemoryMilvusRepository() - self.profile_milvus_repo = UserProfileMilvusRepository() + # Vector Repositories — routed by VECTOR_STORE_BACKEND env at + # construction time. Field names keep the historical ``_milvus_`` + # token for caller compatibility, but the actual backend is the + # one configured for this process (Qdrant or Milvus). + from core.oxm.vector_backend_router import ( + get_agent_case_repo, + get_agent_skill_repo, + get_episodic_repo, + get_user_profile_repo, + ) + self.episodic_milvus_repo = get_episodic_repo() + self.profile_milvus_repo = get_user_profile_repo() # Agent memory repositories self.agent_case_es_repo = AgentCaseEsRepository() self.agent_skill_es_repo = AgentSkillEsRepository() - self.agent_case_milvus_repo = AgentCaseMilvusRepository() - self.agent_skill_milvus_repo = AgentSkillMilvusRepository() + self.agent_case_milvus_repo = get_agent_case_repo() + self.agent_skill_milvus_repo = get_agent_skill_repo() # MongoDB raw repositories (for fetching full docs by id) self.episodic_raw_repo = EpisodicMemoryRawRepository() diff --git a/methods/EverCore/src/biz_layer/mem_memorize.py b/methods/EverCore/src/biz_layer/mem_memorize.py index be1adcbc..02f31144 100644 --- a/methods/EverCore/src/biz_layer/mem_memorize.py +++ b/methods/EverCore/src/biz_layer/mem_memorize.py @@ -660,9 +660,13 @@ async def _trigger_agent_skill_extraction( remove_ids = extraction_result.deleted_ids + updated_ids if upsert_records or remove_ids: - # Milvus sync: delete stale entries -> insert new/updated + # Vector-store sync: delete stale entries -> insert new/updated. + # Backend (Qdrant or Milvus) is routed by VECTOR_STORE_BACKEND + # via vector_backend_router; variable kept named + # ``agent_skill_milvus_repo`` for local-clarity only. try: - agent_skill_milvus_repo = get_bean_by_type(AgentSkillMilvusRepository) + from core.oxm.vector_backend_router import get_agent_skill_repo + agent_skill_milvus_repo = get_agent_skill_repo() for old_id in remove_ids: await agent_skill_milvus_repo.delete_by_id(old_id) inserted_count = 0 diff --git a/methods/EverCore/src/biz_layer/mem_sync.py b/methods/EverCore/src/biz_layer/mem_sync.py index 33f4dd36..b54beeff 100644 --- a/methods/EverCore/src/biz_layer/mem_sync.py +++ b/methods/EverCore/src/biz_layer/mem_sync.py @@ -62,12 +62,15 @@ def __init__( foresight_es_repo: Foresight ES repository instance (optional, obtained from DI if not provided) atomic_fact_es_repo: Atomic fact ES repository instance (optional, obtained from DI if not provided) """ - self.foresight_milvus_repo = foresight_milvus_repo or get_bean_by_type( - ForesightMilvusRepository - ) - self.atomic_fact_milvus_repo = atomic_fact_milvus_repo or get_bean_by_type( - AtomicFactMilvusRepository + # Field names keep the ``_milvus_`` token for caller-compat, but + # the actual instance is routed by VECTOR_STORE_BACKEND via the + # vector_backend_router factory. + from core.oxm.vector_backend_router import ( + get_atomic_fact_repo, + get_foresight_repo, ) + self.foresight_milvus_repo = foresight_milvus_repo or get_foresight_repo() + self.atomic_fact_milvus_repo = atomic_fact_milvus_repo or get_atomic_fact_repo() self.foresight_es_repo = foresight_es_repo or get_bean_by_type( ForesightEsRepository ) diff --git a/methods/EverCore/src/core/oxm/vector_backend_router.py b/methods/EverCore/src/core/oxm/vector_backend_router.py new file mode 100644 index 00000000..11e2ef6b --- /dev/null +++ b/methods/EverCore/src/core/oxm/vector_backend_router.py @@ -0,0 +1,117 @@ +""" +Vector-Backend-Router — env-gated factory for vector repositories. + +The service layer (``agentic_layer/*``, ``memory_layer/*``, ``biz_layer/*``) +historically instantiated ``*MilvusRepository()`` classes directly. After +the Qdrant adapter was added (see [[Qdrant]] in the EWM docs), this +hard-coded write/read path bypassed ``VECTOR_STORE_BACKEND`` entirely — +the lifespan layer correctly skipped Milvus, but every service still +constructed Milvus repos and crashed at first use. + +This router resolves the env flag once per construction call and returns +the right repository instance. Both backends expose the same public +surface (``vector_search``, ``create_and_save_*``, ``delete_by_*``) so +callers don't need to know which backend they got. + +Usage:: + + from core.oxm.vector_backend_router import get_episodic_repo + self.episodic_repo = get_episodic_repo() + # caller-facing methods are identical across backends: + results = await self.episodic_repo.vector_search(query_vector=v, ...) + +Adding a new memory type: write a ``get__repo()`` thin factory that +follows the same pattern. Lazy imports keep startup cheap and avoid +pulling Qdrant client packages into Milvus-only deployments. +""" + +from __future__ import annotations + +import os +from typing import Any, Type + + +def _backend() -> str: + """Return the active backend name in normalised form (``"qdrant"`` or + ``"milvus"``). Mirrors the case-insensitive gate used by + ``QdrantLifespanProvider`` and ``MilvusLifespanProvider``.""" + return os.getenv("VECTOR_STORE_BACKEND", "milvus").strip().lower() + + +def _is_qdrant() -> bool: + return _backend() == "qdrant" + + +def _resolve(qdrant_cls: Type[Any], milvus_cls: Type[Any]) -> Any: + """Resolve the right backend bean via the DI container so existing + singleton scope (and any constructor wiring done by the DI scanner) + is preserved. Falls back to direct instantiation only if no bean is + registered — that should not happen in production, but the safety + net keeps stand-alone unit tests with no DI scan from crashing. + """ + cls = qdrant_cls if _is_qdrant() else milvus_cls + try: + from core.di import get_bean_by_type + return get_bean_by_type(cls) + except Exception: + return cls() + + +def get_episodic_repo() -> Any: + from infra_layer.adapters.out.search.repository.episodic_memory_qdrant_repository import ( + EpisodicMemoryQdrantRepository, + ) + from infra_layer.adapters.out.search.repository.episodic_memory_milvus_repository import ( + EpisodicMemoryMilvusRepository, + ) + return _resolve(EpisodicMemoryQdrantRepository, EpisodicMemoryMilvusRepository) + + +def get_atomic_fact_repo() -> Any: + from infra_layer.adapters.out.search.repository.atomic_fact_qdrant_repository import ( + AtomicFactQdrantRepository, + ) + from infra_layer.adapters.out.search.repository.atomic_fact_milvus_repository import ( + AtomicFactMilvusRepository, + ) + return _resolve(AtomicFactQdrantRepository, AtomicFactMilvusRepository) + + +def get_foresight_repo() -> Any: + from infra_layer.adapters.out.search.repository.foresight_qdrant_repository import ( + ForesightQdrantRepository, + ) + from infra_layer.adapters.out.search.repository.foresight_milvus_repository import ( + ForesightMilvusRepository, + ) + return _resolve(ForesightQdrantRepository, ForesightMilvusRepository) + + +def get_agent_case_repo() -> Any: + from infra_layer.adapters.out.search.repository.agent_case_qdrant_repository import ( + AgentCaseQdrantRepository, + ) + from infra_layer.adapters.out.search.repository.agent_case_milvus_repository import ( + AgentCaseMilvusRepository, + ) + return _resolve(AgentCaseQdrantRepository, AgentCaseMilvusRepository) + + +def get_agent_skill_repo() -> Any: + from infra_layer.adapters.out.search.repository.agent_skill_qdrant_repository import ( + AgentSkillQdrantRepository, + ) + from infra_layer.adapters.out.search.repository.agent_skill_milvus_repository import ( + AgentSkillMilvusRepository, + ) + return _resolve(AgentSkillQdrantRepository, AgentSkillMilvusRepository) + + +def get_user_profile_repo() -> Any: + from infra_layer.adapters.out.search.repository.user_profile_qdrant_repository import ( + UserProfileQdrantRepository, + ) + from infra_layer.adapters.out.search.repository.user_profile_milvus_repository import ( + UserProfileMilvusRepository, + ) + return _resolve(UserProfileQdrantRepository, UserProfileMilvusRepository) diff --git a/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py b/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py index 94765730..b4a45e95 100644 --- a/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py +++ b/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py @@ -57,9 +57,15 @@ def __init__(self, milvus_repo: Optional[UserProfileMilvusRepository] = None): @property def milvus_repo(self) -> UserProfileMilvusRepository: - """Lazy load Milvus repository""" + """Lazy load the vector repository for the active backend. + + Property name kept for caller-compatibility; the actual instance + is routed by ``VECTOR_STORE_BACKEND`` via + ``core.oxm.vector_backend_router.get_user_profile_repo()``. + """ if self._milvus_repo is None: - self._milvus_repo = get_bean_by_type(UserProfileMilvusRepository) + from core.oxm.vector_backend_router import get_user_profile_repo + self._milvus_repo = get_user_profile_repo() return self._milvus_repo async def index_profile( From 035444c355811199892225348db6ca73188d0c87 Mon Sep 17 00:00:00 2001 From: Ptah-CT <221234802+Ptah-CT@users.noreply.github.com> Date: Thu, 14 May 2026 22:23:05 +0000 Subject: [PATCH 2/2] fix(qdrant): Milvus-compat insert/insert_batch adapter + env validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses 5 of the 6 PR-6 review findings (Codex P1×3 + CodeRabbit minor×2). The remaining one (CodeRabbit analysis-only on get_bean_by_type exception scope) requires no code change — the fallback in ``_resolve`` already converts the missing-bean case into direct instantiation, so the broad ``except Exception`` is intentional. P1 fixes — Milvus-compat write API on Qdrant repositories Codex correctly identified that the previous factory swap could route ``ProfileIndexer.insert_batch(...)``, ``sync_foresight/atomic_fact.insert(...)`` and ``mem_memorize.agent_skill insert/delete`` calls to Qdrant repos that never had those methods. The Qdrant repos exposed only ``create_and_save_*`` + native ``upsert(PointStruct)`` — different signatures, different entity shape. Instead of rewriting every service-layer call (large diff, easy to miss sites), this commit adds a thin compatibility layer to ``BaseQdrantRepository`` so all six concrete Qdrant repos inherit it: - ``insert(entity_dict, flush=False) -> str`` — translates a Milvus-converter output (``{"id", "vector", payload-fields…}``) into a ``PointStruct`` via the same ``mongo_id_to_qdrant_id`` mapping the sweep uses, upserts, and returns the resulting Qdrant point id. The raw Mongo id is preserved as ``payload.mongo_id`` for round-trip. - ``insert_batch(entity_dicts, flush=False) -> UpdateResult`` — batch version. Entities without a vector or id are silently skipped (the source converter is supposed to guard, but the pipeline sometimes hands partial docs in early-lifecycle states). - The ``flush`` parameter is accepted and ignored — Qdrant has no separate flush; ``upsert(wait=True)`` already covers durability. This keeps the existing service-layer ``MilvusConverter.from_mongo()`` → ``milvus_repo.insert()`` pipeline intact while transparently routing to Qdrant when the env flag is set. CodeRabbit minors - ``typing.Type`` → builtin ``type`` (Python 3.9+ idiom, ruff UP035). - ``VECTOR_STORE_BACKEND`` validation: an unrecognised value now logs a warning and falls back to ``milvus`` instead of silently routing there. Catches typos like ``VECTOR_STORE_BACKEND=qdarnt``. --- .../src/core/oxm/qdrant/base_repository.py | 76 ++++++++++++++++++- .../src/core/oxm/vector_backend_router.py | 25 +++++- 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/methods/EverCore/src/core/oxm/qdrant/base_repository.py b/methods/EverCore/src/core/oxm/qdrant/base_repository.py index a7c13145..f0b50120 100644 --- a/methods/EverCore/src/core/oxm/qdrant/base_repository.py +++ b/methods/EverCore/src/core/oxm/qdrant/base_repository.py @@ -17,7 +17,7 @@ import uuid from abc import ABC from datetime import datetime, timezone -from typing import Any, Generic, List, Optional, Type, TypeVar +from typing import Any, Dict, Generic, List, Optional, Type, TypeVar from qdrant_client.http import models as qmodels @@ -155,6 +155,80 @@ def collection(self) -> T: def get_model_name(self) -> str: return self.model_name + # ============================== Milvus-compat insert/insert_batch + # The service layer (memory_manager, mem_sync, mem_memorize, + # profile_indexer) was historically written against the Milvus + # repository surface: ``insert(entity_dict)`` and + # ``insert_batch(entity_dicts)`` taking the output of the + # corresponding ``*MilvusConverter.from_mongo()``. The Qdrant + # repositories had a different, type-safer surface + # (``create_and_save_*`` + native ``upsert(PointStruct)``). + # + # Rewriting every service-layer call would be a large, risky change. + # Instead, this base class provides a thin adapter: the Milvus dict + # is translated to a Qdrant ``PointStruct`` (vector lifted out, mongo + # ``_id`` mapped through ``mongo_id_to_qdrant_id``, all other fields + # become payload, raw mongo id preserved as ``mongo_id`` for round + # trip). ``flush`` is accepted and ignored — Qdrant durability is + # already covered by ``upsert(wait=True)``. With this in place, the + # ``vector_backend_router`` factory can hand a Qdrant repo to any + # caller that previously held a Milvus repo, and the existing + # converter+insert pipeline keeps working. + + @staticmethod + def _milvus_entity_to_point(entity: Dict[str, Any]) -> Optional[qmodels.PointStruct]: + """Translate a Milvus-style entity dict into a ``PointStruct``. + + Returns ``None`` when the entity has no vector to embed; callers + should silently skip those (the source converter is supposed to + guard, but the indexing pipeline sometimes gets called with + partially-built docs during early lifecycle). + """ + vector = entity.get("vector") + if not vector: + return None + mongo_id = entity.get("id") or entity.get("mongo_id") + if mongo_id is None or mongo_id == "": + return None + payload: Dict[str, Any] = { + k: v for k, v in entity.items() if k != "vector" + } + payload["mongo_id"] = str(mongo_id) + return qmodels.PointStruct( + id=mongo_id_to_qdrant_id(mongo_id), + vector=vector, + payload=payload, + ) + + async def insert(self, entity: Dict[str, Any], flush: bool = False) -> str: + """Milvus-compat single-row insert. ``flush`` is accepted and ignored.""" + point = self._milvus_entity_to_point(entity) + if point is None: + raise ValueError( + f"{self.model_name}: entity missing vector or id; cannot upsert" + ) + await self.upsert(point) + return str(point.id) + + async def insert_batch( + self, entities: List[Dict[str, Any]], flush: bool = False + ) -> qmodels.UpdateResult: + """Milvus-compat batch insert. Entities without vector/id are skipped.""" + points: List[qmodels.PointStruct] = [] + for entity in entities: + point = self._milvus_entity_to_point(entity) + if point is not None: + points.append(point) + if not points: + logger.debug( + "Qdrant insert_batch [%s]: 0 valid points after filtering", + self.model_name, + ) + return qmodels.UpdateResult( + operation_id=0, status=qmodels.UpdateStatus.COMPLETED + ) + return await self.upsert_batch(points) + # =================================================== Basic CRUD (async) async def upsert( diff --git a/methods/EverCore/src/core/oxm/vector_backend_router.py b/methods/EverCore/src/core/oxm/vector_backend_router.py index 11e2ef6b..1e0f7165 100644 --- a/methods/EverCore/src/core/oxm/vector_backend_router.py +++ b/methods/EverCore/src/core/oxm/vector_backend_router.py @@ -27,22 +27,39 @@ from __future__ import annotations +import logging import os -from typing import Any, Type +from typing import Any + +logger = logging.getLogger(__name__) + +_VALID_BACKENDS = {"qdrant", "milvus"} def _backend() -> str: """Return the active backend name in normalised form (``"qdrant"`` or ``"milvus"``). Mirrors the case-insensitive gate used by - ``QdrantLifespanProvider`` and ``MilvusLifespanProvider``.""" - return os.getenv("VECTOR_STORE_BACKEND", "milvus").strip().lower() + ``QdrantLifespanProvider`` and ``MilvusLifespanProvider``. An + unrecognised value (e.g. a typo like ``VECTOR_STORE_BACKEND=qdarnt``) + falls back to ``"milvus"`` but emits a warning so the operator + notices instead of silently routing to the wrong backend. + """ + raw = os.getenv("VECTOR_STORE_BACKEND", "milvus").strip().lower() + if raw not in _VALID_BACKENDS: + logger.warning( + "VECTOR_STORE_BACKEND=%r is not a known backend (expected one of " + "%s); falling back to 'milvus'", + raw, sorted(_VALID_BACKENDS), + ) + return "milvus" + return raw def _is_qdrant() -> bool: return _backend() == "qdrant" -def _resolve(qdrant_cls: Type[Any], milvus_cls: Type[Any]) -> Any: +def _resolve(qdrant_cls: type[Any], milvus_cls: type[Any]) -> Any: """Resolve the right backend bean via the DI container so existing singleton scope (and any constructor wiring done by the DI scanner) is preserved. Falls back to direct instantiation only if no bean is