From 7804ba3e19623c94ed980ddab2e500869ecd7d4e Mon Sep 17 00:00:00 2001 From: Ptah-CT <221234802+Ptah-CT@users.noreply.github.com> Date: Fri, 15 May 2026 16:43:40 +0000 Subject: [PATCH 1/3] fix(everos): restore voyage rerank provider + timezone import after EverCore rename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two defects survived the methods/evermemos/ → methods/EverCore/ rename and made retrieval return empty results (HTTP 200 with episodes=[]). They are fixed by the three changes below: 1. rerank_voyage.py was dropped on rename — RERANK_PROVIDER=voyage in .env then raises "Unsupported provider: voyage" in the factory. Rewritten from scratch (252 LOC) using rerank_deepinfra.py as a template, with Voyage-specific request shape (plain query/documents, no Qwen wrapping) and response mapping (data: [{index, relevance_score}] → standard {results: [{index, score, rank}]}). 2. episodic_memory_qdrant_repository.py used timezone.utc in datetime.fromtimestamp() but only imported `datetime`. NameError on every vector search. One-line fix in the import statement. 3. rerank_service.py factory branch for "voyage" added between deepinfra and the raise. Defaults base_url to https://api.voyageai.com/v1/rerank when env override is empty. Verified live: 60 ES candidates → Qdrant vector_search 12 ms → Voyage rerank 15 results @ 0.4492 top score → 15 episodes returned in 5050 ms. 
Backups on database: - rerank_service.py.bak.1779028699 - episodic_memory_qdrant_repository.py.bak.1779028653 - .env*.bak.1779028576 (env LLM_BASE_URL drift; not in scope for this repo) --- .../src/agentic_layer/rerank_service.py | 12 + .../src/agentic_layer/rerank_voyage.py | 252 ++++++++++++++++++ .../episodic_memory_qdrant_repository.py | 2 +- 3 files changed, 265 insertions(+), 1 deletion(-) create mode 100644 methods/EverCore/src/agentic_layer/rerank_voyage.py diff --git a/methods/EverCore/src/agentic_layer/rerank_service.py b/methods/EverCore/src/agentic_layer/rerank_service.py index e668e8c2..e51fe4cf 100644 --- a/methods/EverCore/src/agentic_layer/rerank_service.py +++ b/methods/EverCore/src/agentic_layer/rerank_service.py @@ -30,6 +30,7 @@ ) from agentic_layer.rerank_vllm import VllmRerankService, VllmRerankConfig from agentic_layer.rerank_deepinfra import DeepInfraRerankService, DeepInfraRerankConfig +from agentic_layer.rerank_voyage import VoyageRerankService, VoyageRerankConfig from agentic_layer.metrics.rerank_metrics import ( record_rerank_request, record_rerank_fallback, @@ -173,6 +174,17 @@ def _create_service_from_config( max_concurrent_requests=max_concurrent, ) return DeepInfraRerankService(config) + elif provider.lower() == "voyage": + config = VoyageRerankConfig( + api_key=api_key, + base_url=base_url or 'https://api.voyageai.com/v1/rerank', + model=model, + timeout=timeout, + max_retries=max_retries, + batch_size=batch_size, + max_concurrent_requests=max_concurrent, + ) + return VoyageRerankService(config) else: raise RerankError(f"Unsupported provider: {provider}") diff --git a/methods/EverCore/src/agentic_layer/rerank_voyage.py b/methods/EverCore/src/agentic_layer/rerank_voyage.py new file mode 100644 index 00000000..6a2dbb6d --- /dev/null +++ b/methods/EverCore/src/agentic_layer/rerank_voyage.py @@ -0,0 +1,252 @@ +""" +Voyage AI Rerank Service Implementation + +Reranking service using Voyage AI commercial API (rerank-2.5). 
+ +Voyage's API differs from DeepInfra/vLLM in three ways: +- POST to a fixed endpoint (no model-suffix path). +- Request shape: ``{"query", "documents", "model"}`` with plain strings + (no Qwen ``<|im_start|>`` template wrapping). +- Response shape: ``{"data": [{"index", "relevance_score"}], "usage": {...}}`` + (DeepInfra/Cohere return ``results``; Voyage returns ``data``). + +The service normalises Voyage's response into the EverOS standard +``{"results": [{"index", "score", "rank"}]}`` shape so callers don't care +which backend produced the scores. +""" + +import asyncio +import aiohttp +import logging +from typing import List, Dict, Any, Optional +from dataclasses import dataclass + +from agentic_layer.rerank_interface import ( + RerankServiceInterface, + RerankError, + extract_text_from_hit, +) +from core.di.utils import get_bean_by_type +from core.component.token_usage_collector import TokenUsageCollector + +logger = logging.getLogger(__name__) + + +@dataclass +class VoyageRerankConfig: + """Voyage rerank service configuration""" + + api_key: str = "" # skip-sensitive-check + base_url: str = "https://api.voyageai.com/v1/rerank" + model: str = "rerank-2.5" + timeout: int = 30 + max_retries: int = 3 + batch_size: int = 100 # Voyage accepts up to 1000 docs/request + max_concurrent_requests: int = 5 + + +class VoyageRerankService(RerankServiceInterface): + """Voyage AI reranking service implementation""" + + def __init__(self, config: Optional[VoyageRerankConfig] = None): + if config is None: + config = VoyageRerankConfig() + + self.config = config + self.session: Optional[aiohttp.ClientSession] = None + self._semaphore = asyncio.Semaphore(config.max_concurrent_requests) + logger.info( + f"Initialized VoyageRerankService | url={config.base_url} | model={config.model}" + ) + + async def _ensure_session(self): + if self.session is None or self.session.closed: + timeout = aiohttp.ClientTimeout(total=self.config.timeout) + self.session = aiohttp.ClientSession( 
timeout=timeout, + headers={ + "Authorization": f"Bearer {self.config.api_key}", + "Content-Type": "application/json", + }, + ) + + async def close(self): + if self.session and not self.session.closed: + await self.session.close() + + async def _send_rerank_request_batch( + self, query: str, documents: List[str] + ) -> Dict[str, Any]: + """POST one batch to the Voyage rerank endpoint.""" + await self._ensure_session() + + # Voyage expects plain strings, NOT Qwen-template wrapped. + request_data = { + "query": query, + "documents": documents, + "model": self.config.model, + } + + async with self._semaphore: + for attempt in range(self.config.max_retries): + try: + async with self.session.post( + self.config.base_url, json=request_data + ) as response: + if response.status == 200: + json_body = await response.json() + return self._parse_response(json_body, len(documents)) + error_text = await response.text() + logger.error( + f"Voyage rerank API error {response.status}: {error_text}" + ) + if attempt < self.config.max_retries - 1: + await asyncio.sleep(2 ** attempt) + continue + raise RerankError( + f"API failed: {response.status} - {error_text}" + ) + except RerankError: + raise + except Exception as e: + logger.error(f"Voyage rerank exception: {e}") + if attempt < self.config.max_retries - 1: + await asyncio.sleep(2 ** attempt) + continue + raise RerankError(f"Exception: {e}") + + def _parse_response( + self, json_body: Dict[str, Any], num_docs: int + ) -> Dict[str, Any]: + """Voyage returns ``data: [{index, relevance_score}]`` — translate + into a dense ``scores`` array aligned to the request order.""" + scores = [0.0] * num_docs + for item in json_body.get("data", []): + idx = item.get("index") + if idx is None or not (0 <= idx < num_docs): + continue + scores[idx] = float(item.get("relevance_score", 0.0)) + + usage = json_body.get("usage", {}) or {} + return { + "scores": scores, + "input_tokens": int(usage.get("total_tokens", 0) or 0), + "request_id": 
json_body.get("id"), + } + + async def rerank_documents( + self, query: str, documents: List[str], instruction: Optional[str] = None + ) -> Dict[str, Any]: + """Low-level reranking; ``instruction`` is ignored — Voyage uses the + query/documents pair directly.""" + if not documents: + return {"results": []} + + batch_size = self.config.batch_size or 100 + batches = [ + documents[i : i + batch_size] for i in range(0, len(documents), batch_size) + ] + + batch_tasks = [ + self._send_rerank_request_batch(query, batch) for batch in batches + ] + batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) + + all_scores: List[float] = [] + total_input_tokens = 0 + last_response = None + + for i, result in enumerate(batch_results): + if isinstance(result, Exception): + logger.error(f"Voyage rerank batch {i} failed: {result}") + all_scores.extend([-100.0] * len(batches[i])) + continue + all_scores.extend(result.get("scores", [])) + total_input_tokens += result.get("input_tokens", 0) + last_response = result + + try: + collector = get_bean_by_type(TokenUsageCollector) + collector.add(self.config.model, total_input_tokens, 0, call_type="rerank") + except Exception: + pass + + combined_response = { + "scores": all_scores, + "input_tokens": total_input_tokens, + "request_id": last_response.get("request_id") if last_response else None, + } + return self._convert_response_format(combined_response, len(documents)) + + def _convert_response_format( + self, combined_response: Dict[str, Any], num_documents: int + ) -> Dict[str, Any]: + scores = combined_response.get("scores", []) + if len(scores) < num_documents: + scores.extend([0.0] * (num_documents - len(scores))) + scores = scores[:num_documents] + + indexed_scores = [(i, score) for i, score in enumerate(scores)] + indexed_scores.sort(key=lambda x: x[1], reverse=True) + + results = [ + {"index": original_index, "score": score, "rank": rank} + for rank, (original_index, score) in enumerate(indexed_scores) + ] + 
return { + "results": results, + "input_tokens": combined_response.get("input_tokens", 0), + "request_id": combined_response.get("request_id"), + } + + async def rerank_memories( + self, + query: str, + hits: List[Dict[str, Any]], + top_k: Optional[int] = None, + instruction: Optional[str] = None, + ) -> List[Dict[str, Any]]: + if not hits: + return [] + + all_texts = [extract_text_from_hit(hit) for hit in hits] + if not all_texts: + return [] + + try: + logger.debug( + f"Voyage reranking, query: {query!r}, num_texts={len(all_texts)}" + ) + rerank_result = await self.rerank_documents(query, all_texts, instruction) + if "results" not in rerank_result: + raise RerankError("Invalid rerank API response: missing results field") + + results_meta = rerank_result.get("results", []) + reranked_hits = [] + for item in results_meta: + original_idx = item.get("index", 0) + score = item.get("score", 0.0) + if 0 <= original_idx < len(hits): + hit = hits[original_idx].copy() + hit["score"] = score + reranked_hits.append(hit) + + if top_k is not None and top_k > 0: + reranked_hits = reranked_hits[:top_k] + + if reranked_hits: + top_scores = [f"{h.get('score', 0):.4f}" for h in reranked_hits[:3]] + logger.info( + f"Voyage reranking completed: {len(reranked_hits)} results, top scores: {top_scores}" + ) + return reranked_hits + + except Exception as e: + logger.error(f"Voyage reranking failed: {e}") + sorted_hits = sorted(hits, key=lambda x: x.get("score", 0), reverse=True) + if top_k is not None and top_k > 0: + sorted_hits = sorted_hits[:top_k] + return sorted_hits + + def get_model_name(self) -> str: + return self.config.model diff --git a/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py b/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py index 45a769c5..2f17a0c3 100644 --- a/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py +++ 
b/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py @@ -12,7 +12,7 @@ import asyncio import json -from datetime import datetime +from datetime import datetime, timezone from functools import partial from typing import Any, Dict, List, Optional From dd7104fd7bd4bb90776f06fdef4b2abba0620d26 Mon Sep 17 00:00:00 2001 From: Ptah-CT <221234802+Ptah-CT@users.noreply.github.com> Date: Fri, 15 May 2026 16:46:20 +0000 Subject: [PATCH 2/3] =?UTF-8?q?docs:=20FORK=5FPATCHES.md=20=E2=80=94=20man?= =?UTF-8?q?ual-restore=20checklist=20for=20fork-only=20patches?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issues are disabled on this fork, so the tracking list lives in-repo. Documents every patch that exists only in XInfty/EverOS_Qdrant and must be re-applied after each upstream sync: - Voyage rerank provider + factory branch - timezone import in episodic Qdrant repo - 13× response_format json_schema strict across LLM call-sites - Profile extractor post-parse coercion - Intentional opt-outs (tool_pre_compress stays on json_object) Each section has a one-liner grep so a future contributor (or CI guard) can verify the patch is still present after a merge from upstream. Concrete history motivating this file: - 2026-05-12: response_format=json_object patches got exposed by a model swap; migrated 13/15 sites to strict json_schema. - 2026-05-15: rerank_voyage.py + factory branch silently dropped during the methods/evermemos/ → methods/EverCore/ rename. Retrieval returned empty for three days before being noticed. Restored in #7. 
--- FORK_PATCHES.md | 98 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 FORK_PATCHES.md diff --git a/FORK_PATCHES.md b/FORK_PATCHES.md new file mode 100644 index 00000000..56547e12 --- /dev/null +++ b/FORK_PATCHES.md @@ -0,0 +1,98 @@ +# Fork-Only Patches — Manual Restore Checklist + +This fork (`XInfty/EverOS_Qdrant`) carries patches that do **not** exist upstream in `EverMind-AI/EverOS`. Upstream periodically renames or restructures the tree (most recently `methods/evermemos/` → `methods/EverCore/` around 2026-05-13). When that happens, custom files can silently disappear and custom edits inside renamed files can be reverted. + +Real incidents this list exists to prevent: + +- **2026-05-12** — 14 LLM call-sites had `response_format={"type":"json_object"}` patches; a model swap exposed that this isn't strict enough. Solution was migrating 13/15 sites to `response_format={"type":"json_schema", "strict": true}` with per-site schemas. +- **2026-05-15** — `rerank_voyage.py` plus the corresponding factory branch in `rerank_service.py` disappeared during the rename. Retrieval went silently empty for three days (HTTP 200 + `episodes=[]`, not a hang) before being noticed. Restored in #7. + +## After every `git merge upstream/main` + +Run the checks below. If any fails, restore the patch before pushing. + +### 1. Voyage AI rerank provider + +- [ ] File `methods/EverCore/src/agentic_layer/rerank_voyage.py` exists (~252 LOC). Voyage returns `data: [{index, relevance_score}]`; this file normalises to the EverOS-standard `{results: [{index, score, rank}]}`. +- [ ] `methods/EverCore/src/agentic_layer/rerank_service.py` contains a factory branch for `provider.lower() == "voyage"` between the `deepinfra` branch and the `else: raise RerankError`. Defaults `base_url` to `https://api.voyageai.com/v1/rerank` when env override is empty. 
+ +Quick grep: + +```bash +test -f methods/EverCore/src/agentic_layer/rerank_voyage.py \ + && grep -q '"voyage"' methods/EverCore/src/agentic_layer/rerank_service.py \ + && echo "voyage: OK" || echo "voyage: MISSING" +``` + +### 2. `timezone` import in episodic Qdrant repo + +- [ ] `methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py` line 15 reads `from datetime import datetime, timezone` (not just `datetime`). Required by `tz=timezone.utc` further down. + +Quick grep: + +```bash +grep -q '^from datetime import datetime, timezone' \ + methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py \ + && echo "timezone: OK" || echo "timezone: MISSING" +``` + +### 3. Strict JSON-Schema response_format + +13 of 15 LLM call-sites should use `response_format={"type": "json_schema", "strict": True, "schema": {...}}` instead of `{"type": "json_object"}`. Per-site schema name in the second column: + +| File | Schema name | +|---|---| +| `methods/EverCore/src/memory_layer/memory_extractor/episode_memory_extractor.py:270` | `episode_memory` | +| `methods/EverCore/src/memory_layer/memcell_extractor/conv_memcell_extractor.py:409` | `batch_boundary_result` | +| `methods/EverCore/src/agentic_layer/agentic_utils.py:326` | `sufficiency_check` | +| `methods/EverCore/src/agentic_layer/agentic_utils.py:403` | `multi_query_generation` | +| `methods/EverCore/src/agentic_layer/search_mem_service.py:1618` | `skill_relevance` | +| `methods/EverCore/src/memory_layer/cluster_manager/manager.py:658` | `cluster_assignment` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_skill_extractor.py:301` | `skill_operations` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_skill_extractor.py:330` | `skill_maturity` | +| `methods/EverCore/src/memory_layer/memory_extractor/foresight_extractor.py:120` | `foresight_associations` (wrapped object, parser accepts both shapes) | +| 
`methods/EverCore/src/memory_layer/memory_extractor/atomic_fact_extractor.py:190` | `atomic_facts_extraction` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:394` | `filter_decision` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:415` | `experience_record` | +| `methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py` (2 sites, shared parser) | `_parse_profile_response` type-coercion (see §4) | + +Quick count: + +```bash +grep -rE 'response_format.*json_schema' methods/EverCore/src/ | wc -l +# expect: >= 13 +``` + +### 4. Profile extractor — defense-in-depth coercion + +- [ ] `methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py` has post-parse coercion in `_parse_profile_response` that runs `json.dumps(...)` on non-string `description / trait / evidence / category` values before downstream code touches them. The profile output shape is too heterogeneous for strict schema; coercion catches LLM drift instead. + +Quick grep: + +```bash +grep -q 'json.dumps' \ + methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py \ + && echo "profile coercion: OK" || echo "profile coercion: MISSING" +``` + +### 5. Intentional opt-outs (do NOT add strict schema) + +- `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:370` (`tool_pre_compress`) stays on `json_object`. The `compressed_messages` shape is heterogeneous (assistant + tool_calls vs tool message) and breaks strict schema. The caller validates `len(compressed_messages) == len(messages)` as a safety net. + +## Suggested CI guard + +A `.github/workflows/fork-patch-guard.yml` job that runs the four greps above on every PR and fails if any returns the wrong count. PR welcome. + +## How to update this file + +When a new patch is added that lives only in this fork: + +1. Add it to the relevant section above (or create a new one). +2. Add a quick-grep snippet. +3. 
Update `.github/workflows/fork-patch-guard.yml` if it exists. +4. Reference the PR that introduced the patch. + +## Related history + +- PR #7 — Restored Voyage rerank + timezone import after the 2026-05-13 rename. +- XInfty/XInfty_docs#2 — EWM Bug-Cluster doc update for 2026-05-15. From 8c8bbb6bb6ef3b2eb764205d1f30e1b37c2fc098 Mon Sep 17 00:00:00 2001 From: Ptah-CT <221234802+Ptah-CT@users.noreply.github.com> Date: Fri, 15 May 2026 16:51:06 +0000 Subject: [PATCH 3/3] fix(everos): guard VoyageRerankService session creation with asyncio.Lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit finding #3: _ensure_session() could be entered concurrently by multiple coroutines (e.g. parallel rerank batches via gather), each seeing self.session is None and creating its own aiohttp.ClientSession. The losers of the race were never tracked and leaked. Add self._session_lock = asyncio.Lock() in __init__ and use a double-checked pattern inside _ensure_session: a fast path when the session is already alive, a lock-protected slow path with re-check before instantiation. Findings #1 (Voyage model default) and #2 (raw query in debug log) are intentionally deferred — production .env always sets RERANK_MODEL and DEBUG-level logs are not enabled by default in this environment. 
--- methods/EverCore/src/agentic_layer/rerank_voyage.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/methods/EverCore/src/agentic_layer/rerank_voyage.py b/methods/EverCore/src/agentic_layer/rerank_voyage.py index 6a2dbb6d..cd0cfdb3 100644 --- a/methods/EverCore/src/agentic_layer/rerank_voyage.py +++ b/methods/EverCore/src/agentic_layer/rerank_voyage.py @@ -55,12 +55,19 @@ def __init__(self, config: Optional[VoyageRerankConfig] = None): self.config = config self.session: Optional[aiohttp.ClientSession] = None self._semaphore = asyncio.Semaphore(config.max_concurrent_requests) + self._session_lock = asyncio.Lock() logger.info( f"Initialized VoyageRerankService | url={config.base_url} | model={config.model}" ) async def _ensure_session(self): - if self.session is None or self.session.closed: + # Lock guards against concurrent first-touch creating multiple sessions + # and leaking the loser; the inner re-check covers the racy second waiter. + if self.session is not None and not self.session.closed: + return + async with self._session_lock: + if self.session is not None and not self.session.closed: + return timeout = aiohttp.ClientTimeout(total=self.config.timeout) self.session = aiohttp.ClientSession( timeout=timeout,