Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions methods/EverCore/src/agentic_layer/memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,18 +671,27 @@ async def get_vector_search_results(
f"Query text vectorization completed, vector dimension: {len(query_vector_list)}"
)

# Select Milvus repository based on memory type
# Select vector repository based on memory type. Routed by
# VECTOR_STORE_BACKEND env via vector_backend_router; the local
# variable keeps the ``milvus_repo`` name only for legacy clarity.
from core.oxm.vector_backend_router import (
get_agent_case_repo,
get_agent_skill_repo,
get_atomic_fact_repo,
get_episodic_repo,
get_foresight_repo,
)
match mem_type:
case MemoryType.FORESIGHT:
milvus_repo = get_bean_by_type(ForesightMilvusRepository)
milvus_repo = get_foresight_repo()
case MemoryType.ATOMIC_FACT:
milvus_repo = get_bean_by_type(AtomicFactMilvusRepository)
milvus_repo = get_atomic_fact_repo()
case MemoryType.EPISODIC_MEMORY:
milvus_repo = get_bean_by_type(EpisodicMemoryMilvusRepository)
milvus_repo = get_episodic_repo()
case MemoryType.AGENT_CASE:
milvus_repo = get_bean_by_type(AgentCaseMilvusRepository)
milvus_repo = get_agent_case_repo()
case MemoryType.AGENT_SKILL:
Comment on lines 689 to 693
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep hybrid dedupe keys consistent across vector backends

This routing change also affects MemoryManager hybrid retrieval: _search_hybrid deduplicates keyword/vector hits by id, but keyword hits use ES document IDs while Qdrant vector hits now carry point UUIDs, so the same memory is no longer recognized as duplicate. In Qdrant mode this inflates duplicate candidates before rerank and can crowd out distinct results in the final top-k.

Useful? React with 👍 / 👎.

milvus_repo = get_bean_by_type(AgentSkillMilvusRepository)
milvus_repo = get_agent_skill_repo()
case _:
raise ValueError(f"Unsupported memory type: {mem_type}")

Expand Down
10 changes: 8 additions & 2 deletions methods/EverCore/src/agentic_layer/profile_search_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,15 @@ def __init__(

@property
def milvus_repo(self) -> UserProfileMilvusRepository:
"""Lazy load Milvus repository"""
"""Lazy load the vector repository for the active backend.

Named ``milvus_repo`` for caller-compatibility; the actual instance
is routed by ``VECTOR_STORE_BACKEND`` via
``core.oxm.vector_backend_router.get_user_profile_repo()``.
"""
if self._milvus_repo is None:
self._milvus_repo = get_bean_by_type(UserProfileMilvusRepository)
from core.oxm.vector_backend_router import get_user_profile_repo
self._milvus_repo = get_user_profile_repo()
return self._milvus_repo

async def search_profiles(
Expand Down
19 changes: 14 additions & 5 deletions methods/EverCore/src/agentic_layer/search_mem_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,24 @@ def __init__(self):
# ES Repositories
self.episodic_es_repo = EpisodicMemoryEsRepository()

# Milvus Repositories
self.episodic_milvus_repo = EpisodicMemoryMilvusRepository()
self.profile_milvus_repo = UserProfileMilvusRepository()
# Vector Repositories — routed by VECTOR_STORE_BACKEND env at
# construction time. Field names keep the historical ``_milvus_``
# token for caller compatibility, but the actual backend is the
# one configured for this process (Qdrant or Milvus).
from core.oxm.vector_backend_router import (
get_agent_case_repo,
get_agent_skill_repo,
get_episodic_repo,
get_user_profile_repo,
)
self.episodic_milvus_repo = get_episodic_repo()
self.profile_milvus_repo = get_user_profile_repo()
Comment on lines +154 to +155
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Normalize episodic timestamp type after backend-routed repo swap

By routing episodic_milvus_repo through get_episodic_repo(), Qdrant mode now feeds search_mem_service results where timestamp is already a datetime, but _search_episodic_memories only handles numeric timestamps and otherwise sets timestamp=None. This makes vector episodic results lose their timestamp field in Qdrant deployments, which breaks time display/sorting behavior even when the underlying hit is valid.

Useful? React with 👍 / 👎.


# Agent memory repositories
self.agent_case_es_repo = AgentCaseEsRepository()
self.agent_skill_es_repo = AgentSkillEsRepository()
self.agent_case_milvus_repo = AgentCaseMilvusRepository()
self.agent_skill_milvus_repo = AgentSkillMilvusRepository()
self.agent_case_milvus_repo = get_agent_case_repo()
self.agent_skill_milvus_repo = get_agent_skill_repo()
Comment on lines +160 to +161
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve Mongo IDs when routing search repos through backend router

After SearchMemoryService starts resolving agent_case_milvus_repo/agent_skill_milvus_repo via the backend router, Qdrant mode returns point UUIDs in id, but this service still treats id as a Mongo _id for backfill (_fetch_agent_cases_by_ids / _fetch_agent_skills_by_ids) and for hybrid fusion dedupe. In Qdrant deployments this causes vector/hybrid agent-case and agent-skill results to be dropped because Mongo lookups miss, even though vector hits were found.

Useful? React with 👍 / 👎.


# MongoDB raw repositories (for fetching full docs by id)
self.episodic_raw_repo = EpisodicMemoryRawRepository()
Expand Down
8 changes: 6 additions & 2 deletions methods/EverCore/src/biz_layer/mem_memorize.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,13 @@ async def _trigger_agent_skill_extraction(
remove_ids = extraction_result.deleted_ids + updated_ids

if upsert_records or remove_ids:
# Milvus sync: delete stale entries -> insert new/updated
# Vector-store sync: delete stale entries -> insert new/updated.
# Backend (Qdrant or Milvus) is routed by VECTOR_STORE_BACKEND
# via vector_backend_router; variable kept named
# ``agent_skill_milvus_repo`` for local-clarity only.
try:
agent_skill_milvus_repo = get_bean_by_type(AgentSkillMilvusRepository)
from core.oxm.vector_backend_router import get_agent_skill_repo
agent_skill_milvus_repo = get_agent_skill_repo()
Comment thread
Ptah-CT marked this conversation as resolved.
for old_id in remove_ids:
await agent_skill_milvus_repo.delete_by_id(old_id)
inserted_count = 0
Expand Down
13 changes: 8 additions & 5 deletions methods/EverCore/src/biz_layer/mem_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ def __init__(
foresight_es_repo: Foresight ES repository instance (optional, obtained from DI if not provided)
atomic_fact_es_repo: Atomic fact ES repository instance (optional, obtained from DI if not provided)
"""
self.foresight_milvus_repo = foresight_milvus_repo or get_bean_by_type(
ForesightMilvusRepository
)
self.atomic_fact_milvus_repo = atomic_fact_milvus_repo or get_bean_by_type(
AtomicFactMilvusRepository
# Field names keep the ``_milvus_`` token for caller-compat, but
# the actual instance is routed by VECTOR_STORE_BACKEND via the
# vector_backend_router factory.
from core.oxm.vector_backend_router import (
get_atomic_fact_repo,
get_foresight_repo,
)
self.foresight_milvus_repo = foresight_milvus_repo or get_foresight_repo()
self.atomic_fact_milvus_repo = atomic_fact_milvus_repo or get_atomic_fact_repo()
Comment thread
Ptah-CT marked this conversation as resolved.
self.foresight_es_repo = foresight_es_repo or get_bean_by_type(
ForesightEsRepository
)
Expand Down
76 changes: 75 additions & 1 deletion methods/EverCore/src/core/oxm/qdrant/base_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import uuid
from abc import ABC
from datetime import datetime, timezone
from typing import Any, Generic, List, Optional, Type, TypeVar
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar

from qdrant_client.http import models as qmodels

Expand Down Expand Up @@ -155,6 +155,80 @@ def collection(self) -> T:
def get_model_name(self) -> str:
return self.model_name

# ============================== Milvus-compat insert/insert_batch
# The service layer (memory_manager, mem_sync, mem_memorize,
# profile_indexer) was historically written against the Milvus
# repository surface: ``insert(entity_dict)`` and
# ``insert_batch(entity_dicts)`` taking the output of the
# corresponding ``*MilvusConverter.from_mongo()``. The Qdrant
# repositories had a different, type-safer surface
# (``create_and_save_*`` + native ``upsert(PointStruct)``).
#
# Rewriting every service-layer call would be a large, risky change.
# Instead, this base class provides a thin adapter: the Milvus dict
# is translated to a Qdrant ``PointStruct`` (vector lifted out, mongo
# ``_id`` mapped through ``mongo_id_to_qdrant_id``, all other fields
# become payload, raw mongo id preserved as ``mongo_id`` for round
# trip). ``flush`` is accepted and ignored — Qdrant durability is
# already covered by ``upsert(wait=True)``. With this in place, the
# ``vector_backend_router`` factory can hand a Qdrant repo to any
# caller that previously held a Milvus repo, and the existing
# converter+insert pipeline keeps working.

@staticmethod
def _milvus_entity_to_point(entity: Dict[str, Any]) -> Optional[qmodels.PointStruct]:
"""Translate a Milvus-style entity dict into a ``PointStruct``.

Returns ``None`` when the entity has no vector to embed; callers
should silently skip those (the source converter is supposed to
guard, but the indexing pipeline sometimes gets called with
partially-built docs during early lifecycle).
"""
vector = entity.get("vector")
if not vector:
return None
mongo_id = entity.get("id") or entity.get("mongo_id")
if mongo_id is None or mongo_id == "":
return None
payload: Dict[str, Any] = {
k: v for k, v in entity.items() if k != "vector"
}
payload["mongo_id"] = str(mongo_id)
return qmodels.PointStruct(
id=mongo_id_to_qdrant_id(mongo_id),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Add delete-ID compatibility alongside Mongo->Qdrant ID mapping

The new Milvus-compat insert adapter rewrites Mongo IDs to UUID5 point IDs, but delete call sites in routed services still pass raw Mongo IDs into delete_by_id. That mismatch means update/delete sync flows can fail or silently miss deletions in Qdrant mode; in mem_memorize this is especially harmful because delete and insert are inside one try, so a delete failure skips the subsequent upserts and leaves vectors stale.

Useful? React with 👍 / 👎.

vector=vector,
payload=payload,
)

async def insert(self, entity: Dict[str, Any], flush: bool = False) -> str:
"""Milvus-compat single-row insert. ``flush`` is accepted and ignored."""
point = self._milvus_entity_to_point(entity)
if point is None:
raise ValueError(
f"{self.model_name}: entity missing vector or id; cannot upsert"
)
await self.upsert(point)
return str(point.id)

async def insert_batch(
self, entities: List[Dict[str, Any]], flush: bool = False
) -> qmodels.UpdateResult:
"""Milvus-compat batch insert. Entities without vector/id are skipped."""
points: List[qmodels.PointStruct] = []
for entity in entities:
point = self._milvus_entity_to_point(entity)
if point is not None:
points.append(point)
if not points:
logger.debug(
"Qdrant insert_batch [%s]: 0 valid points after filtering",
self.model_name,
)
return qmodels.UpdateResult(
operation_id=0, status=qmodels.UpdateStatus.COMPLETED
)
return await self.upsert_batch(points)

# =================================================== Basic CRUD (async)

async def upsert(
Expand Down
134 changes: 134 additions & 0 deletions methods/EverCore/src/core/oxm/vector_backend_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""
Vector-Backend-Router — env-gated factory for vector repositories.

The service layer (``agentic_layer/*``, ``memory_layer/*``, ``biz_layer/*``)
historically instantiated ``*MilvusRepository()`` classes directly. After
the Qdrant adapter was added (see [[Qdrant]] in the EWM docs), this
hard-coded write/read path bypassed ``VECTOR_STORE_BACKEND`` entirely —
the lifespan layer correctly skipped Milvus, but every service still
constructed Milvus repos and crashed at first use.

This router resolves the env flag once per construction call and returns
the right repository instance. Both backends expose the same public
surface (``vector_search``, ``create_and_save_*``, ``delete_by_*``) so
callers don't need to know which backend they got.

Usage::

from core.oxm.vector_backend_router import get_episodic_repo
self.episodic_repo = get_episodic_repo()
# caller-facing methods are identical across backends:
results = await self.episodic_repo.vector_search(query_vector=v, ...)

Adding a new memory type: write a ``get_<name>_repo()`` thin factory that
follows the same pattern. Lazy imports keep startup cheap and avoid
pulling Qdrant client packages into Milvus-only deployments.
"""

from __future__ import annotations

import logging
import os
from typing import Any

logger = logging.getLogger(__name__)

_VALID_BACKENDS = {"qdrant", "milvus"}


def _backend() -> str:
"""Return the active backend name in normalised form (``"qdrant"`` or
``"milvus"``). Mirrors the case-insensitive gate used by
``QdrantLifespanProvider`` and ``MilvusLifespanProvider``. An
unrecognised value (e.g. a typo like ``VECTOR_STORE_BACKEND=qdarnt``)
falls back to ``"milvus"`` but emits a warning so the operator
notices instead of silently routing to the wrong backend.
"""
raw = os.getenv("VECTOR_STORE_BACKEND", "milvus").strip().lower()
if raw not in _VALID_BACKENDS:
logger.warning(
"VECTOR_STORE_BACKEND=%r is not a known backend (expected one of "
"%s); falling back to 'milvus'",
raw, sorted(_VALID_BACKENDS),
)
return "milvus"
return raw


def _is_qdrant() -> bool:
return _backend() == "qdrant"


def _resolve(qdrant_cls: type[Any], milvus_cls: type[Any]) -> Any:
"""Resolve the right backend bean via the DI container so existing
singleton scope (and any constructor wiring done by the DI scanner)
is preserved. Falls back to direct instantiation only if no bean is
registered — that should not happen in production, but the safety
net keeps stand-alone unit tests with no DI scan from crashing.
"""
cls = qdrant_cls if _is_qdrant() else milvus_cls
try:
from core.di import get_bean_by_type
return get_bean_by_type(cls)
except Exception:
return cls()


def get_episodic_repo() -> Any:
from infra_layer.adapters.out.search.repository.episodic_memory_qdrant_repository import (
EpisodicMemoryQdrantRepository,
)
from infra_layer.adapters.out.search.repository.episodic_memory_milvus_repository import (
EpisodicMemoryMilvusRepository,
)
return _resolve(EpisodicMemoryQdrantRepository, EpisodicMemoryMilvusRepository)


def get_atomic_fact_repo() -> Any:
from infra_layer.adapters.out.search.repository.atomic_fact_qdrant_repository import (
AtomicFactQdrantRepository,
)
from infra_layer.adapters.out.search.repository.atomic_fact_milvus_repository import (
AtomicFactMilvusRepository,
)
return _resolve(AtomicFactQdrantRepository, AtomicFactMilvusRepository)


def get_foresight_repo() -> Any:
from infra_layer.adapters.out.search.repository.foresight_qdrant_repository import (
ForesightQdrantRepository,
)
from infra_layer.adapters.out.search.repository.foresight_milvus_repository import (
ForesightMilvusRepository,
)
return _resolve(ForesightQdrantRepository, ForesightMilvusRepository)


def get_agent_case_repo() -> Any:
from infra_layer.adapters.out.search.repository.agent_case_qdrant_repository import (
AgentCaseQdrantRepository,
)
from infra_layer.adapters.out.search.repository.agent_case_milvus_repository import (
AgentCaseMilvusRepository,
)
return _resolve(AgentCaseQdrantRepository, AgentCaseMilvusRepository)


def get_agent_skill_repo() -> Any:
from infra_layer.adapters.out.search.repository.agent_skill_qdrant_repository import (
AgentSkillQdrantRepository,
)
from infra_layer.adapters.out.search.repository.agent_skill_milvus_repository import (
AgentSkillMilvusRepository,
)
return _resolve(AgentSkillQdrantRepository, AgentSkillMilvusRepository)


def get_user_profile_repo() -> Any:
from infra_layer.adapters.out.search.repository.user_profile_qdrant_repository import (
UserProfileQdrantRepository,
)
from infra_layer.adapters.out.search.repository.user_profile_milvus_repository import (
UserProfileMilvusRepository,
)
return _resolve(UserProfileQdrantRepository, UserProfileMilvusRepository)
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,15 @@ def __init__(self, milvus_repo: Optional[UserProfileMilvusRepository] = None):

@property
def milvus_repo(self) -> UserProfileMilvusRepository:
"""Lazy load Milvus repository"""
"""Lazy load the vector repository for the active backend.

Property name kept for caller-compatibility; the actual instance
is routed by ``VECTOR_STORE_BACKEND`` via
``core.oxm.vector_backend_router.get_user_profile_repo()``.
"""
if self._milvus_repo is None:
self._milvus_repo = get_bean_by_type(UserProfileMilvusRepository)
from core.oxm.vector_backend_router import get_user_profile_repo
self._milvus_repo = get_user_profile_repo()
Comment thread
Ptah-CT marked this conversation as resolved.
return self._milvus_repo

async def index_profile(
Expand Down
Loading