diff --git a/FORK_PATCHES.md b/FORK_PATCHES.md new file mode 100644 index 00000000..56547e12 --- /dev/null +++ b/FORK_PATCHES.md @@ -0,0 +1,98 @@ +# Fork-Only Patches — Manual Restore Checklist + +This fork (`XInfty/EverOS_Qdrant`) carries patches that do **not** exist upstream in `EverMind-AI/EverOS`. Upstream periodically renames or restructures the tree (most recently `methods/evermemos/` → `methods/EverCore/` around 2026-05-13). When that happens, custom files can silently disappear and custom edits inside renamed files can be reverted. + +Real incidents this list exists to prevent: + +- **2026-05-12** — 14 LLM call-sites had `response_format={"type":"json_object"}` patches; a model swap exposed that this isn't strict enough. Solution was migrating 13/15 sites to `response_format={"type":"json_schema", "strict": true}` with per-site schemas. +- **2026-05-15** — `rerank_voyage.py` plus the corresponding factory branch in `rerank_service.py` disappeared during the rename. Retrieval went silently empty for three days (HTTP 200 + `episodes=[]`, not a hang) before being noticed. Restored in #7. + +## After every `git merge upstream/main` + +Run the checks below. If any fails, restore the patch before pushing. + +### 1. Voyage AI rerank provider + +- [ ] File `methods/EverCore/src/agentic_layer/rerank_voyage.py` exists (~252 LOC). Voyage returns `data: [{index, relevance_score}]`; this file normalises to the EverOS-standard `{results: [{index, score, rank}]}`. +- [ ] `methods/EverCore/src/agentic_layer/rerank_service.py` contains a factory branch for `provider.lower() == "voyage"` between the `deepinfra` branch and the `else: raise RerankError`. Defaults `base_url` to `https://api.voyageai.com/v1/rerank` when env override is empty. + +Quick grep: + +```bash +test -f methods/EverCore/src/agentic_layer/rerank_voyage.py \ + && grep -q '"voyage"' methods/EverCore/src/agentic_layer/rerank_service.py \ + && echo "voyage: OK" || echo "voyage: MISSING" +``` + +### 2. `timezone` import in episodic Qdrant repo + +- [ ] `methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py` line 15 reads `from datetime import datetime, timezone` (not just `datetime`). Required by `tz=timezone.utc` further down. + +Quick grep: + +```bash +grep -q '^from datetime import datetime, timezone' \ + methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py \ + && echo "timezone: OK" || echo "timezone: MISSING" +``` + +### 3. Strict JSON-Schema response_format + +13 of 15 LLM call-sites should use `response_format={"type": "json_schema", "strict": True, "schema": {...}}` instead of `{"type": "json_object"}`. Per-site schema name in the second column: + +| File | Schema name | +|---|---| +| `methods/EverCore/src/memory_layer/memory_extractor/episode_memory_extractor.py:270` | `episode_memory` | +| `methods/EverCore/src/memory_layer/memcell_extractor/conv_memcell_extractor.py:409` | `batch_boundary_result` | +| `methods/EverCore/src/agentic_layer/agentic_utils.py:326` | `sufficiency_check` | +| `methods/EverCore/src/agentic_layer/agentic_utils.py:403` | `multi_query_generation` | +| `methods/EverCore/src/agentic_layer/search_mem_service.py:1618` | `skill_relevance` | +| `methods/EverCore/src/memory_layer/cluster_manager/manager.py:658` | `cluster_assignment` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_skill_extractor.py:301` | `skill_operations` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_skill_extractor.py:330` | `skill_maturity` | +| `methods/EverCore/src/memory_layer/memory_extractor/foresight_extractor.py:120` | `foresight_associations` (wrapped object, parser accepts both shapes) | +| `methods/EverCore/src/memory_layer/memory_extractor/atomic_fact_extractor.py:190` | `atomic_facts_extraction` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:394` | `filter_decision` | +| `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:415` | `experience_record` | +| `methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py` (2 sites, shared parser) | `_parse_profile_response` type-coercion (see §4) | + +Quick count: + +```bash +grep -rE 'response_format.*json_schema' methods/EverCore/src/ | wc -l +# expect: >= 13 +``` + +### 4. Profile extractor — defense-in-depth coercion + +- [ ] `methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py` has post-parse coercion in `_parse_profile_response` that runs `json.dumps(...)` on non-string `description / trait / evidence / category` values before downstream code touches them. The profile output shape is too heterogeneous for strict schema; coercion catches LLM drift instead. + +Quick grep: + +```bash +grep -q 'json.dumps' \ + methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py \ + && echo "profile coercion: OK" || echo "profile coercion: MISSING" +``` + +### 5. Intentional opt-outs (do NOT add strict schema) + +- `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:370` (`tool_pre_compress`) stays on `json_object`. The `compressed_messages` shape is heterogeneous (assistant + tool_calls vs tool message) and breaks strict schema. The caller validates `len(compressed_messages) == len(messages)` as a safety net. + +## Suggested CI guard + +A `.github/workflows/fork-patch-guard.yml` job that runs the four greps above on every PR and fails if any returns the wrong count. PR welcome. + +## How to update this file + +When a new patch is added that lives only in this fork: + +1. Add it to the relevant section above (or create a new one). +2. Add a quick-grep snippet. +3. Update `.github/workflows/fork-patch-guard.yml` if it exists. +4. Reference the PR that introduced the patch. + +## Related history + +- PR #7 — Restored Voyage rerank + timezone import after the 2026-05-13 rename. +- XInfty/XInfty_docs#2 — EWM Bug-Cluster doc update for 2026-05-15. diff --git a/methods/EverCore/src/agentic_layer/rerank_service.py b/methods/EverCore/src/agentic_layer/rerank_service.py index e668e8c2..e51fe4cf 100644 --- a/methods/EverCore/src/agentic_layer/rerank_service.py +++ b/methods/EverCore/src/agentic_layer/rerank_service.py @@ -30,6 +30,7 @@ ) from agentic_layer.rerank_vllm import VllmRerankService, VllmRerankConfig from agentic_layer.rerank_deepinfra import DeepInfraRerankService, DeepInfraRerankConfig +from agentic_layer.rerank_voyage import VoyageRerankService, VoyageRerankConfig from agentic_layer.metrics.rerank_metrics import ( record_rerank_request, record_rerank_fallback, @@ -173,6 +174,17 @@ def _create_service_from_config( max_concurrent_requests=max_concurrent, ) return DeepInfraRerankService(config) + elif provider.lower() == "voyage": + config = VoyageRerankConfig( + api_key=api_key, + base_url=base_url or 'https://api.voyageai.com/v1/rerank', + model=model, + timeout=timeout, + max_retries=max_retries, + batch_size=batch_size, + max_concurrent_requests=max_concurrent, + ) + return VoyageRerankService(config) else: raise RerankError(f"Unsupported provider: {provider}") diff --git a/methods/EverCore/src/agentic_layer/rerank_voyage.py b/methods/EverCore/src/agentic_layer/rerank_voyage.py new file mode 100644 index 00000000..cd0cfdb3 --- /dev/null +++ b/methods/EverCore/src/agentic_layer/rerank_voyage.py @@ -0,0 +1,259 @@ +""" +Voyage AI Rerank Service Implementation + +Reranking service using Voyage AI commercial API (rerank-2.5). + +Voyage's API differs from DeepInfra/vLLM in two ways: +- POST to a fixed endpoint (no model-suffix path). +- Request shape: ``{"query", "documents", "model"}`` with plain strings + (no Qwen ``<|im_start|>`` template wrapping). +- Response shape: ``{"data": [{"index", "relevance_score"}], "usage": {...}}`` + (DeepInfra/Cohere return ``results``; Voyage returns ``data``). + +The service normalises Voyage's response into the EverOS standard +``{"results": [{"index", "score", "rank"}]}`` shape so callers don't care +which backend produced the scores. +""" + +import asyncio +import aiohttp +import logging +from typing import List, Dict, Any, Optional +from dataclasses import dataclass + +from agentic_layer.rerank_interface import ( + RerankServiceInterface, + RerankError, + extract_text_from_hit, +) +from core.di.utils import get_bean_by_type +from core.component.token_usage_collector import TokenUsageCollector + +logger = logging.getLogger(__name__) + + +@dataclass +class VoyageRerankConfig: + """Voyage rerank service configuration""" + + api_key: str = "" # skip-sensitive-check + base_url: str = "https://api.voyageai.com/v1/rerank" + model: str = "rerank-2.5" + timeout: int = 30 + max_retries: int = 3 + batch_size: int = 100 # Voyage accepts up to 1000 docs/request + max_concurrent_requests: int = 5 + + +class VoyageRerankService(RerankServiceInterface): + """Voyage AI reranking service implementation""" + + def __init__(self, config: Optional[VoyageRerankConfig] = None): + if config is None: + config = VoyageRerankConfig() + + self.config = config + self.session: Optional[aiohttp.ClientSession] = None + self._semaphore = asyncio.Semaphore(config.max_concurrent_requests) + self._session_lock = asyncio.Lock() + logger.info( + f"Initialized VoyageRerankService | url={config.base_url} | model={config.model}" + ) + + async def _ensure_session(self): + # Lock guards against concurrent first-touch creating multiple sessions + # and leaking the loser; the inner re-check covers the racy second waiter. + if self.session is not None and not self.session.closed: + return + async with self._session_lock: + if self.session is not None and not self.session.closed: + return + timeout = aiohttp.ClientTimeout(total=self.config.timeout) + self.session = aiohttp.ClientSession( + timeout=timeout, + headers={ + "Authorization": f"Bearer {self.config.api_key}", + "Content-Type": "application/json", + }, + ) + + async def close(self): + if self.session and not self.session.closed: + await self.session.close() + + async def _send_rerank_request_batch( + self, query: str, documents: List[str] + ) -> Dict[str, Any]: + """POST one batch to the Voyage rerank endpoint.""" + await self._ensure_session() + + # Voyage expects plain strings, NOT Qwen-template wrapped. + request_data = { + "query": query, + "documents": documents, + "model": self.config.model, + } + + async with self._semaphore: + for attempt in range(self.config.max_retries): + try: + async with self.session.post( + self.config.base_url, json=request_data + ) as response: + if response.status == 200: + json_body = await response.json() + return self._parse_response(json_body, len(documents)) + error_text = await response.text() + logger.error( + f"Voyage rerank API error {response.status}: {error_text}" + ) + if attempt < self.config.max_retries - 1: + await asyncio.sleep(2 ** attempt) + continue + raise RerankError( + f"API failed: {response.status} - {error_text}" + ) + except RerankError: + raise + except Exception as e: + logger.error(f"Voyage rerank exception: {e}") + if attempt < self.config.max_retries - 1: + await asyncio.sleep(2 ** attempt) + continue + raise RerankError(f"Exception: {e}") + + def _parse_response( + self, json_body: Dict[str, Any], num_docs: int + ) -> Dict[str, Any]: + """Voyage returns ``data: [{index, relevance_score}]`` — translate + into a dense ``scores`` array aligned to the request order.""" + scores = [0.0] * num_docs + for item in json_body.get("data", []): + idx = item.get("index") + if idx is None or not (0 <= idx < num_docs): + continue + scores[idx] = float(item.get("relevance_score", 0.0)) + + usage = json_body.get("usage", {}) or {} + return { + "scores": scores, + "input_tokens": int(usage.get("total_tokens", 0) or 0), + "request_id": json_body.get("id"), + } + + async def rerank_documents( + self, query: str, documents: List[str], instruction: Optional[str] = None + ) -> Dict[str, Any]: + """Low-level reranking; ``instruction`` is ignored — Voyage uses the + query/documents pair directly.""" + if not documents: + return {"results": []} + + batch_size = self.config.batch_size or 100 + batches = [ + documents[i : i + batch_size] for i in range(0, len(documents), batch_size) + ] + + batch_tasks = [ + self._send_rerank_request_batch(query, batch) for batch in batches + ] + batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) + + all_scores: List[float] = [] + total_input_tokens = 0 + last_response = None + + for i, result in enumerate(batch_results): + if isinstance(result, Exception): + logger.error(f"Voyage rerank batch {i} failed: {result}") + all_scores.extend([-100.0] * len(batches[i])) + continue + all_scores.extend(result.get("scores", [])) + total_input_tokens += result.get("input_tokens", 0) + last_response = result + + try: + collector = get_bean_by_type(TokenUsageCollector) + collector.add(self.config.model, total_input_tokens, 0, call_type="rerank") + except Exception: + pass + + combined_response = { + "scores": all_scores, + "input_tokens": total_input_tokens, + "request_id": last_response.get("request_id") if last_response else None, + } + return self._convert_response_format(combined_response, len(documents)) + + def _convert_response_format( + self, combined_response: Dict[str, Any], num_documents: int + ) -> Dict[str, Any]: + scores = combined_response.get("scores", []) + if len(scores) < num_documents: + scores.extend([0.0] * (num_documents - len(scores))) + scores = scores[:num_documents] + + indexed_scores = [(i, score) for i, score in enumerate(scores)] + indexed_scores.sort(key=lambda x: x[1], reverse=True) + + results = [ + {"index": original_index, "score": score, "rank": rank} + for rank, (original_index, score) in enumerate(indexed_scores) + ] + return { + "results": results, + "input_tokens": combined_response.get("input_tokens", 0), + "request_id": combined_response.get("request_id"), + } + + async def rerank_memories( + self, + query: str, + hits: List[Dict[str, Any]], + top_k: Optional[int] = None, + instruction: Optional[str] = None, + ) -> List[Dict[str, Any]]: + if not hits: + return [] + + all_texts = [extract_text_from_hit(hit) for hit in hits] + if not all_texts: + return [] + + try: + logger.debug( + f"Voyage reranking, query: {query!r}, num_texts={len(all_texts)}" + ) + rerank_result = await self.rerank_documents(query, all_texts, instruction) + if "results" not in rerank_result: + raise RerankError("Invalid rerank API response: missing results field") + + results_meta = rerank_result.get("results", []) + reranked_hits = [] + for item in results_meta: + original_idx = item.get("index", 0) + score = item.get("score", 0.0) + if 0 <= original_idx < len(hits): + hit = hits[original_idx].copy() + hit["score"] = score + reranked_hits.append(hit) + + if top_k is not None and top_k > 0: + reranked_hits = reranked_hits[:top_k] + + if reranked_hits: + top_scores = [f"{h.get('score', 0):.4f}" for h in reranked_hits[:3]] + logger.info( + f"Voyage reranking completed: {len(reranked_hits)} results, top scores: {top_scores}" + ) + return reranked_hits + + except Exception as e: + logger.error(f"Voyage reranking failed: {e}") + sorted_hits = sorted(hits, key=lambda x: x.get("score", 0), reverse=True) + if top_k is not None and top_k > 0: + sorted_hits = sorted_hits[:top_k] + return sorted_hits + + def get_model_name(self) -> str: + return self.config.model diff --git a/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py b/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py index 45a769c5..2f17a0c3 100644 --- a/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py +++ b/methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py @@ -12,7 +12,7 @@ import asyncio import json -from datetime import datetime +from datetime import datetime, timezone from functools import partial from typing import Any, Dict, List, Optional