diff --git a/src/basic_memory_benchmarks/providers/mem0_local.py b/src/basic_memory_benchmarks/providers/mem0_local.py index 233f325..80f8cee 100644 --- a/src/basic_memory_benchmarks/providers/mem0_local.py +++ b/src/basic_memory_benchmarks/providers/mem0_local.py @@ -82,6 +82,16 @@ def _ensure_memory(self, run_config: RunConfig): if self._memory is not None: return self._memory + # Trigger: mem0 with MEM0_TELEMETRY on (its default) opens a qdrant + # client at a FIXED path (~/.mem0/migrations_qdrant) inside every + # Memory(); qdrant local mode allows one client per path per process, + # so the second provider instance in a grouped run dies with + # "Storage folder ... is already accessed" (matrix v1: 24/25 LME and + # 30/30 ConvoMem groups lost). + # Why: benchmark runs should not emit telemetry anyway. + # Outcome: telemetry store never created; operator can force-enable. + os.environ.setdefault("MEM0_TELEMETRY", "false") + from mem0 import Memory # Deferred import to keep startup lightweight base_url = os.getenv("MEM0_OPENAI_COMPAT_BASE_URL") @@ -175,7 +185,18 @@ def cleanup(self, run_config: RunConfig) -> None: self._memory.delete_all(user_id=self._user_id(run_config)) except Exception: # Cleanup should never break the main benchmark flow. - return + pass + # Release qdrant local-mode file locks even while this instance is + # still referenced (the grouped runner keeps the last provider for + # version_info), or the next instance cannot open its stores. + for store_attr in ("vector_store", "_telemetry_vector_store"): + client = getattr(getattr(self._memory, store_attr, None), "client", None) + if client is not None and hasattr(client, "close"): + try: + client.close() + except Exception: + pass + self._memory = None def version_info(self) -> dict[str, str]: metadata: dict[str, str] = { diff --git a/tests/providers/test_mem0_normalization.py b/tests/providers/test_mem0_normalization.py index 5b96413..393f336 100644 --- a/tests/providers/test_mem0_normalization.py +++ b/tests/providers/test_mem0_normalization.py @@ -66,3 +66,61 @@ def test_infer_env_flag(self, monkeypatch): assert Mem0LocalProvider()._infer is False monkeypatch.delenv("MEM0_INFER") assert Mem0LocalProvider()._infer is False + + +class TestTelemetryLockAvoidance: + def test_telemetry_disabled_before_import(self, monkeypatch): + import os + + from basic_memory_benchmarks.providers.mem0_local import Mem0LocalProvider + + monkeypatch.setenv("MEM0_OPENAI_COMPAT_BASE_URL", "http://localhost:1/v1") + monkeypatch.delenv("MEM0_TELEMETRY", raising=False) + provider = Mem0LocalProvider() + # _ensure_memory sets the env var before importing mem0; construction + # against the dead endpoint may fail later, which is fine here. + try: + provider._ensure_memory( + __import__("basic_memory_benchmarks.models", fromlist=["RunConfig"]).RunConfig( + run_id="t", dataset_id="t", dataset_path="t", corpus_dir="t", queries_path="t" + ) + ) + except Exception: + pass + assert os.environ["MEM0_TELEMETRY"] == "false" + + def test_cleanup_closes_clients_and_drops_memory(self): + from basic_memory_benchmarks.providers.mem0_local import Mem0LocalProvider + + class FakeClient: + closed = False + + def close(self): + self.closed = True + + class FakeStore: + def __init__(self): + self.client = FakeClient() + + class FakeMemory: + def __init__(self): + self.vector_store = FakeStore() + self._telemetry_vector_store = FakeStore() + + def delete_all(self, user_id): + pass + + provider = Mem0LocalProvider() + memory = FakeMemory() + provider._memory = memory + + from basic_memory_benchmarks.models import RunConfig + + provider.cleanup( + RunConfig( + run_id="t", dataset_id="t", dataset_path="t", corpus_dir="t", queries_path="t" + ) + ) + assert memory.vector_store.client.closed + assert memory._telemetry_vector_store.client.closed + assert provider._memory is None