Skip to content

Qdrant adapter: full Milvus parity + standalone re-embed sweep#1

Closed
Ptah-CT wants to merge 20 commits into
mainfrom
feature/qdrant-adapter
Closed

Qdrant adapter: full Milvus parity + standalone re-embed sweep#1
Ptah-CT wants to merge 20 commits into
mainfrom
feature/qdrant-adapter

Conversation

@Ptah-CT
Copy link
Copy Markdown

@Ptah-CT Ptah-CT commented May 13, 2026

Summary

End of the Qdrant-vs-Milvus parity work plus the Phase 3 standalone re-embed pipeline.

The branch carries:

  • Phase 1: QdrantCollectionBase (tenant-aware naming, payload indexes, query_points instead of removed search).
  • Phase 2: 6 collections + 6 converters covering EpisodicMemory, AtomicFact, Foresight, AgentCase, AgentSkill, UserProfile.
  • Phase 2.5: 6 @repository adapters with two-stage score gating, client.count(exact=True) for delete-by-filter, and tz-aware epoch helpers.
  • Phase 3: standalone migrate_milvus_to_qdrant CLI (Mongo -> OpenRouter -> Qdrant) plus the new re_embed_sweep wrapper that fans the workhorse over every active tenant database and every supported collection type.
  • Two rounds of CodeRabbit feedback addressed inline (30 findings on the first pass, 7 on the second).

The last two commits on this branch are the deploy-readiness items:

  • fix(qdrant): map Mongo ids via uuid5 in all 5 production converters — converters previously used id=str(source_doc.id) which Qdrant refuses (400 Bad Request: point id must be uint or RFC-4122 UUID). The Mongo id is now mapped through the same mongo_id_to_qdrant_id (uuid5 over a stable namespace) the standalone CLI already uses, and the raw Mongo id is persisted in the payload as mongo_id for round-trip lookup.
  • feat(qdrant): sweep wrapper for batch re-embed across tenants — iterates active databases x collection types and runs the migrate workhorse per non-empty pair.

Test plan

  • ruff check clean on changed files.
  • pytest green for methods/evermemos/tests/ (collection + converter + repository suites).
  • Smoke: standalone migrate writes a single document into a smoketest Qdrant collection (already exercised against a real Qdrant on the target host).
  • Sweep dry-run prints the expected (db, coll) pair plan and does not call OpenRouter.
  • Full sweep (after merge) writes the re-embedded points into Qdrant collections per tenant.

Out of scope / follow-ups

  • UserProfile reindex is intentionally not in the sweep wrapper — it splits one source document into many points and needs its own Phase 3.1 path.

Ptah-CT added 19 commits May 13, 2026 16:45
Phase 1 (skeleton) of the milvus -> qdrant migration. No runtime behavior
change yet — Milvus stays the default backend until cutover.

- README: prepend fork header documenting motivation, status, approach,
  and concept mapping. Links to the Qdrant migration guide.
- pyproject: add qdrant-client>=1.12,<2.
- src/core/oxm/qdrant/qdrant_collection_base.py: stub IndexConfig and
  QdrantCollectionBase. ensure_all() is a no-op so the lifespan provider
  can iterate registered subclasses without crashing during the skeleton
  phase. upsert/search/delete raise NotImplementedError (Phase 2).
- src/core/component/qdrant_client_factory.py: full QdrantClientFactory
  with env-driven get_qdrant_config(prefix=...), per-alias client caching,
  named clients, and graceful shutdown. https=Optional[bool] preserves
  qdrant-client's URL-scheme TLS detection; api_key=Optional[str] passes
  through cleanly without empty-string coercion. Registered as
  @component(primary=False) so the milvus factory remains the default
  until cutover.
- Empty __init__.py for new oxm/qdrant and tenants/.../oxm/qdrant
  packages.

Next: qdrant_lifespan.py (gated by VECTOR_STORE_BACKEND env flag) and
full collection-base impl.
…D env

Provides FastAPI lifespan startup/shutdown for the Qdrant adapter, analogous
to MilvusLifespanProvider but no-op unless VECTOR_STORE_BACKEND=qdrant. So
the Milvus backend stays the default at runtime until cutover.

On startup (when active):
- Resolves the 'qdrant_client_factory' DI bean.
- Collects all concrete QdrantCollectionBase subclasses via get_all_subclasses.
- Groups them by _DB_USING and ensures the client per group, then runs
  ensure_all() on each collection (currently a stub no-op; will create
  collections + payload indexes in Phase 1.2).

On shutdown: closes all cached Qdrant clients. Cleans the same app.state
attributes pattern as the milvus provider.

Order=19 sits between milvus_lifespan (18) and business_lifespan (20),
so during cutover both backends can briefly coexist.
Generic[QdrantCollectionType] ABC with an abstract @classmethod from_mongo()
that subclasses implement to convert Mongo source docs into Qdrant point
payloads (PointStruct or compatible dicts).

get_qdrant_model() introspects the Generic argument from __orig_bases__ so
the bound collection class can be retrieved at runtime — same pattern as
the Milvus base converter, which the search-repository layer relies on.
…query_points()

Replaces the Phase 1.1 stub with a full collection-management base class:

- IndexConfig as @DataClass: size, distance, on_disk, hnsw_m,
  hnsw_ef_construct, payload_indexes (dict field_name -> schema_type).
  to_vectors_config() builds the qdrant_client VectorParams + HnswConfigDiff.
- Module-level _DISTANCE_MAP and _PAYLOAD_SCHEMA_TYPE_MAP translate string
  configs to SDK enums — subclasses stay decoupled from the SDK.
- QdrantCollectionBase methods: client() (lazy DI lookup), exists(),
  count(), ensure_collection() (idempotent), ensure_payload_indexes()
  (idempotent), ensure_all(), upsert(), search() (qdrant-client
  query_points wrapper), delete(), drop().
- Compared to MilvusCollectionBase this is ~half the LOC because Qdrant
  has no alias mechanism — collection names are direct.

CRITICAL fix in the same commit: qdrant-client 1.16.1 (the version
resolved against our >=1.12,<2 pin) removed the legacy
QdrantClient.search method; only query_points is available now. The
search() wrapper here calls query_points(query=..., ...) and unwraps
QueryResponse.points so call sites still get List[ScoredPoint].

uv.lock is regenerated to include qdrant-client (1.16.1) and its
transitive deps (h2, hpack, hyperframe, portalocker).
…t naming

Adds the multi-tenancy adapter for Qdrant. Deliberately schlanker than the
Milvus counterpart (~270 LOC total vs ~750 LOC for milvus' tenant layer)
because Qdrant has no alias mechanism and no partition_key feature — multi-
tenancy is realized via collection-per-tenant naming alone.

src/core/tenants/tenantize/oxm/qdrant/config_utils.py:
- get_tenant_aware_collection_name(original_name): resolves the final
  Qdrant collection name from the active tenant context. Lookup order is
  storage_info['qdrant'] -> storage_info['milvus'] (migration bridge,
  reuses the same collection_prefix for both backends until per-tenant
  qdrant config is wired) -> base resource prefix fallback.
- get_qdrant_connection_cache_key(config): builds a stable factory cache
  key, hashing api_key fingerprints (8 hex chars) so the raw key never
  appears in the cache identifier.
- _load_qdrant_env(prefix): env-fallback loader for tenant-aware connection
  routing. Currently exported as private until the routing layer consumes
  it; documented to avoid dead-code flags.

src/core/tenants/tenantize/oxm/qdrant/tenant_aware_qdrant_collection_with_suffix.py:
- TenantAwareQdrantCollectionWithSuffix(QdrantCollectionBase): overrides
  the name property to return tenant-prefixed + optional explicit suffix.
  The collection base remains unchanged; concrete Phase-2 collections
  inherit from this class instead of QdrantCollectionBase directly.
- __init__(suffix) accepts explicit override or falls back to the
  SELF_QDRANT_COLLECTION_NS env-var.
- _MULTI_TENANT_STRATEGY ClassVar is informational; a future version may
  opt into Qdrant's native payload-partitioning instead of separate
  collections per tenant.
…ollectionBase

Wraps the sync qdrant-client API via asyncio.to_thread so the repository
surface stays async (parity with the milvus repository layer).

Methods:
- upsert(point) -> str (point id, parity with milvus 'insert' returning entity_id)
- upsert_batch(points) -> UpdateResult (full result, exposes wait-status)
- find_by_id(id) / find_by_ids(ids) -> Optional[Record] / List[Record]
- delete_by_id(id) -> bool; delete_batch(ids) -> UpdateResult
- search(query_vector, limit, query_filter, ...) -> List[ScoredPoint]
- count(exact=True) -> int
- collection: lazy-instantiated QdrantCollectionBase subclass
- get_model_name() -> str

Error-handling semantics mirror the milvus counterpart:
- upsert / upsert_batch / delete_batch / search → log + raise on failure
- find_by_id / find_by_ids / delete_by_id → log + return None / False
  (resilient read path)

Subclasses bind the generic parameter to the concrete collection model
class:

    class EpisodicMemoryRepository(
        BaseQdrantRepository[EpisodicMemoryCollection]
    ):
        def __init__(self):
            super().__init__(EpisodicMemoryCollection)
…+converter

First two of six Qdrant collections matching the Milvus search adapter layout
under src/infra_layer/adapters/out/search/. Subsequent commits add the
remaining four (agent_skill, foresight, atomic_fact, user_profile).

Collections (src/infra_layer/adapters/out/search/qdrant/memory/):
- episodic_memory_collection.py: TenantAwareQdrantCollectionWithSuffix
  subclass, base name v1_episodic_memory. Vector 1024-dim Cosine, HNSW
  m=16 ef_construct=200. Payload indexes: user_id/group_id/session_id/
  parent_id/parent_type/type (keyword) + timestamp (integer, epoch ms).
- agent_case_collection.py: base name v1_agent_case. Same vector params;
  payload indexes minus participants/type, plus timestamp as epoch
  seconds (parity with milvus converter).

Converters (src/infra_layer/adapters/out/search/qdrant/converter/):
- episodic_memory_qdrant_converter.py: BaseQdrantConverter[
  EpisodicMemoryCollection]. from_mongo() builds a PointStruct from a
  MongoDB v1_episodic_memories document. Handles missing optional fields
  (sender_ids, type, subject, summary) and serializes the search_content
  list to JSON for downstream search service consumption.
- agent_case_qdrant_converter.py: BaseQdrantConverter[AgentCaseCollection].
  Maps AgentCaseRecord -> PointStruct, truncates task_intent to 5000 chars
  (Milvus parity), uses epoch-seconds timestamp.

Qdrant is schema-flexible, so unlike the Milvus side there is no field
schema declaration — only the vector params and the explicit payload
indexes (the rest of the payload is whatever the converter writes).
…verter

Adds collections 3 and 4 of six. Same TenantAwareQdrantCollectionWithSuffix
+ BaseQdrantConverter pattern as batch 1.

Collections:
- agent_skill_collection.py: base name v1_agent_skill. Payload indexes for
  user_id/group_id/cluster_id (keyword) + maturity_score/confidence (float)
  for threshold range queries.
- foresight_collection.py: base name v1_foresight_record. Payload indexes
  for user_id/group_id/session_id/parent_id/parent_type/type (keyword) +
  start_time/end_time (integer, epoch ms).

Converters:
- agent_skill_qdrant_converter.py: AgentSkillRecord -> PointStruct. Builds
  the content payload from name + description, truncates to 5000 chars.
  Coerces optional maturity_score / confidence to 0.0 when absent (Qdrant
  silently excludes null-valued payloads from range filters, so treating
  'unscored' as 'lowest score' keeps them visible to threshold queries).
- foresight_qdrant_converter.py: ForesightRecord -> PointStruct. Time-field
  parser accepts datetime / ISO-8601 / numeric. **Diverges from Milvus
  template**: numeric inputs above 1e10 are treated as already-milliseconds
  rather than blindly multiplied by 1000 — the Milvus version corrupts
  already-ms inputs. content payload is intentionally passed verbatim
  (incl. None) for downstream sentinel semantics; documented inline.
…onverter (Phase 2 complete)

Adds the final two of six Qdrant collections — Phase 2 of the
Milvus->Qdrant migration is now structurally complete (6 collections + 6
converters under src/infra_layer/adapters/out/search/qdrant/).

Collections:
- atomic_fact_collection.py: base name v1_atomic_fact_record. Payload
  indexes for user_id/group_id/session_id/parent_id/parent_type/type
  (keyword) + timestamp (integer, epoch ms).
- user_profile_collection.py: base name v1_user_profile. No session_id
  (user-level aggregation). Payload indexes for user_id/group_id/scenario/
  item_type (all keyword).

Converters:
- atomic_fact_qdrant_converter.py: AtomicFactRecord -> PointStruct. Falls
  back to RawDataType.CONVERSATION when source.type is absent. exc_info
  on error log.
- user_profile_qdrant_converter.py: **diverges from the other converters'
  return type** — returns List[Dict[str, Any]] (one item per
  explicit_info / implicit_trait / user_goal entry) for parity with the
  Milvus counterpart. ProfileIndexer downstream wraps each item into a
  PointStruct after embedding. Module-level _EXPLICIT_FIELDS and
  _IMPLICIT_FIELDS constants (Milvus version had them inline). Carries
  '# type: ignore[override]' on from_mongo with docstring justification.

Phase 2 file inventory:
  src/infra_layer/adapters/out/search/qdrant/
  ├── __init__.py
  ├── memory/
  │   ├── __init__.py
  │   ├── episodic_memory_collection.py
  │   ├── agent_case_collection.py
  │   ├── agent_skill_collection.py
  │   ├── foresight_collection.py
  │   ├── atomic_fact_collection.py
  │   └── user_profile_collection.py
  └── converter/
      ├── __init__.py
      ├── episodic_memory_qdrant_converter.py
      ├── agent_case_qdrant_converter.py
      ├── agent_skill_qdrant_converter.py
      ├── foresight_qdrant_converter.py
      ├── atomic_fact_qdrant_converter.py
      └── user_profile_qdrant_converter.py
…ories

First 2 of 6 Qdrant repositories, mirroring the surface of the corresponding
Milvus repositories so the search-service layer can swap backends via the
VECTOR_STORE_BACKEND env flag.

Both repositories:
- Inherit from BaseQdrantRepository[<Collection>], registered as
  @repository(name='..._qdrant_repository', primary=False).
- Build filters as qmodels.Filter(must=[FieldCondition(...)]) using
  MatchValue / MatchAny / Range instead of Milvus' string expression
  syntax. Filter-construction is fully typed — no injection vector
  remains in the search path.
- Honour the MAGIC_ALL sentinel for user_id / group_id with the same
  semantics as the Milvus repositories.
- Use a two-stage score gating pattern (server-side score_threshold via
  Qdrant + client-side post-filter at the caller's hard threshold);
  the rationale is documented inline so future readers don't read it
  as a duplicated check.

AgentSkillQdrantRepository:
- vector_search() with maturity_threshold / confidence_threshold range
  filters plus optional cluster_id / group_ids.
- delete_by_cluster_id() — uses scroll() for a best-effort delete count
  (Qdrant's filter-based delete doesn't return one) then deletes via
  FilterSelector(filter=...).

EpisodicMemoryQdrantRepository:
- create_and_save_episodic_memory() — convenience constructor that builds
  a PointStruct and upserts, returns the same lightweight summary dict
  as the Milvus repository for caller parity.
- vector_search() with full scope + time-range filters.
- delete_by_filters() — batch delete by user_id/group_id/time-range;
  same MAGIC_ALL guard and 'at least one filter required' contract as
  the Milvus repository.
Resolves the full set of findings from a CodeRabbit code review on
feature/qdrant-adapter (1 critical, 10 major, 19 minor).

**Critical**
- episodic_memory_qdrant_repository.delete_by_filters: scroll with hard-
  coded limit=10_000 could undercount large tenants. Replaced with an
  exact client.count() call so the returned delete-count reflects the
  full set.

**Major**
- agent_skill_qdrant_repository.delete_by_cluster_id: same scroll-limit
  bug. Same fix (exact count + filter delete) and re-raise on error.
- episodic_memory_qdrant_repository.vector_search: two-stage score
  gating now uses min(radius, score_threshold) for server-side
  filtering so a wider radius doesn't accidentally make the server cut
  stricter than the client-side post-filter.
- AgentCase / AgentSkill / EpisodicMemory / Foresight / AtomicFact
  converters: explicit non-empty vector validation (raise instead of
  silently writing an empty list) and explicit id None-guard
  (no more str(None) -> 'None' point ids).
- foresight_qdrant_converter._parse_time_field: 'if not time_value'
  treated epoch 0 as missing. Now 'if time_value is None'.
- atomic_fact_qdrant_converter: _build_search_content is now actually
  written into the point payload (was dead code); vector access uses
  getattr defensive; type fallback only when source.type is None
  rather than any falsy value.
- base_repository.find_by_id / find_by_ids / delete_by_id: stop
  swallowing all exceptions. Errors are logged and re-raised so callers
  can distinguish 'not found' from operational failures. Behaviourally
  consistent with the rest of the base methods (upsert/delete_batch/
  search) that already raised on failure.
- qdrant_collection_base.drop: log + re-raise instead of swallow.
- config_utils._load_qdrant_env and qdrant_client_factory.get_qdrant_config:
  safe int(QDRANT_PORT) with try/except + TCP range guard (1-65535).
- qdrant_client_factory: URL assembly preserves an already-schemed host
  verbatim (e.g. 'https://my-qdrant.cloud') instead of force-prefixing
  http:// and double-appending the port.
- qdrant_client_factory.get_client: threading.Lock with double-checked
  locking eliminates the cache-miss race that could create duplicate
  QdrantClient instances under concurrent FastAPI requests.

**Minor**
- 6x collection docstrings: 'dim=1024' -> 'dim=VECTORIZE_DIMENSIONS'
  (no more drift if the constant changes).
- 2x ValueError messages: 'cannot be empty' -> 'cannot be None' to
  match the actual 'is None' guard.
- config_utils.get_qdrant_connection_cache_key: api_key.encode now
  tolerates bytes/str/other.
- agent_skill_qdrant_repository.vector_search: 'user_id is None' now
  skips the filter entirely instead of matching the empty string.
- qdrant_client_factory.get_named_client: cache key normalized via
  .lower() so 'Default'/'DEFAULT'/'default' share one client.
- qdrant_client_factory.get_qdrant_config: 'https_raw' now uses the
  _env helper consistently with the other env vars.
- 2x qdrant_collection_base: 'assert cfg is not None' replaced with
  explicit RuntimeError so the guard survives python -O.

Total: 17 files changed, ~30 distinct fixes.
Adds repositories 3 and 4 of 6. Both apply all CodeRabbit-derived patterns
established in the previous Phase 2.5 fix-pass:

- user_id/MAGIC_ALL guard skips the filter when None/empty (no spurious
  user_id == '' match).
- Two-stage score gating: server-side passes min(radius, score_threshold)
  so a wider radius cannot accidentally tighten the cut; client-side post-
  filter enforces the hard caller minimum.
- client.count(exact=True) for delete-by-filter return values (not a
  bounded scroll page).
- All error paths re-raise after a structured log.

AgentCaseQdrantRepository:
- vector_search() with scope (user_id / session_id / group_ids / parent_id)
  and time-range filters in epoch seconds (parity with the AgentCase
  converter and the Milvus repository — agent_case is the one collection
  storing seconds, not milliseconds).
- Returns a datetime for timestamp (round-tripped from epoch
  seconds with tz=UTC).

AtomicFactQdrantRepository:
- create_and_save_atomic_fact() convenience constructor: builds the
  PointStruct (with empty vector validation), upserts, returns the
  Milvus-shaped summary dict for caller parity.
- vector_search() with full scope + time-range filters in epoch ms.
- batch_vector_search_by_parent_ids(): MRAG-Phase-3 expansion path —
  MatchAny over parent_ids with total_limit = limit * len(parent_ids).
  Returns early with an empty list when no parent_ids are passed.
- delete_by_filters(): uses exact count + filter-based delete; raises on
  any operational error so callers can distinguish 'no points' from
  failure.
- All search paths return datetime for timestamp (consistent with
  create_and_save_atomic_fact, parity with agent_case).
…s (Phase 2.5 complete)

Final 2 of 6 Qdrant repositories. Phase 2.5 is now structurally complete:
all six adapters (agent_skill, episodic_memory, agent_case, atomic_fact,
foresight, user_profile) exist as Qdrant repositories matching their
Milvus counterparts.

All four established Phase-2.5 patterns applied:
- user_id/MAGIC_ALL guard skips the filter on None/empty
- Two-stage score gating: min(radius, score_threshold) server-side
- client.count(exact=True) for delete-by-filter return values
- Re-raise after structured log (no swallowed errors)

ForesightQdrantRepository:
- create_and_save_foresight_mem(): convenience constructor + upsert.
  session_id is now an explicit parameter (writes into payload) so the
  matching vector_search(session_id=...) filter actually hits — the
  Milvus repository signature lacked this parameter and produced a
  silent zero-hit filter.
- vector_search() with scope (user_id/group_ids/session_id/sender_id/
  parent_type/parent_id) + time-range filters.
  **Diverges from Milvus**: filters on start_time/end_time payload
  fields (semantically correct range overlap) instead of the Milvus
  repository's non-existent 'timestamp' field. Documented inline.
  sender_id filter uses Qdrant's element-wise MatchValue on the
  sender_ids array — equivalent to Milvus' array_contains.
- delete_by_filters() with the same start_time/end_time semantics.

UserProfileQdrantRepository:
- vector_search() with user_id/group_id/scenario scoping (no
  session_id — user_profile is user-level aggregation).
- delete_by_user_group(): count + filter-based delete, raises on
  operational error (consistent with the Phase 2.5 fix-pass on
  base_repository).
CodeRabbit pass 2 found 7 follow-up findings after pass 1. All addressed:

**Major**
- Timezone-naive datetimes in time-range filters silently used the local
  timezone for .timestamp() conversion, producing wrong epoch values.
  Added module-level helpers in base_repository: to_epoch_ms(dt) and
  to_epoch_s(dt) which coerce tz-naive datetimes to UTC. All five
  repositories now use these helpers consistently:
    - agent_case (epoch seconds via to_epoch_s)
    - atomic_fact / episodic_memory / foresight (epoch ms via to_epoch_ms)
    - user_profile (no time fields, untouched)

**Minor**
- foresight create_and_save_foresight_mem: start_time/end_time fall back
  to None instead of 0 when missing — 0 would silently match epoch-1970
  records. Documented inline.
- base_repository.count(): wrapped in try/except + structured log to
  match the rest of the async methods.
- atomic_fact create_and_save: vector validation now explicit None/empty
  check (`if vector is None or len(vector) == 0`) instead of `if not
  vector`, so a legitimate all-zero embedding is no longer falsy-rejected.
- user_profile_qdrant_repository.vector_search: group_id and scenario
  filters now mirror user_id's MAGIC_ALL guard (skip filter on MAGIC_ALL
  sentinel) instead of treating MAGIC_ALL as a literal value to match.

Note: the foresight repository's two-stage score-gating pattern was
flagged as 'redundant filtering'. It is intentional — server-side uses
the more permissive bound (radius widening) and the client-side post-
filter enforces the caller's hard cut. The behaviour is documented in
the inline comment block; the CodeRabbit finding is a false positive.
… Qdrant)

Adds devops_scripts/migrate_milvus_to_qdrant.py, the workhorse for the
Phase 3 cutover. Standalone: no EverOS DI container required.

Reads OpenRouter, Mongo, Qdrant config from env (auto-loads .env via
python-dotenv when present). Migrates one (mongo-db, mongo-collection)
pair to one Qdrant collection per invocation; shell-loop over the six
EverOS collection types × N tenants for the full sweep.

Defaults match the documented xinfty stack:
  - VECTORIZE_MODEL=qwen/qwen3-embedding-8b
  - VECTORIZE_DIMENSIONS=4096
  - OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
  - MONGO_URI=mongodb://localhost:27017
  - QDRANT_HOST=localhost, QDRANT_PORT=6333

CLI args expose the per-collection variation:
  --text-field         primary text used for embedding
  --extra-text-fields  comma-separated secondary text fields
  --timestamp-field    + --timestamp-unit ms|s
  --payload-fields     comma-separated mongo fields projected to qdrant payload
  --batch-size         embedding batch size (default 32)
  --limit              cap for smoke tests
  --force              re-embed and overwrite existing points
  --dry-run            count without calling OpenRouter or Qdrant.upsert
  --log-level          DEBUG/INFO/WARNING/ERROR

Idempotent by default: client.retrieve filters out point ids that
already exist in the target Qdrant collection (skip path); --force
overwrites them.

Embedded behaviour:
  - extract_text concatenates text_field + extra_text_fields with newlines
  - build_payload projects payload_fields + normalizes timestamp via
    datetime.timestamp() (epoch ms or s depending on --timestamp-unit)
  - search_content is JSON-serialized from the text pieces, mirroring the
    converter's payload shape used by EverOS' search service.

ensure_qdrant_collection creates the target collection with the same
HNSW/Cosine config the EverOS adapter writes (m=16, ef_construct=200,
distance=Cosine) so the schema matches what the live service expects.
Qdrant only accepts unsigned-int or RFC-4122-UUID point ids; the Mongo
ObjectId hex (e.g. 69ed6acfaf31e5cd7977bc56) is neither and the live
pilot hit a 400 Bad Request from the Qdrant retrieve endpoint.

Fix:
- base_repository: add mongo_id_to_qdrant_id() helper that does
  str(uuid.uuid5(NAMESPACE, str(mongo_id))). Namespace is a fixed UUID
  embedded in code (must never change without a full re-migration).
- migrate_milvus_to_qdrant.py: use the helper for the Qdrant point id;
  keep the original Mongo id in the payload as 'mongo_id' for reverse
  lookup.

The Phase-2 converters (search/qdrant/converter/*) still use
str(source_doc.id) and would fail the same way the moment they go live.
That fix is the next commit.
ModuleNotFoundError: 'core' when invoked as
    python src/devops_scripts/migrate_milvus_to_qdrant.py ...

Fix: prepend the script's parent-of-parent (the EverOS src/ tree) to
sys.path before the core.oxm.qdrant.base_repository import. Lets the
script run without PYTHONPATH or pip install.
The Phase 2 converters used ``id=str(source_doc.id)`` directly when building
``PointStruct`` payloads. Qdrant only accepts unsigned integers or RFC-4122
UUIDs as point ids — a 24-hex-char Mongo ``ObjectId`` is neither, so the
Live-indexing path produced a 400 Bad Request on upsert.

The standalone re-embed CLI (commit c17ba60) already routed Mongo ids
through ``mongo_id_to_qdrant_id`` (uuid5 over a stable namespace), so the
two paths now agree:

- standalone migrate: Mongo doc -> uuid5 point id
- live converter:     Mongo doc -> uuid5 point id (this commit)

Both also persist the raw Mongo id in the payload as ``mongo_id`` for
round-trip lookup, idempotent re-embed, and debugging.

``user_profile_qdrant_converter`` is intentionally left untouched: it
emits multiple points per source doc (one per explicit_info/implicit_trait
entry), assigns fresh ObjectIds, and has its own Phase 3.1 path that needs
a separate point-id scheme.
Adds ``re_embed_sweep.py`` next to the standalone ``migrate_milvus_to_qdrant``
workhorse. The wrapper iterates every active (non-hyphen) Mongo database
times every supported collection type and invokes ``migrate(...)`` for each
non-empty pair.

Five collection types are covered:

- episodic_memory  (v1_episodic_memories  -> <prefix>_v1_episodic_memory)
- atomic_fact      (v1_atomic_fact_records -> <prefix>_v1_atomic_fact_record)
- foresight        (v1_foresight_records  -> <prefix>_v1_foresight_record)
- agent_case       (v1_agent_cases        -> <prefix>_v1_agent_case)
- agent_skill      (v1_agent_skills       -> <prefix>_v1_agent_skill)

``v1_user_profiles`` is deliberately excluded — it needs per-doc splitting
(one source doc -> many Qdrant points), handled by a separate Phase 3.1
script.

CLI shape mirrors the workhorse: ``--tenant``, ``--collection``,
``--batch-size``, ``--limit-per-pair``, ``--force``, ``--dry-run``,
``--log-level``. The wrapper imports ``migrate`` directly (no subprocess
fan-out) so config is read once and progress logs interleave naturally.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 13, 2026

Review Change Stack

Warning

Rate limit exceeded

@Ptah-CT has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 38 minutes and 29 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 57deed9b-1df1-4d7b-86ae-cf4d057d9c2b

📥 Commits

Reviewing files that changed from the base of the PR and between 4fe11a2 and ed457a6.

📒 Files selected for processing (13)
  • methods/evermemos/src/core/oxm/qdrant/qdrant_collection_base.py
  • methods/evermemos/src/devops_scripts/migrate_milvus_to_qdrant.py
  • methods/evermemos/src/devops_scripts/re_embed_sweep.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/agent_case_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/agent_skill_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/atomic_fact_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/episodic_memory_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/foresight_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/user_profile_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/agent_case_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/agent_skill_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/atomic_fact_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py
📝 Walkthrough

Überblick

Dieses PR implementiert eine umfassende Migration des Vektor-Backends von Milvus zu Qdrant mit Kern-Infrastruktur, Mandanten-Unterstützung, Datenmodell-Definitionen, Datenkonvertierern, Persistierungs-Repositories und DevOps-Migrationswerk­zeugen.

Änderungen

Qdrant-Migrations-Integration

Layer / Datei(en) Zusammenfassung
Kern-Infrastruktur und Client-Verwaltung
methods/evermemos/src/core/component/qdrant_client_factory.py, methods/evermemos/src/core/lifespan/qdrant_lifespan.py, methods/evermemos/src/core/oxm/qdrant/base_converter.py, methods/evermemos/src/core/oxm/qdrant/base_repository.py, methods/evermemos/src/core/oxm/qdrant/qdrant_collection_base.py
QdrantClientFactory mit Thread-sicherer Caching-Logik und umgebungssteuerte Konfiguration; QdrantLifespanProvider für bedingte FastAPI-Startup/Shutdown-Hooks basierend auf VECTOR_STORE_BACKEND; BaseQdrantConverter und BaseQdrantRepository als Abstraktionen; QdrantCollectionBase für Sammlungserstellung, Payload-Indizierung und Datenoperationen.
Mandanten-bewusste Konfiguration
methods/evermemos/src/core/tenants/tenantize/oxm/qdrant/config_utils.py, methods/evermemos/src/core/tenants/tenantize/oxm/qdrant/tenant_aware_qdrant_collection_with_suffix.py
Tenant-Kontext-Auflösung für Qdrant-Verbindungsparameter und Sammlungsnamen mit Fallback-Mechanismen; TenantAwareQdrantCollectionWithSuffix für optionales Suffix-Handling.
Sammlungs-Definitionen
methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/*
Sechs spezialisierte Qdrant-Sammlungs-Definitionen (AgentCase, AgentSkill, AtomicFact, EpisodicMemory, Foresight, UserProfile) mit Vektorparametern, HNSW-Konfiguration, Payload-Indizierung und Tenant-Isolation.
Mongo-zu-Qdrant-Konverter
methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/*
Sechs spezialisierte Konverter mit Validierung, Payload-Konstruktion, Zeitkonvertierung (Epoch-Millisekunden), Vektor-Erforderlichkeit und Fehlerprotokollierung.
Qdrant-Repositories
methods/evermemos/src/infra_layer/adapters/out/search/repository/*qdrant*
Sechs spezialisierte Repositories mit async CRUD-Methoden, Vektor-Ähnlichkeitssuche mit optionalen Filtern, Zwei-Stufen-Score-Gating (radius/score_threshold) und Batch-Delete-Operationen mit Thread-Offloading.
DevOps-Migrationswerk­zeuge
methods/evermemos/src/devops_scripts/migrate_milvus_to_qdrant.py, methods/evermemos/src/devops_scripts/re_embed_sweep.py
CLI-Skripte für Qdrant-Migrations-Orchestrierung: direkte Batch-Neubettungen mit OpenRouter-Embeddings und Idempotenz-Handling; mandanten­übergreifende Automations-Sweeps.
Dokumentation und Abhängigkeiten
README.md, methods/evermemos/pyproject.toml
Migrations-Übersicht mit Konzept-Mapping und Qdrant-Abhängigkeit.

Geschätzter Code-Review-Aufwand

🎯 4 (Komplex) | ⏱️ ~45 Minuten

🐰 Vector rabbits now hop through Qdrant's halls,
Where embeddings dance and collections call,
Milvus migrations fade to distant dreams,
As tenant-aware threads weave seamless streams,
The backend blooms with cosmic index delight!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 77.23% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main objective: implementing a Qdrant adapter achieving Milvus parity and adding a standalone re-embed sweep pipeline.
Description check ✅ Passed The description comprehensively explains the changes across phases, addresses CodeRabbit feedback, and provides clear test/deployment guidance.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/qdrant-adapter

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 17

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@methods/evermemos/src/core/oxm/qdrant/qdrant_collection_base.py`:
- Around line 181-191: The exists() method currently catches all Exceptions and
masks real failures; change it to catch only the Qdrant client-specific
exception(s) (e.g., import and catch qdrant_client.exceptions.QdrantException or
the appropriate client Error/Exception class) around the call to
client().collection_exists(self.name), log and return False for those expected
"collection missing" errors using the existing logger.warning, and allow any
other unexpected exceptions to propagate (i.e., do not use a bare except
Exception). Ensure the except block references the specific exception type and
leave other errors unhandled so they surface.

In `@methods/evermemos/src/devops_scripts/migrate_milvus_to_qdrant.py`:
- Around line 266-271: The MongoClient and QdrantClient created in the migration
(instances named mongo and qdrant) are not closed and can leak connections;
update the migration code that constructs MongoClient(config.mongo_uri) and
QdrantClient(host=config.qdrant_host, port=config.qdrant_port) to use context
managers or a try/finally so both clients are explicitly closed (e.g., with
mongo.close() and qdrant.close() in the finally block) after the migration work
completes; if the OpenAI client (OpenAI(...)) requires cleanup, include it in
the same scope so all three clients are closed reliably even on exceptions.
- Around line 175-186: The timestamp normalization block handling
timestamp_field should log a warning when the field exists but ts_value is
neither a datetime (has .timestamp) nor an int/float so values aren't silently
dropped; update the code around the timestamp normalization (references:
timestamp_field, ts_value, timestamp_unit, payload, doc) to detect the
unexpected type and emit a warning via the module logger (e.g., logging.warning
or the file's existing logger) including the document id or identifying info and
the actual type/value, then skip or set a sensible default as before.

In `@methods/evermemos/src/devops_scripts/re_embed_sweep.py`:
- Around line 258-263: Replace the logger.error call in the except Exception as
e block with logger.exception so the traceback is logged automatically; keep the
existing message and variables (db, spec.mongo_collection, qdrant_coll, e) and
still increment pairs_failed—i.e., inside the except Exception as e handler use
logger.exception(...) instead of logger.error(...) to include the full stack
trace.

In
`@methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/agent_case_qdrant_converter.py`:
- Around line 73-77: Replace the logger.error call in the exception handler
inside the AgentCaseRecord -> Qdrant point converter with logger.exception so
the full stack trace is recorded; keep the existing message text (e.g., "Failed
to convert AgentCaseRecord to Qdrant point") and continue re-raising the
exception so behavior doesn't change (look for the except block in
agent_case_qdrant_converter.py that catches Exception as e during the conversion
of AgentCaseRecord to a Qdrant point and update logger.error(...) to
logger.exception(...)).

In
`@methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/agent_skill_qdrant_converter.py`:
- Around line 83-87: In der except-Block in agent_skill_qdrant_converter
(während der Konvertierung von AgentSkillRecord zu Qdrant point) ersetze den
Aufruf logger.error(...) durch logger.exception(...) und bewahre die bestehende
Meldung ("Failed to convert AgentSkillRecord to Qdrant point: %s") so bei, damit
der Stack-Trace automatisch geloggt wird; stelle sicher, dass das
Exception-Objekt weiterhin übergeben oder in der Message referenziert wird und
dass anschließend wie bisher die Exception erneut geworfen wird (raise).

In
`@methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/atomic_fact_qdrant_converter.py`:
- Around line 91-97: In the except block that catches exceptions while
converting a MongoDB AtomicFact to a Qdrant point (the conversion routine in
atomic_fact_qdrant_converter.py / the AtomicFact -> Qdrant conversion function),
replace logger.error(..., exc_info=True) with the idiomatic
logger.exception(...) call so the exception is logged correctly and concisely;
keep the same message text but call logger.exception("Failed to convert MongoDB
AtomicFact to Qdrant point: %s", e) (or simply logger.exception("Failed to
convert MongoDB AtomicFact to Qdrant point")) inside that except block.

In
`@methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/episodic_memory_qdrant_converter.py`:
- Around line 108-110: In the except Exception as e block inside
episodic_memory_qdrant_converter.py where you currently call
logger.error("Failed to convert MongoDB document to Qdrant point: %s", e),
replace that with logger.exception(...) so the stack trace is logged
automatically; keep the same descriptive message text to remain consistent with
the other converters and ensure the exception context is captured when the
conversion function (the try/except surrounding the MongoDB-to-Qdrant
conversion) fails.

In
`@methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/foresight_qdrant_converter.py`:
- Around line 139-144: In the except block inside foresight_qdrant_converter
(the converter that transforms MongoDB foresight documents to Qdrant points),
replace the logger.error(...) call with logger.exception(...) so the stacktrace
is logged consistently with other converters; keep the same message text, pass
the exception info via logger.exception, and then re-raise the exception as
currently done.

In
`@methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/user_profile_qdrant_converter.py`:
- Around line 164-170: Replace the current exception logging call in the except
block of the MongoDB→Qdrant conversion (the block that logs "Failed to convert
MongoDB UserProfile to Qdrant items") to use logger.exception(...) instead of
logger.error(..., exc_info=True); keep the same message text and re-raise the
exception as before so stacktrace is logged idiomatically by logger.exception in
the converter module (user_profile_qdrant_converter.py).

In
`@methods/evermemos/src/infra_layer/adapters/out/search/repository/agent_case_qdrant_repository.py`:
- Around line 152-154: In der Except-Behandlung des AgentCase Qdrant-Suchpfads
(der Block mit "except Exception as e" und der aktuellen Aufrufzeile
logger.error("AgentCase Qdrant search failed: %s", e)) ersetze logger.error
durch logger.exception, damit der Stacktrace und Kontext automatisch geloggt
werden; belasse die vorhandene Nachricht ("AgentCase Qdrant search failed") und
entferne das explizite %-formatieren des Exceptions-Objekts, sodass
logger.exception die Fehlermeldung inklusive Traceback schreibt.

In
`@methods/evermemos/src/infra_layer/adapters/out/search/repository/agent_skill_qdrant_repository.py`:
- Around line 170-172: Die Exception-Logging-Zeile in der except-Block der
Qdrant-Suche sollte konsistent zu den anderen Repositories geändert werden:
ersetze den Aufruf von logger.error("AgentSkill Qdrant search failed: %s", e)
durch logger.exception(...) im selben except-Block in
agent_skill_qdrant_repository.py (die Stelle, die die Qdrant-Suche umgibt),
damit Stacktrace und Kontext geloggt werden; lasse das anschließende raise
unverändert, um die ursprüngliche Exception weiterzugeben.
- Around line 231-238: In the exception handler inside
AgentSkillQdrantRepository (the except Exception as e block that currently calls
logger.error("Failed to delete Qdrant points for cluster=%s: %s", cluster_id,
e)), replace logger.error with logger.exception so the full stack trace is
logged; keep the same formatted message and arguments and continue to re-raise
the exception so behavior remains unchanged.
- Around line 124-147: The AgentSkill implementation computes
effective_threshold differently from AgentCase, causing inconsistent search
behavior; update the AgentSkill calculation (the variable effective_threshold
used before the self.search(...) call) to mirror AgentCase by choosing the more
permissive (lower) threshold between radius and score_threshold: compute
effective_threshold = min(r for r in (radius, score_threshold) if r is not None)
while handling None and negative radius (treat negative as unset), then pass
score_threshold=(effective_threshold if effective_threshold > 0 else None) into
search; keep the rest of the search call (query_vector, limit, query_filter,
with_payload, with_vectors, search_params) unchanged.

In
`@methods/evermemos/src/infra_layer/adapters/out/search/repository/atomic_fact_qdrant_repository.py`:
- Around line 215-231: The vector-search result mapping is missing the
"atomic_fact" field, causing inconsistency with the persisted model (created in
create_and_save_atomic_fact) and the batch path; update the dictionary returned
in the single-result path inside atomic_fact_qdrant_repository.py to include
"atomic_fact": payload.get("atomic_fact") (or the equivalent source used in the
batch path) alongside the other payload fields so callers receive the same model
shape as in the batch and create paths.

In
`@methods/evermemos/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py`:
- Around line 124-130: The code in vector_search is incorrectly adding a user_id
filter when user_id is None, causing a filter for an empty string; change the
guard so the FieldCondition is only appended when user_id is explicitly provided
(i.e., user_id is not None) and not equal to MAGIC_ALL. Update the condition
around qmodels.FieldCondition (the block creating FieldCondition with
key="user_id" and match=MatchValue) to check both user_id is not None and
user_id != MAGIC_ALL before adding it.
- Around line 67-84: Before calling self.upsert with qmodels.PointStruct,
validate the vector variable is not None and (for lists/iterables) non-empty; if
the vector is invalid, raise a clear ValueError or return an error instead of
proceeding to upsert. Add the guard immediately before the self.upsert(...) call
in episodic_memory_qdrant_repository (check the vector variable used to
construct qmodels.PointStruct), and ensure any thrown error includes identifying
context (id, user_id or event_type) to aid debugging.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 28d66b57-c0f7-4bc9-b767-c55a2930358e

📥 Commits

Reviewing files that changed from the base of the PR and between 29d555c and 4fe11a2.

⛔ Files ignored due to path filters (1)
  • methods/evermemos/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (34)
  • README.md
  • methods/evermemos/pyproject.toml
  • methods/evermemos/src/core/component/qdrant_client_factory.py
  • methods/evermemos/src/core/lifespan/qdrant_lifespan.py
  • methods/evermemos/src/core/oxm/qdrant/__init__.py
  • methods/evermemos/src/core/oxm/qdrant/base_converter.py
  • methods/evermemos/src/core/oxm/qdrant/base_repository.py
  • methods/evermemos/src/core/oxm/qdrant/qdrant_collection_base.py
  • methods/evermemos/src/core/tenants/tenantize/oxm/qdrant/__init__.py
  • methods/evermemos/src/core/tenants/tenantize/oxm/qdrant/config_utils.py
  • methods/evermemos/src/core/tenants/tenantize/oxm/qdrant/tenant_aware_qdrant_collection_with_suffix.py
  • methods/evermemos/src/devops_scripts/migrate_milvus_to_qdrant.py
  • methods/evermemos/src/devops_scripts/re_embed_sweep.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/__init__.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/__init__.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/agent_case_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/agent_skill_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/atomic_fact_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/episodic_memory_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/foresight_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/converter/user_profile_qdrant_converter.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/__init__.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/agent_case_collection.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/agent_skill_collection.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/atomic_fact_collection.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/episodic_memory_collection.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/foresight_collection.py
  • methods/evermemos/src/infra_layer/adapters/out/search/qdrant/memory/user_profile_collection.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/agent_case_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/agent_skill_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/atomic_fact_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/foresight_qdrant_repository.py
  • methods/evermemos/src/infra_layer/adapters/out/search/repository/user_profile_qdrant_repository.py

Comment thread methods/evermemos/src/core/oxm/qdrant/qdrant_collection_base.py
Comment thread methods/evermemos/src/devops_scripts/migrate_milvus_to_qdrant.py
Comment thread methods/evermemos/src/devops_scripts/migrate_milvus_to_qdrant.py Outdated
Comment thread methods/evermemos/src/devops_scripts/re_embed_sweep.py
@XInfty XInfty deleted a comment from qodo-code-review Bot May 13, 2026
Critical
- ``migrate_milvus_to_qdrant`` now wraps the migration body in try/finally
  and closes ``mongo``, ``qdrant``, and ``openai`` clients explicitly. Long
  sweeps previously leaked connections on every per-pair invocation.

Major
- ``QdrantCollectionBase.exists`` only catches the qdrant-client transport
  exceptions (``ResponseHandlingException``, ``UnexpectedResponse``); other
  failures propagate so infrastructure issues stay visible instead of being
  silently treated as "collection missing".
- ``EpisodicMemoryQdrantRepository.create_and_save_episodic_memory`` rejects
  missing/empty vectors up front with a ``ValueError`` (mirrors the converter
  contract) instead of bubbling up a confusing 400 from Qdrant.
- ``EpisodicMemoryQdrantRepository.vector_search`` no longer treats the
  default ``user_id=None`` as a filter on the empty string. The condition
  now requires the caller to have provided an explicit (non-None,
  non-MAGIC_ALL) value, restoring full-recall behaviour for the unscoped
  search path.
- ``AtomicFactQdrantRepository.vector_search`` now returns ``atomic_fact``
  in the result dict (parity with the batch path and with the persisted
  payload from ``create_and_save_atomic_fact``).

Minor
- ``AgentSkillQdrantRepository.vector_search`` two-stage threshold now uses
  ``min(radius, score_threshold)`` server-side, matching the agent_case
  repository's "more permissive lower bound" semantics.

Nitpick
- ``logger.error`` in every except block touched by Phase 2/3 swapped for
  ``logger.exception`` (10 sites across converters, repositories, and the
  sweep wrapper) so the stack trace is always logged.
- ``build_payload`` now logs a warning when a timestamp field has an
  unexpected type instead of silently dropping the value.
@Ptah-CT
Copy link
Copy Markdown
Author

Ptah-CT commented May 13, 2026

@coderabbitai review

All 17 inline findings from the first review have been addressed in commit ed457a6:

  • Critical (1/1): migrate workhorse closes Mongo/Qdrant/OpenAI clients via try/finally.
  • Major (4/4): exists() catches only Qdrant transport exceptions; EpisodicMemory upsert rejects empty vectors; vector_search no longer filters on empty user_id when None is passed; AtomicFact single-search returns atomic_fact field.
  • Minor (1/1): AgentSkill effective_threshold uses min(radius, score_threshold) for parity with AgentCase.
  • Nitpick (11/11): logger.error → logger.exception across converters/repositories/sweep; timestamp-warning added in build_payload.

Please re-run the review on this branch.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 13, 2026

@Ptah-CT Thank you for the detailed summary of all 17 addressed findings! I'll kick off a fresh review of the branch now, covering the fixes in commit ed457a6 and the full adapter implementation.

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@DerAuctor
Copy link
Copy Markdown

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ed457a6e17

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

oder ``None`` falls kein Tenant aktiv.
"""
# Lazy import vermeidet Circular-Dependency bei Adapter-Discovery-Time.
from core.tenants.tenantize.tenant_context import get_current_tenant
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Import the actual tenant context helper

In tenant-aware Qdrant mode this import points at a module that does not exist in this tree (rg "def get_current_tenant" only finds core/tenants/tenant_contextvar.py). get_tenant_aware_collection_name() catches the resulting exception and falls back to the base resource prefix, so every tenant ignores its configured storage_info["qdrant"].collection_prefix/Milvus prefix and resolves to the same non-tenant collection name.

Useful? React with 👍 / 👎.

Comment on lines +101 to +103
if self._collection is None:
self._collection = self.model()
return self._collection
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid caching tenant-scoped collections in singleton repos

When these Qdrant repositories are used under multiple tenant contexts, this property freezes the collection instance created by the first request; the repository decorators keep the default singleton scope, and TenantAwareQdrantCollectionWithSuffix resolves the tenant-prefixed name in its constructor. After the first tenant touches the repo, later tenants will keep searching/upserting the first tenant's Qdrant collection instead of resolving their own context.

Useful? React with 👍 / 👎.

Comment on lines +92 to +94
await self.upsert(
qmodels.PointStruct(id=id, vector=vector, payload=payload)
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Translate caller ids before constructing Qdrant points

The Milvus-parity create_and_save_* entry points receive the existing Mongo-style ids, but this path passes id directly to Qdrant instead of using the new mongo_id_to_qdrant_id() helper. For the normal 24-hex Mongo ObjectId strings, Qdrant rejects point ids because they must be unsigned integers or UUIDs, so direct writes through this Qdrant repository fail even though the migration converters handle the same ids correctly.

Useful? React with 👍 / 👎.

logger = get_logger(__name__)


@repository("episodic_memory_qdrant_repository", primary=False)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Route repository users through the backend flag

With VECTOR_STORE_BACKEND=qdrant, these new repositories still are not selected by the application: they are registered only under Qdrant-specific bean names with primary=False, while the inspected callers still import/resolve concrete Milvus classes (for example service/memcell_delete_service.py injects *MilvusRepository, and memory_layer/profile_indexer/profile_indexer.py calls get_bean_by_type(UserProfileMilvusRepository)). As a result, business reads/writes/deletes continue to hit Milvus even when the Qdrant lifespan is enabled.

Useful? React with 👍 / 👎.

# Order 19: zwischen milvus_lifespan (18) und business_lifespan (20). So
# laufen beide Vector-Backends initialisiert (im Cutover-Fall), und
# business-Logik startet erst danach.
_QDRANT_LIFESPAN_ORDER = 19
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not start Milvus before Qdrant cutover

In the standard app path (app.py uses create_auto_lifespan()), all registered lifespan providers are started by order; because this provider is explicitly ordered after milvus_lifespan and MilvusLifespanProvider.startup() has no VECTOR_STORE_BACKEND guard, setting VECTOR_STORE_BACKEND=qdrant still attempts to connect to Milvus first. In the deployment this migration is meant to unblock, an unavailable/crash-looping Milvus instance will still fail startup before Qdrant can serve traffic.

Useful? React with 👍 / 👎.

vectorize_model=os.environ.get(
"VECTORIZE_MODEL", "qwen/qwen3-embedding-8b"
),
vectorize_dimensions=int(os.environ.get("VECTORIZE_DIMENSIONS", "4096")),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Align migration vector dimensions with runtime defaults

If operators run the standalone migration without explicitly setting VECTORIZE_DIMENSIONS, it creates/embeds Qdrant points at 4096 dimensions, while the runtime collection classes use memory_layer.constants.VECTORIZE_DIMENSIONS, whose default is 1024. That default mismatch either makes the migration fail against collections already created by the service, or creates 4096-dimensional collections that later reject the service's 1024-dimensional query/upsert vectors.

Useful? React with 👍 / 👎.

payload_fields=(
"user_id", "group_id", "session_id",
"participants", "sender_ids", "type",
"start_time", "end_time", "duration_days",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Normalize migrated foresight end times

For the sweep's foresight migration, only start_time is passed as timestamp_field, so build_payload() normalizes start_time but leaves the copied end_time value in its raw Mongo form. The Qdrant foresight repository and collection treat end_time as epoch milliseconds and create an integer payload index for it, so migrated foresight records with datetime/string end_time values will not match end_time range filters and may fail indexing.

Useful? React with 👍 / 👎.

Comment on lines +180 to +181
factory = get_bean("qdrant_client_factory")
return factory.get_named_client(self.using)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Resolve tenant-specific Qdrant clients

In deployments where tenant storage supplies a dedicated Qdrant endpoint (storage_info["qdrant"] includes host/port/API key), this client lookup never consults that tenant config and always asks the factory for the static _DB_USING alias, normally default. The new config helper even documents a tenant connection cache key, but nothing uses it here, so tenants configured for separate Qdrant clusters still read/write the default cluster.

Useful? React with 👍 / 👎.

Comment on lines +99 to +100
collection = collection_class()
collection.ensure_all()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Initialize tenant collections on demand

For multi-tenant Qdrant deployments, startup runs without each request's tenant context, so constructing TenantAwareQdrantCollectionWithSuffix here only ensures the no-tenant/base-prefixed collection. Later requests resolve tenant-prefixed collection names, but the repository methods go straight to upsert/search/delete without ensure_all(), so a tenant whose collection was not pre-created by the migration scripts will hit Qdrant collection not found instead of being created like the Milvus tenant-aware path.

Useful? React with 👍 / 👎.

Comment on lines +98 to +100
api_key = os.environ.get("OPENROUTER_API_KEY", "").strip()
if not api_key:
raise SystemExit("OPENROUTER_API_KEY is required (env or .env)")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 Badge Let dry runs run without an embedding API key

The documented smoke path re_embed_sweep.py --dry-run reaches Config.from_env() before it knows no embeddings will be requested, so this unconditional check exits unless OPENROUTER_API_KEY is set. That makes a dry-run unable to inspect Mongo/Qdrant migration coverage in environments where operators intentionally have not provisioned embedding credentials yet.

Useful? React with 👍 / 👎.

@Ptah-CT
Copy link
Copy Markdown
Author

Ptah-CT commented May 13, 2026

Superseded by a rebased branch on top of current main.

The upstream had renamed methods/evermemos/methods/EverCore/ and added OpenHer use-cases; this branch was rebased with the path rewrite applied to all 20 commits (feature/qdrant-adapterqdrant/rebase-evercore). All 17 CodeRabbit findings from this PR are carried over verbatim. New PR: plus `re_embed_sweep` wrapper.

  • 3 rounds of CodeRabbit feedback addressed: 30 + 7 + 17 findings.

Test plan

  • `ruff check` clean on changed files.
  • `pytest` green for `methods/EverCore/tests/` (collection + converter + repository suites).
  • Smoke: standalone migrate writes a single document into a smoketest Qdrant collection (verified against a real Qdrant pre-rebase).
  • Sweep dry-run prints the expected (db, coll) pair plan and does not call OpenRouter.
  • Full sweep (after merge) writes the re-embedded points into Qdrant collections per tenant.

Out of scope

  • UserProfile reindex needs its own Phase 3.1 path (splits one Mongo doc into many Qdrant points).
    ")

@Ptah-CT
Copy link
Copy Markdown
Author

Ptah-CT commented May 13, 2026

Replacement PR: #2

DerAuctor pushed a commit that referenced this pull request May 13, 2026
…ed on EverCore path) (#2)

## Summary

Rebase of the original Qdrant-adapter branch on top of current ``main``,
after upstream renamed ``methods/evermemos/`` → ``methods/EverCore/``
and added OpenHer use-cases. All 20 commits preserved, path rewritten
via ``git format-patch`` + sed + ``git am --3way``. ``README.md``
auto-merged.

Supersedes #1, which carried the same code on the pre-rename path.

The branch carries:

- **Phase 1**: ``QdrantCollectionBase`` (tenant-aware naming, payload
indexes, ``query_points`` instead of removed ``search``).
- **Phase 2**: 6 collections + 6 converters covering ``EpisodicMemory``,
``AtomicFact``, ``Foresight``, ``AgentCase``, ``AgentSkill``,
``UserProfile``.
- **Phase 2.5**: 6 ``@repository`` adapters with two-stage score gating,
``client.count(exact=True)`` for delete-by-filter, and tz-aware epoch
helpers.
- **Phase 3**: standalone ``migrate_milvus_to_qdrant`` CLI (Mongo →
OpenRouter → Qdrant) plus the ``re_embed_sweep`` wrapper that fans the
workhorse over every active tenant database.
- **3 rounds of bot feedback** addressed inline: 30 (CodeRabbit pass-1)
+ 7 (CodeRabbit pass-2) + 17 (CodeRabbit on #1, plus a 👍-equivalent from
Codex).

## Test plan

- [ ] ``ruff check`` clean on changed files.
- [ ] ``pytest`` green for ``methods/EverCore/tests/`` (collection +
converter + repository suites).
- [ ] Smoke: standalone migrate writes a single document into a
smoketest Qdrant collection (verified against a real Qdrant pre-rebase).
- [ ] Sweep dry-run prints the expected (db, coll) pair plan and does
not call OpenRouter.
- [ ] Full sweep (after merge) writes the re-embedded points into Qdrant
collections per tenant.

## Out of scope / follow-ups

- ``UserProfile`` reindex is intentionally not in the sweep wrapper — it
splits one source document into many points and needs its own Phase 3.1
path.

---------

Co-authored-by: Ptah-CT <221234802+Ptah-CT@users.noreply.github.com>
Ptah-CT added a commit that referenced this pull request May 15, 2026
…Lock

CodeRabbit finding #3: _ensure_session() could be entered concurrently
by multiple coroutines (e.g. parallel rerank batches via gather), each
seeing self.session is None and creating its own aiohttp.ClientSession.
The losers of the race were never tracked and leaked.

Add self._session_lock = asyncio.Lock() in __init__ and use a
double-checked pattern inside _ensure_session: a fast path when the
session is already alive, a lock-protected slow path with re-check
before instantiation.

Findings #1 (Voyage model default) and #2 (raw query in debug log) are
intentionally deferred — production .env always sets RERANK_MODEL and
DEBUG-level logs are not enabled by default in this environment.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants