98 changes: 98 additions & 0 deletions FORK_PATCHES.md
@@ -0,0 +1,98 @@
# Fork-Only Patches — Manual Restore Checklist

This fork (`XInfty/EverOS_Qdrant`) carries patches that do **not** exist upstream in `EverMind-AI/EverOS`. Upstream periodically renames or restructures the tree (most recently `methods/evermemos/` → `methods/EverCore/` around 2026-05-13). When that happens, custom files can silently disappear and custom edits inside renamed files can be reverted.

Real incidents this list exists to prevent:

- **2026-05-12**: 14 LLM call-sites had `response_format={"type":"json_object"}` patches; a model swap exposed that `json_object` is not strict enough. The fix was to migrate 13 of 15 sites to `response_format={"type": "json_schema", "strict": True}` with per-site schemas.
- **2026-05-15**: `rerank_voyage.py` and the corresponding factory branch in `rerank_service.py` disappeared during the rename. Retrieval went silently empty for three days (HTTP 200 + `episodes=[]`, not a hang) before anyone noticed. Restored in #7.

## After every `git merge upstream/main`

Run the checks below. If any fails, restore the patch before pushing.

### 1. Voyage AI rerank provider

- [ ] File `methods/EverCore/src/agentic_layer/rerank_voyage.py` exists (~252 LOC). Voyage returns `data: [{index, relevance_score}]`; this file normalises to the EverOS-standard `{results: [{index, score, rank}]}`.
- [ ] `methods/EverCore/src/agentic_layer/rerank_service.py` contains a factory branch for `provider.lower() == "voyage"` between the `deepinfra` branch and the `else: raise RerankError`. Defaults `base_url` to `https://api.voyageai.com/v1/rerank` when env override is empty.

Quick grep:

```bash
test -f methods/EverCore/src/agentic_layer/rerank_voyage.py \
&& grep -q '"voyage"' methods/EverCore/src/agentic_layer/rerank_service.py \
&& echo "voyage: OK" || echo "voyage: MISSING"
```
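The normalisation described in the first checkbox can be sketched as follows. This is a minimal illustration, not the fork's actual code: `normalise_voyage_response` is a hypothetical name, and the sample payload is invented for the example.

```python
from typing import Any, Dict, List


def normalise_voyage_response(body: Dict[str, Any], num_docs: int) -> Dict[str, Any]:
    """Map Voyage's ``data`` list onto ``{"results": [{index, score, rank}]}``."""
    # Start from a dense score array so documents missing from ``data`` get 0.0.
    scores: List[float] = [0.0] * num_docs
    for item in body.get("data", []):
        idx = item.get("index")
        if isinstance(idx, int) and 0 <= idx < num_docs:
            scores[idx] = float(item.get("relevance_score", 0.0))
    # Highest score first; rank 0 is the best match.
    ordered = sorted(enumerate(scores), key=lambda pair: pair[1], reverse=True)
    return {
        "results": [
            {"index": idx, "score": score, "rank": rank}
            for rank, (idx, score) in enumerate(ordered)
        ]
    }


# Invented sample payload in the shape the checklist describes.
sample = {"data": [{"index": 1, "relevance_score": 0.9}, {"index": 0, "relevance_score": 0.2}]}
normalised = normalise_voyage_response(sample, num_docs=2)
```

If this shape ever drifts after a merge, callers that read `results` see an empty list rather than an error, which matches the silent failure described in the incident list.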

### 2. `timezone` import in episodic Qdrant repo

- [ ] `methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py` line 15 reads `from datetime import datetime, timezone` (not just `datetime`). Required by `tz=timezone.utc` further down.

Quick grep:

```bash
grep -q '^from datetime import datetime, timezone' \
methods/EverCore/src/infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py \
&& echo "timezone: OK" || echo "timezone: MISSING"
```

### 3. Strict JSON-Schema response_format

13 of 15 LLM call-sites should use `response_format={"type": "json_schema", "strict": True, "schema": {...}}` instead of `{"type": "json_object"}`. Per-site schema name in the second column:

| File | Schema name |
|---|---|
| `methods/EverCore/src/memory_layer/memory_extractor/episode_memory_extractor.py:270` | `episode_memory` |
| `methods/EverCore/src/memory_layer/memcell_extractor/conv_memcell_extractor.py:409` | `batch_boundary_result` |
| `methods/EverCore/src/agentic_layer/agentic_utils.py:326` | `sufficiency_check` |
| `methods/EverCore/src/agentic_layer/agentic_utils.py:403` | `multi_query_generation` |
| `methods/EverCore/src/agentic_layer/search_mem_service.py:1618` | `skill_relevance` |
| `methods/EverCore/src/memory_layer/cluster_manager/manager.py:658` | `cluster_assignment` |
| `methods/EverCore/src/memory_layer/memory_extractor/agent_skill_extractor.py:301` | `skill_operations` |
| `methods/EverCore/src/memory_layer/memory_extractor/agent_skill_extractor.py:330` | `skill_maturity` |
| `methods/EverCore/src/memory_layer/memory_extractor/foresight_extractor.py:120` | `foresight_associations` (wrapped object, parser accepts both shapes) |
| `methods/EverCore/src/memory_layer/memory_extractor/atomic_fact_extractor.py:190` | `atomic_facts_extraction` |
| `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:394` | `filter_decision` |
| `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:415` | `experience_record` |
| `methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py` (2 sites, shared parser) | `_parse_profile_response` type-coercion (see §4) |

Quick count:

```bash
grep -rE 'response_format.*json_schema' methods/EverCore/src/ | wc -l
# expect: >= 13
```
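For orientation, a strict `response_format` in the shape written above could be built like this. The schema fields are hypothetical (the real per-site schemas live in the fork), and `strict_response_format` is an illustrative helper, not an existing function. Some OpenAI-compatible chat APIs expect the name, `strict` flag and schema nested under a `json_schema` key instead, so check the client the call-site actually uses.

```python
import json
from typing import Any, Dict


def strict_response_format(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Build a response_format dict in the shape this checklist describes."""
    return {"type": "json_schema", "strict": True, "schema": schema}


# Hypothetical schema for illustration only; field names are assumptions.
SUFFICIENCY_CHECK_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "is_sufficient": {"type": "boolean"},
        "reasoning": {"type": "string"},
    },
    "required": ["is_sufficient", "reasoning"],
    "additionalProperties": False,
}

response_format = strict_response_format(SUFFICIENCY_CHECK_SCHEMA)
print(json.dumps(response_format, indent=2))
```

Note that the quick count above matches single lines only, so a `response_format=` argument split across several lines would not be counted.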

### 4. Profile extractor — defense-in-depth coercion

- [ ] `methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py` has post-parse coercion in `_parse_profile_response` that runs `json.dumps(...)` on non-string `description / trait / evidence / category` values before downstream code touches them. The profile output shape is too heterogeneous for strict schema; coercion catches LLM drift instead.

Quick grep:

```bash
grep -q 'json.dumps' \
methods/EverCore/src/memory_layer/memory_extractor/profile_extractor.py \
&& echo "profile coercion: OK" || echo "profile coercion: MISSING"
```
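The coercion step can be sketched as below. This is an assumption about how such a step might look, not a copy of `_parse_profile_response`; `coerce_profile_item` and `STRING_FIELDS` are hypothetical names.

```python
import json
from typing import Any, Dict

# Fields the checklist says must be strings before downstream code reads them.
STRING_FIELDS = ("description", "trait", "evidence", "category")


def coerce_profile_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``item`` with non-string target fields JSON-encoded."""
    coerced = dict(item)
    for field in STRING_FIELDS:
        if field in coerced and not isinstance(coerced[field], str):
            # Lists, dicts, numbers and None all become their JSON text form.
            coerced[field] = json.dumps(coerced[field], ensure_ascii=False)
    return coerced


# Invented example of LLM drift: a list and a dict where strings were expected.
item = {"trait": "curious", "evidence": ["asked 3 follow-ups"], "category": {"name": "style"}}
coerced = coerce_profile_item(item)
```

Because the quick grep only looks for `json.dumps` anywhere in the file, it confirms that some serialisation call survived the merge, not that the coercion still covers all four fields.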

### 5. Intentional opt-outs (do NOT add strict schema)

- `methods/EverCore/src/memory_layer/memory_extractor/agent_case_extractor.py:370` (`tool_pre_compress`) stays on `json_object`. The `compressed_messages` shape is heterogeneous (assistant + tool_calls vs tool message) and breaks strict schema. The caller validates `len(compressed_messages) == len(messages)` as a safety net.

## Suggested CI guard

Add a `.github/workflows/fork-patch-guard.yml` job that runs the four quick checks above on every PR and fails if any check reports `MISSING` or the `json_schema` count falls below the expected minimum. PRs welcome.
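Until such a workflow exists, the four quick checks can be bundled into one script that a CI job or a pre-push hook calls. The sketch below is an assumption about how that could look: `run_checks` and `main` are hypothetical names, and unlike the `grep -r` above it only scans `*.py` files.

```python
import re
from pathlib import Path
from typing import Dict

SRC = Path("methods/EverCore/src")


def _read(path: Path) -> str:
    """Return file text, or an empty string when the file is missing."""
    return path.read_text(encoding="utf-8", errors="ignore") if path.is_file() else ""


def run_checks(root: Path) -> Dict[str, bool]:
    """Mirror the four quick greps from this checklist."""
    src = root / SRC
    rerank_voyage = src / "agentic_layer/rerank_voyage.py"
    rerank_service = src / "agentic_layer/rerank_service.py"
    qdrant_repo = (
        src / "infra_layer/adapters/out/search/repository/episodic_memory_qdrant_repository.py"
    )
    profile = src / "memory_layer/memory_extractor/profile_extractor.py"

    py_files = src.rglob("*.py") if src.is_dir() else []
    schema_lines = sum(
        1
        for py in py_files
        for line in _read(py).splitlines()
        if re.search(r"response_format.*json_schema", line)
    )
    return {
        "voyage": rerank_voyage.is_file() and '"voyage"' in _read(rerank_service),
        "timezone": re.search(
            r"^from datetime import datetime, timezone", _read(qdrant_repo), re.M
        )
        is not None,
        "json_schema": schema_lines >= 13,
        "profile coercion": "json.dumps" in _read(profile),
    }


def main(root: Path = Path(".")) -> int:
    """Print one line per check and return a process exit code."""
    results = run_checks(root)
    for name, ok in results.items():
        print(f"{name}: {'OK' if ok else 'MISSING'}")
    return 0 if all(results.values()) else 1
```

A workflow step could then run it from the repository root with `raise SystemExit(main())` at the bottom of the script, so that any `MISSING` line fails the job.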

## How to update this file

When a new patch is added that lives only in this fork:

1. Add it to the relevant section above (or create a new one).
2. Add a quick-grep snippet.
3. Update `.github/workflows/fork-patch-guard.yml` if it exists.
4. Reference the PR that introduced the patch.

## Related history

- PR #7 — Restored Voyage rerank + timezone import after the 2026-05-13 rename.
- XInfty/XInfty_docs#2 — EWM Bug-Cluster doc update for 2026-05-15.
12 changes: 12 additions & 0 deletions methods/EverCore/src/agentic_layer/rerank_service.py
@@ -30,6 +30,7 @@
)
from agentic_layer.rerank_vllm import VllmRerankService, VllmRerankConfig
from agentic_layer.rerank_deepinfra import DeepInfraRerankService, DeepInfraRerankConfig
from agentic_layer.rerank_voyage import VoyageRerankService, VoyageRerankConfig
from agentic_layer.metrics.rerank_metrics import (
    record_rerank_request,
    record_rerank_fallback,
@@ -173,6 +174,17 @@ def _create_service_from_config(
            max_concurrent_requests=max_concurrent,
        )
        return DeepInfraRerankService(config)
    elif provider.lower() == "voyage":
        config = VoyageRerankConfig(
            api_key=api_key,
            base_url=base_url or 'https://api.voyageai.com/v1/rerank',
            model=model,
P1 Badge Select a Voyage model when provider is voyage

When RERANK_PROVIDER=voyage, this branch still forwards the shared RERANK_MODEL value unchanged, but the repo default is Qwen/Qwen3-Reranker-4B (methods/EverCore/env.template:121), which is not a Voyage rerank model. In that common configuration, Voyage returns a request error and reranking degrades/fails (especially with fallback disabled), so switching providers is broken unless users manually discover and override the model. Set a provider-specific default (e.g., rerank-2.5) when the model is unset or still on the non-Voyage default.


P1 Badge Select a Voyage-compatible default model

The new voyage branch forwards the shared RERANK_MODEL unchanged, but the repository default remains Qwen/Qwen3-Reranker-4B in env.template, which is not a Voyage reranker model. With RERANK_PROVIDER=voyage and no explicit model override, requests will hit Voyage with an invalid model and fail with 4xx errors, breaking reranking in the default configuration for this provider.


            timeout=timeout,
            max_retries=max_retries,
            batch_size=batch_size,
            max_concurrent_requests=max_concurrent,
        )
        return VoyageRerankService(config)
Comment on lines +177 to +187

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

The Voyage branch neutralises the provider-specific model default.

The shared model is always passed here, so the VoyageRerankConfig default (rerank-2.5) never takes effect. For provider="voyage", the code should fall back to the Voyage default when no explicit model is set.

🔧 Suggested fix
     elif provider.lower() == "voyage":
+        resolved_model = model
+        if not resolved_model or resolved_model == HybridRerankConfig.model:
+            resolved_model = VoyageRerankConfig().model
         config = VoyageRerankConfig(
             api_key=api_key,
             base_url=base_url or 'https://api.voyageai.com/v1/rerank',
-            model=model,
+            model=resolved_model,
             timeout=timeout,
             max_retries=max_retries,
             batch_size=batch_size,
             max_concurrent_requests=max_concurrent,
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@methods/EverCore/src/agentic_layer/rerank_service.py` around lines 177 - 187,
The Voyage branch currently always forwards the shared model value to
VoyageRerankConfig, preventing VoyageRerankConfig's internal default
(rerank-2.5) from being used; modify the branch so that VoyageRerankConfig is
constructed without a model argument when no explicit model was provided (i.e.,
only pass model=model when model is not None/empty), or explicitly pass None so
the config can apply its default; update the instantiation that creates
VoyageRerankConfig in the provider=="voyage" branch (referencing
VoyageRerankConfig and VoyageRerankService) accordingly.

    else:
        raise RerankError(f"Unsupported provider: {provider}")

259 changes: 259 additions & 0 deletions methods/EverCore/src/agentic_layer/rerank_voyage.py
@@ -0,0 +1,259 @@
"""
Voyage AI Rerank Service Implementation

Reranking service using Voyage AI commercial API (rerank-2.5).

Voyage's API differs from DeepInfra/vLLM in three ways:
- POST to a fixed endpoint (no model-suffix path).
- Request shape: ``{"query", "documents", "model"}`` with plain strings
  (no Qwen ``<|im_start|>`` template wrapping).
- Response shape: ``{"data": [{"index", "relevance_score"}], "usage": {...}}``
  (DeepInfra/Cohere return ``results``; Voyage returns ``data``).

The service normalises Voyage's response into the EverOS standard
``{"results": [{"index", "score", "rank"}]}`` shape so callers don't care
which backend produced the scores.
"""

import asyncio
import aiohttp
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

from agentic_layer.rerank_interface import (
    RerankServiceInterface,
    RerankError,
    extract_text_from_hit,
)
from core.di.utils import get_bean_by_type
from core.component.token_usage_collector import TokenUsageCollector

logger = logging.getLogger(__name__)


@dataclass
class VoyageRerankConfig:
    """Voyage rerank service configuration"""

    api_key: str = "" # skip-sensitive-check
    base_url: str = "https://api.voyageai.com/v1/rerank"
    model: str = "rerank-2.5"
    timeout: int = 30
    max_retries: int = 3
    batch_size: int = 100 # Voyage accepts up to 1000 docs/request
    max_concurrent_requests: int = 5


class VoyageRerankService(RerankServiceInterface):
    """Voyage AI reranking service implementation"""

    def __init__(self, config: Optional[VoyageRerankConfig] = None):
        if config is None:
            config = VoyageRerankConfig()

        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None
        self._semaphore = asyncio.Semaphore(config.max_concurrent_requests)
        self._session_lock = asyncio.Lock()
        logger.info(
            f"Initialized VoyageRerankService | url={config.base_url} | model={config.model}"
        )

    async def _ensure_session(self):
        # Lock guards against concurrent first-touch creating multiple sessions
        # and leaking the loser; the inner re-check covers the racy second waiter.
        if self.session is not None and not self.session.closed:
            return
        async with self._session_lock:
            if self.session is not None and not self.session.closed:
                return
            timeout = aiohttp.ClientTimeout(total=self.config.timeout)
            self.session = aiohttp.ClientSession(
                timeout=timeout,
                headers={
                    "Authorization": f"Bearer {self.config.api_key}",
                    "Content-Type": "application/json",
                },
            )

    async def close(self):
        if self.session and not self.session.closed:
            await self.session.close()

    async def _send_rerank_request_batch(
        self, query: str, documents: List[str]
    ) -> Dict[str, Any]:
        """POST one batch to the Voyage rerank endpoint."""
        await self._ensure_session()

        # Voyage expects plain strings, NOT Qwen-template wrapped.
        request_data = {
            "query": query,
            "documents": documents,
            "model": self.config.model,
        }

        async with self._semaphore:
            for attempt in range(self.config.max_retries):
                try:
                    async with self.session.post(
                        self.config.base_url, json=request_data
                    ) as response:
                        if response.status == 200:
                            json_body = await response.json()
                            return self._parse_response(json_body, len(documents))
                        error_text = await response.text()
                        logger.error(
                            f"Voyage rerank API error {response.status}: {error_text}"
                        )
                        if attempt < self.config.max_retries - 1:
                            await asyncio.sleep(2 ** attempt)
                            continue
                        raise RerankError(
                            f"API failed: {response.status} - {error_text}"
                        )
                except RerankError:
                    raise
                except Exception as e:
                    logger.error(f"Voyage rerank exception: {e}")
                    if attempt < self.config.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                        continue
                    raise RerankError(f"Exception: {e}")

    def _parse_response(
        self, json_body: Dict[str, Any], num_docs: int
    ) -> Dict[str, Any]:
        """Voyage returns ``data: [{index, relevance_score}]``; translate it
        into a dense ``scores`` array aligned to the request order."""
        scores = [0.0] * num_docs
        for item in json_body.get("data", []):
            idx = item.get("index")
            if idx is None or not (0 <= idx < num_docs):
                continue
            scores[idx] = float(item.get("relevance_score", 0.0))

        usage = json_body.get("usage", {}) or {}
        return {
            "scores": scores,
            "input_tokens": int(usage.get("total_tokens", 0) or 0),
            "request_id": json_body.get("id"),
        }

    async def rerank_documents(
        self, query: str, documents: List[str], instruction: Optional[str] = None
    ) -> Dict[str, Any]:
        """Low-level reranking; ``instruction`` is ignored because Voyage uses
        the query/documents pair directly."""
Comment on lines +147 to +148

P2 Badge Preserve rerank instruction in Voyage requests

VoyageRerankService.rerank_documents explicitly ignores the instruction argument, so any call site that supplies custom ranking guidance loses that behavior when RERANK_PROVIDER=voyage. For example, search_mem_service passes a skill-specific instruction to bias domain/method matching, but this provider drops it and ranks only on raw query text, which can materially reduce retrieval precision and makes Voyage behavior inconsistent with the existing vLLM/DeepInfra implementations that honor instruction.


        if not documents:
            return {"results": []}

        batch_size = self.config.batch_size or 100
        batches = [
            documents[i : i + batch_size] for i in range(0, len(documents), batch_size)
Comment on lines +152 to +154

P2 Badge Guard against non-positive Voyage batch sizes

This implementation does not validate self.config.batch_size before building batches. If RERANK_BATCH_SIZE is set to a negative value, range(0, len(documents), batch_size) produces no batches, so no API calls are made and the code silently returns zero-filled scores, effectively disabling reranking without an explicit error.
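
One way to address this finding is to validate the batch size before slicing. The sketch below is illustrative only: `split_into_batches` is a hypothetical helper, and it raises `ValueError` where the service itself would presumably raise `RerankError`.

```python
from typing import List, Sequence


def split_into_batches(documents: Sequence[str], batch_size: int) -> List[List[str]]:
    """Slice ``documents`` into consecutive batches, rejecting bad sizes."""
    if batch_size <= 0:
        # A negative step would make range() yield nothing and silently skip
        # every API call, so fail loudly instead.
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    return [
        list(documents[i : i + batch_size])
        for i in range(0, len(documents), batch_size)
    ]
```

With this guard, a misconfigured `RERANK_BATCH_SIZE` surfaces as an error at the first rerank call instead of producing zero-filled scores.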


        ]

        batch_tasks = [
            self._send_rerank_request_batch(query, batch) for batch in batches
        ]
        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

        all_scores: List[float] = []
        total_input_tokens = 0
        last_response = None

        for i, result in enumerate(batch_results):
            if isinstance(result, Exception):
                logger.error(f"Voyage rerank batch {i} failed: {result}")
                all_scores.extend([-100.0] * len(batches[i]))
                continue
Comment on lines +167 to +170

P1 Badge Raise on Voyage batch failures instead of fabricating scores

When a Voyage batch request errors out, the code logs it and appends synthetic -100.0 scores, then continues as if reranking succeeded. If all batches fail (e.g., invalid API key, outage, 429/5xx), callers still receive a normal-looking rerank result, so fallback/error paths are never triggered and production can silently serve degraded ordering instead of surfacing the failure.
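
A minimal sketch of the behaviour this comment asks for: collect the batch results and raise as soon as any batch failed, rather than padding with placeholder scores. `gather_scores` is a hypothetical helper and the local `RerankError` is a stand-in for the one in `agentic_layer.rerank_interface`; whether a partial failure should abort the whole call is a design choice the PR would have to make.

```python
import asyncio
from typing import Any, Awaitable, Dict, Iterable, List


class RerankError(Exception):
    """Stand-in for agentic_layer.rerank_interface.RerankError."""


async def gather_scores(batch_calls: Iterable[Awaitable[Dict[str, Any]]]) -> List[float]:
    """Await all batch calls and concatenate scores, failing on any error."""
    results = await asyncio.gather(*batch_calls, return_exceptions=True)
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        # Surface the failure so the caller's fallback path can run.
        raise RerankError(
            f"{len(failures)}/{len(results)} rerank batches failed: {failures[0]}"
        ) from failures[0]
    scores: List[float] = []
    for result in results:
        scores.extend(result["scores"])
    return scores
```

Raising here lets `rerank_memories` reach its existing `except` branch, which already falls back to the original hit ordering.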


            all_scores.extend(result.get("scores", []))
            total_input_tokens += result.get("input_tokens", 0)
            last_response = result

        try:
            collector = get_bean_by_type(TokenUsageCollector)
            collector.add(self.config.model, total_input_tokens, 0, call_type="rerank")
        except Exception:
            pass

        combined_response = {
            "scores": all_scores,
            "input_tokens": total_input_tokens,
            "request_id": last_response.get("request_id") if last_response else None,
        }
        return self._convert_response_format(combined_response, len(documents))

    def _convert_response_format(
        self, combined_response: Dict[str, Any], num_documents: int
    ) -> Dict[str, Any]:
        scores = combined_response.get("scores", [])
        if len(scores) < num_documents:
            scores.extend([0.0] * (num_documents - len(scores)))
        scores = scores[:num_documents]

        indexed_scores = [(i, score) for i, score in enumerate(scores)]
        indexed_scores.sort(key=lambda x: x[1], reverse=True)

        results = [
            {"index": original_index, "score": score, "rank": rank}
            for rank, (original_index, score) in enumerate(indexed_scores)
        ]
        return {
            "results": results,
            "input_tokens": combined_response.get("input_tokens", 0),
            "request_id": combined_response.get("request_id"),
        }

    async def rerank_memories(
        self,
        query: str,
        hits: List[Dict[str, Any]],
        top_k: Optional[int] = None,
        instruction: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        if not hits:
            return []

        all_texts = [extract_text_from_hit(hit) for hit in hits]
        if not all_texts:
            return []

        try:
            logger.debug(
                f"Voyage reranking, query: {query!r}, num_texts={len(all_texts)}"
            )
Comment on lines +224 to +226

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

The raw query is logged and can expose tenant data.

The debug log contains the full query text. In multi-tenant operation, only metadata (e.g. length or hash) should be logged here.

🔒 Suggested fix
             logger.debug(
-                f"Voyage reranking, query: {query!r}, num_texts={len(all_texts)}"
+                "Voyage reranking started: num_texts=%d, query_len=%d",
+                len(all_texts),
+                len(query),
             )
As per coding guidelines `EverCore is multi-tenant; data must remain tenant-scoped`.
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 218-218: Logging statement uses f-string

(G004)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@methods/EverCore/src/agentic_layer/rerank_voyage.py` around lines 217 - 219,
The debug log currently prints the full query string (logger.debug with query
and len(all_texts)), which can expose tenant data; update the logging in the
rerank_voyage logic (look for logger.debug and the variable query in
rerank_voyage or surrounding function) to avoid logging raw query text and
instead log only non-sensitive metadata such as query length and a deterministic
hash (e.g., SHA-256 of query) plus num_texts (len(all_texts)); ensure you
compute the hash from query safely and include a short label like "query_len"
and "query_hash" in the logger.debug call rather than the full query content.
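
The metadata-only logging described in this prompt could look like the sketch below. `query_fingerprint` and `log_rerank_start` are hypothetical names, and truncating the digest to 12 hex characters is an arbitrary choice made for log readability.

```python
import hashlib
import logging

logger = logging.getLogger(__name__)


def query_fingerprint(query: str) -> str:
    """Return a short, deterministic SHA-256 prefix of the query text."""
    return hashlib.sha256(query.encode("utf-8")).hexdigest()[:12]


def log_rerank_start(query: str, num_texts: int) -> None:
    """Log only non-sensitive metadata about the rerank call."""
    # Lazy %-style arguments also avoid the Ruff G004 f-string warning.
    logger.debug(
        "Voyage reranking started: num_texts=%d, query_len=%d, query_hash=%s",
        num_texts,
        len(query),
        query_fingerprint(query),
    )
```

An unsalted hash lets operators correlate repeated queries, but short or guessable queries can still be recovered by brute force, so logging only the length may be the safer default.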

            rerank_result = await self.rerank_documents(query, all_texts, instruction)
            if "results" not in rerank_result:
                raise RerankError("Invalid rerank API response: missing results field")

            results_meta = rerank_result.get("results", [])
            reranked_hits = []
            for item in results_meta:
                original_idx = item.get("index", 0)
                score = item.get("score", 0.0)
                if 0 <= original_idx < len(hits):
                    hit = hits[original_idx].copy()
                    hit["score"] = score
                    reranked_hits.append(hit)

            if top_k is not None and top_k > 0:
                reranked_hits = reranked_hits[:top_k]

            if reranked_hits:
                top_scores = [f"{h.get('score', 0):.4f}" for h in reranked_hits[:3]]
                logger.info(
                    f"Voyage reranking completed: {len(reranked_hits)} results, top scores: {top_scores}"
                )
            return reranked_hits

        except Exception as e:
            logger.error(f"Voyage reranking failed: {e}")
            sorted_hits = sorted(hits, key=lambda x: x.get("score", 0), reverse=True)
            if top_k is not None and top_k > 0:
                sorted_hits = sorted_hits[:top_k]
            return sorted_hits

    def get_model_name(self) -> str:
        return self.config.model
@@ -12,7 +12,7 @@

import asyncio
import json
-from datetime import datetime
+from datetime import datetime, timezone
from functools import partial
from typing import Any, Dict, List, Optional
