diff --git a/methods/EverCore/src/agentic_layer/memory_manager.py b/methods/EverCore/src/agentic_layer/memory_manager.py
index e6e93005..ccd134c2 100644
--- a/methods/EverCore/src/agentic_layer/memory_manager.py
+++ b/methods/EverCore/src/agentic_layer/memory_manager.py
@@ -671,18 +671,27 @@ async def get_vector_search_results(
                 f"Query text vectorization completed, vector dimension: {len(query_vector_list)}"
             )
 
-            # Select Milvus repository based on memory type
+            # Select vector repository based on memory type. Routed by
+            # VECTOR_STORE_BACKEND env via vector_backend_router; the local
+            # variable keeps the ``milvus_repo`` name only for legacy clarity.
+            from core.oxm.vector_backend_router import (
+                get_agent_case_repo,
+                get_agent_skill_repo,
+                get_atomic_fact_repo,
+                get_episodic_repo,
+                get_foresight_repo,
+            )
             match mem_type:
                 case MemoryType.FORESIGHT:
-                    milvus_repo = get_bean_by_type(ForesightMilvusRepository)
+                    milvus_repo = get_foresight_repo()
                 case MemoryType.ATOMIC_FACT:
-                    milvus_repo = get_bean_by_type(AtomicFactMilvusRepository)
+                    milvus_repo = get_atomic_fact_repo()
                 case MemoryType.EPISODIC_MEMORY:
-                    milvus_repo = get_bean_by_type(EpisodicMemoryMilvusRepository)
+                    milvus_repo = get_episodic_repo()
                 case MemoryType.AGENT_CASE:
-                    milvus_repo = get_bean_by_type(AgentCaseMilvusRepository)
+                    milvus_repo = get_agent_case_repo()
                 case MemoryType.AGENT_SKILL:
-                    milvus_repo = get_bean_by_type(AgentSkillMilvusRepository)
+                    milvus_repo = get_agent_skill_repo()
                 case _:
                     raise ValueError(f"Unsupported memory type: {mem_type}")
 
diff --git a/methods/EverCore/src/agentic_layer/profile_search_service.py b/methods/EverCore/src/agentic_layer/profile_search_service.py
index a86f5de6..0d8b2621 100644
--- a/methods/EverCore/src/agentic_layer/profile_search_service.py
+++ b/methods/EverCore/src/agentic_layer/profile_search_service.py
@@ -97,9 +97,15 @@ def __init__(
 
     @property
     def milvus_repo(self) -> UserProfileMilvusRepository:
-        """Lazy load Milvus repository"""
+        """Lazy load the vector repository for the active backend.
+
+        Named ``milvus_repo`` for caller-compatibility; the actual instance
+        is routed by ``VECTOR_STORE_BACKEND`` via
+        ``core.oxm.vector_backend_router.get_user_profile_repo()``.
+        """
         if self._milvus_repo is None:
-            self._milvus_repo = get_bean_by_type(UserProfileMilvusRepository)
+            from core.oxm.vector_backend_router import get_user_profile_repo
+            self._milvus_repo = get_user_profile_repo()
         return self._milvus_repo
 
     async def search_profiles(
diff --git a/methods/EverCore/src/agentic_layer/search_mem_service.py b/methods/EverCore/src/agentic_layer/search_mem_service.py
index 9c25f919..e420faba 100644
--- a/methods/EverCore/src/agentic_layer/search_mem_service.py
+++ b/methods/EverCore/src/agentic_layer/search_mem_service.py
@@ -141,15 +141,24 @@ def __init__(self):
         # ES Repositories
         self.episodic_es_repo = EpisodicMemoryEsRepository()
 
-        # Milvus Repositories
-        self.episodic_milvus_repo = EpisodicMemoryMilvusRepository()
-        self.profile_milvus_repo = UserProfileMilvusRepository()
+        # Vector Repositories — routed by VECTOR_STORE_BACKEND env at
+        # construction time. Field names keep the historical ``_milvus_``
+        # token for caller compatibility, but the actual backend is the
+        # one configured for this process (Qdrant or Milvus).
+        from core.oxm.vector_backend_router import (
+            get_agent_case_repo,
+            get_agent_skill_repo,
+            get_episodic_repo,
+            get_user_profile_repo,
+        )
+        self.episodic_milvus_repo = get_episodic_repo()
+        self.profile_milvus_repo = get_user_profile_repo()
 
         # Agent memory repositories
         self.agent_case_es_repo = AgentCaseEsRepository()
         self.agent_skill_es_repo = AgentSkillEsRepository()
-        self.agent_case_milvus_repo = AgentCaseMilvusRepository()
-        self.agent_skill_milvus_repo = AgentSkillMilvusRepository()
+        self.agent_case_milvus_repo = get_agent_case_repo()
+        self.agent_skill_milvus_repo = get_agent_skill_repo()
 
         # MongoDB raw repositories (for fetching full docs by id)
         self.episodic_raw_repo = EpisodicMemoryRawRepository()
diff --git a/methods/EverCore/src/biz_layer/mem_memorize.py b/methods/EverCore/src/biz_layer/mem_memorize.py
index be1adcbc..02f31144 100644
--- a/methods/EverCore/src/biz_layer/mem_memorize.py
+++ b/methods/EverCore/src/biz_layer/mem_memorize.py
@@ -660,9 +660,13 @@ async def _trigger_agent_skill_extraction(
             remove_ids = extraction_result.deleted_ids + updated_ids
 
             if upsert_records or remove_ids:
-                # Milvus sync: delete stale entries -> insert new/updated
+                # Vector-store sync: delete stale entries -> insert new/updated.
+                # Backend (Qdrant or Milvus) is routed by VECTOR_STORE_BACKEND
+                # via vector_backend_router; variable kept named
+                # ``agent_skill_milvus_repo`` for local-clarity only.
                 try:
-                    agent_skill_milvus_repo = get_bean_by_type(AgentSkillMilvusRepository)
+                    from core.oxm.vector_backend_router import get_agent_skill_repo
+                    agent_skill_milvus_repo = get_agent_skill_repo()
                     for old_id in remove_ids:
                         await agent_skill_milvus_repo.delete_by_id(old_id)
                     inserted_count = 0
diff --git a/methods/EverCore/src/biz_layer/mem_sync.py b/methods/EverCore/src/biz_layer/mem_sync.py
index 33f4dd36..b54beeff 100644
--- a/methods/EverCore/src/biz_layer/mem_sync.py
+++ b/methods/EverCore/src/biz_layer/mem_sync.py
@@ -62,12 +62,15 @@ def __init__(
             foresight_es_repo: Foresight ES repository instance (optional, obtained from DI if not provided)
             atomic_fact_es_repo: Atomic fact ES repository instance (optional, obtained from DI if not provided)
         """
-        self.foresight_milvus_repo = foresight_milvus_repo or get_bean_by_type(
-            ForesightMilvusRepository
-        )
-        self.atomic_fact_milvus_repo = atomic_fact_milvus_repo or get_bean_by_type(
-            AtomicFactMilvusRepository
+        # Field names keep the ``_milvus_`` token for caller-compat, but
+        # the actual instance is routed by VECTOR_STORE_BACKEND via the
+        # vector_backend_router factory.
+        from core.oxm.vector_backend_router import (
+            get_atomic_fact_repo,
+            get_foresight_repo,
         )
+        self.foresight_milvus_repo = foresight_milvus_repo or get_foresight_repo()
+        self.atomic_fact_milvus_repo = atomic_fact_milvus_repo or get_atomic_fact_repo()
         self.foresight_es_repo = foresight_es_repo or get_bean_by_type(
             ForesightEsRepository
         )
diff --git a/methods/EverCore/src/core/oxm/qdrant/base_repository.py b/methods/EverCore/src/core/oxm/qdrant/base_repository.py
index a7c13145..f0b50120 100644
--- a/methods/EverCore/src/core/oxm/qdrant/base_repository.py
+++ b/methods/EverCore/src/core/oxm/qdrant/base_repository.py
@@ -17,7 +17,7 @@ import uuid
 
 from abc import ABC
 from datetime import datetime, timezone
-from typing import Any, Generic, List, Optional, Type, TypeVar
+from typing import Any, Dict, Generic, List, Optional, Type, TypeVar
 
 from qdrant_client.http import models as qmodels
 
@@ -155,6 +155,81 @@ def collection(self) -> T:
     def get_model_name(self) -> str:
         return self.model_name
 
+    # ============================== Milvus-compat insert/insert_batch
+    # The service layer (memory_manager, mem_sync, mem_memorize,
+    # profile_indexer) was historically written against the Milvus
+    # repository surface: ``insert(entity_dict)`` and
+    # ``insert_batch(entity_dicts)`` taking the output of the
+    # corresponding ``*MilvusConverter.from_mongo()``. The Qdrant
+    # repositories had a different, type-safer surface
+    # (``create_and_save_*`` + native ``upsert(PointStruct)``).
+    #
+    # Rewriting every service-layer call would be a large, risky change.
+    # Instead, this base class provides a thin adapter: the Milvus dict
+    # is translated to a Qdrant ``PointStruct`` (vector lifted out, mongo
+    # ``_id`` mapped through ``mongo_id_to_qdrant_id``, all other fields
+    # become payload, raw mongo id preserved as ``mongo_id`` for round
+    # trip). ``flush`` is accepted and ignored — Qdrant durability is
+    # already covered by ``upsert(wait=True)``. With this in place, the
+    # ``vector_backend_router`` factory can hand a Qdrant repo to any
+    # caller that previously held a Milvus repo, and the existing
+    # converter+insert pipeline keeps working.
+
+    @staticmethod
+    def _milvus_entity_to_point(entity: Dict[str, Any]) -> Optional[qmodels.PointStruct]:
+        """Translate a Milvus-style entity dict into a ``PointStruct``.
+
+        Returns ``None`` when the entity has no vector or no usable id
+        (``id`` / ``mongo_id``); ``insert`` raises ``ValueError`` for
+        those and ``insert_batch`` skips them. The source converter is
+        supposed to guard, but the indexing pipeline sometimes gets called
+        with partially-built docs during early lifecycle.
+        """
+        vector = entity.get("vector")
+        if not vector:
+            return None
+        mongo_id = entity.get("id") or entity.get("mongo_id")
+        if mongo_id is None or mongo_id == "":
+            return None
+        payload: Dict[str, Any] = {
+            k: v for k, v in entity.items() if k != "vector"
+        }
+        payload["mongo_id"] = str(mongo_id)
+        return qmodels.PointStruct(
+            id=mongo_id_to_qdrant_id(mongo_id),
+            vector=vector,
+            payload=payload,
+        )
+
+    async def insert(self, entity: Dict[str, Any], flush: bool = False) -> str:
+        """Milvus-compat single-row insert. ``flush`` is accepted and ignored."""
+        point = self._milvus_entity_to_point(entity)
+        if point is None:
+            raise ValueError(
+                f"{self.model_name}: entity missing vector or id; cannot upsert"
+            )
+        await self.upsert(point)
+        return str(point.id)
+
+    async def insert_batch(
+        self, entities: List[Dict[str, Any]], flush: bool = False
+    ) -> qmodels.UpdateResult:
+        """Milvus-compat batch insert. Entities without vector/id are skipped."""
+        points: List[qmodels.PointStruct] = []
+        for entity in entities:
+            point = self._milvus_entity_to_point(entity)
+            if point is not None:
+                points.append(point)
+        if not points:
+            logger.debug(
+                "Qdrant insert_batch [%s]: 0 valid points after filtering",
+                self.model_name,
+            )
+            return qmodels.UpdateResult(
+                operation_id=0, status=qmodels.UpdateStatus.COMPLETED
+            )
+        return await self.upsert_batch(points)
+
     # =================================================== Basic CRUD (async)
 
     async def upsert(
diff --git a/methods/EverCore/src/core/oxm/vector_backend_router.py b/methods/EverCore/src/core/oxm/vector_backend_router.py
new file mode 100644
index 00000000..1e0f7165
--- /dev/null
+++ b/methods/EverCore/src/core/oxm/vector_backend_router.py
@@ -0,0 +1,169 @@
+"""
+Vector-Backend-Router — env-gated factory for vector repositories.
+
+The service layer (``agentic_layer/*``, ``memory_layer/*``, ``biz_layer/*``)
+historically instantiated ``*MilvusRepository()`` classes directly. After
+the Qdrant adapter was added (see [[Qdrant]] in the EWM docs), this
+hard-coded write/read path bypassed ``VECTOR_STORE_BACKEND`` entirely —
+the lifespan layer correctly skipped Milvus, but every service still
+constructed Milvus repos and crashed at first use.
+
+This router resolves the env flag once per construction call and returns
+the right repository instance. Both backends expose the same public
+surface (``vector_search``, ``create_and_save_*``, ``delete_by_*``) so
+callers don't need to know which backend they got.
+
+Usage::
+
+    from core.oxm.vector_backend_router import get_episodic_repo
+    self.episodic_repo = get_episodic_repo()
+    # caller-facing methods are identical across backends:
+    results = await self.episodic_repo.vector_search(query_vector=v, ...)
+
+Adding a new memory type: write a ``get_<memory_type>_repo()`` thin factory
+that follows the same pattern. Imports are lazy and gated on the active
+backend, which keeps startup cheap and avoids pulling Qdrant client
+packages into Milvus-only deployments (and vice versa).
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+_VALID_BACKENDS = {"qdrant", "milvus"}
+
+
+def _backend() -> str:
+    """Return the active backend name in normalised form (``"qdrant"`` or
+    ``"milvus"``). Mirrors the case-insensitive gate used by
+    ``QdrantLifespanProvider`` and ``MilvusLifespanProvider``. An
+    unrecognised value (e.g. a typo like ``VECTOR_STORE_BACKEND=qdarnt``)
+    falls back to ``"milvus"`` but emits a warning so the operator
+    notices instead of silently routing to the wrong backend.
+    """
+    raw = os.getenv("VECTOR_STORE_BACKEND", "milvus").strip().lower()
+    if raw not in _VALID_BACKENDS:
+        logger.warning(
+            "VECTOR_STORE_BACKEND=%r is not a known backend (expected one of "
+            "%s); falling back to 'milvus'",
+            raw, sorted(_VALID_BACKENDS),
+        )
+        return "milvus"
+    return raw
+
+
+def _is_qdrant() -> bool:
+    return _backend() == "qdrant"
+
+
+def _get_bean(cls: type[Any]) -> Any:
+    """Return the DI-managed bean for ``cls`` so existing singleton scope
+    (and any constructor wiring done by the DI scanner) is preserved.
+
+    Falls back to direct instantiation if the DI lookup raises — that
+    should not happen in production, but the safety net keeps stand-alone
+    unit tests with no DI scan from crashing. The failure is logged with
+    its traceback so a genuine DI or constructor error is not silently
+    masked by the fallback.
+    """
+    try:
+        from core.di import get_bean_by_type
+        return get_bean_by_type(cls)
+    except Exception:
+        logger.warning(
+            "DI lookup for %s failed; falling back to direct instantiation",
+            cls.__name__,
+            exc_info=True,
+        )
+        return cls()
+
+
+def _resolve(qdrant_cls: type[Any], milvus_cls: type[Any]) -> Any:
+    """Resolve the bean for whichever of the two classes matches the
+    active backend. Requires both classes to be imported already; the
+    ``get_*_repo`` factories import only the active backend's class and
+    call ``_get_bean`` directly instead.
+    """
+    return _get_bean(qdrant_cls if _is_qdrant() else milvus_cls)
+
+
+def get_episodic_repo() -> Any:
+    """Return the episodic-memory vector repository for the active backend."""
+    if _is_qdrant():
+        from infra_layer.adapters.out.search.repository.episodic_memory_qdrant_repository import (
+            EpisodicMemoryQdrantRepository,
+        )
+        return _get_bean(EpisodicMemoryQdrantRepository)
+    from infra_layer.adapters.out.search.repository.episodic_memory_milvus_repository import (
+        EpisodicMemoryMilvusRepository,
+    )
+    return _get_bean(EpisodicMemoryMilvusRepository)
+
+
+def get_atomic_fact_repo() -> Any:
+    """Return the atomic-fact vector repository for the active backend."""
+    if _is_qdrant():
+        from infra_layer.adapters.out.search.repository.atomic_fact_qdrant_repository import (
+            AtomicFactQdrantRepository,
+        )
+        return _get_bean(AtomicFactQdrantRepository)
+    from infra_layer.adapters.out.search.repository.atomic_fact_milvus_repository import (
+        AtomicFactMilvusRepository,
+    )
+    return _get_bean(AtomicFactMilvusRepository)
+
+
+def get_foresight_repo() -> Any:
+    """Return the foresight vector repository for the active backend."""
+    if _is_qdrant():
+        from infra_layer.adapters.out.search.repository.foresight_qdrant_repository import (
+            ForesightQdrantRepository,
+        )
+        return _get_bean(ForesightQdrantRepository)
+    from infra_layer.adapters.out.search.repository.foresight_milvus_repository import (
+        ForesightMilvusRepository,
+    )
+    return _get_bean(ForesightMilvusRepository)
+
+
+def get_agent_case_repo() -> Any:
+    """Return the agent-case vector repository for the active backend."""
+    if _is_qdrant():
+        from infra_layer.adapters.out.search.repository.agent_case_qdrant_repository import (
+            AgentCaseQdrantRepository,
+        )
+        return _get_bean(AgentCaseQdrantRepository)
+    from infra_layer.adapters.out.search.repository.agent_case_milvus_repository import (
+        AgentCaseMilvusRepository,
+    )
+    return _get_bean(AgentCaseMilvusRepository)
+
+
+def get_agent_skill_repo() -> Any:
+    """Return the agent-skill vector repository for the active backend."""
+    if _is_qdrant():
+        from infra_layer.adapters.out.search.repository.agent_skill_qdrant_repository import (
+            AgentSkillQdrantRepository,
+        )
+        return _get_bean(AgentSkillQdrantRepository)
+    from infra_layer.adapters.out.search.repository.agent_skill_milvus_repository import (
+        AgentSkillMilvusRepository,
+    )
+    return _get_bean(AgentSkillMilvusRepository)
+
+
+def get_user_profile_repo() -> Any:
+    """Return the user-profile vector repository for the active backend."""
+    if _is_qdrant():
+        from infra_layer.adapters.out.search.repository.user_profile_qdrant_repository import (
+            UserProfileQdrantRepository,
+        )
+        return _get_bean(UserProfileQdrantRepository)
+    from infra_layer.adapters.out.search.repository.user_profile_milvus_repository import (
+        UserProfileMilvusRepository,
+    )
+    return _get_bean(UserProfileMilvusRepository)
diff --git a/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py b/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py
index 94765730..b4a45e95 100644
--- a/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py
+++ b/methods/EverCore/src/memory_layer/profile_indexer/profile_indexer.py
@@ -57,9 +57,15 @@ def __init__(self, milvus_repo: Optional[UserProfileMilvusRepository] = None):
 
     @property
     def milvus_repo(self) -> UserProfileMilvusRepository:
-        """Lazy load Milvus repository"""
+        """Lazy load the vector repository for the active backend.
+
+        Property name kept for caller-compatibility; the actual instance
+        is routed by ``VECTOR_STORE_BACKEND`` via
+        ``core.oxm.vector_backend_router.get_user_profile_repo()``.
+        """
         if self._milvus_repo is None:
-            self._milvus_repo = get_bean_by_type(UserProfileMilvusRepository)
+            from core.oxm.vector_backend_router import get_user_profile_repo
+            self._milvus_repo = get_user_profile_repo()
         return self._milvus_repo
 
     async def index_profile(