Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/basic_memory_benchmarks/providers/mem0_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ def _ensure_memory(self, run_config: RunConfig):
if self._memory is not None:
return self._memory

# Trigger: mem0 with MEM0_TELEMETRY on (its default) opens a qdrant
# client at a FIXED path (~/.mem0/migrations_qdrant) inside every
# Memory(); qdrant local mode allows one client per path per process,
# so the second provider instance in a grouped run dies with
# "Storage folder ... is already accessed" (matrix v1: 24/25 LME and
# 30/30 ConvoMem groups lost).
# Why: benchmark runs should not emit telemetry anyway.
# Outcome: telemetry store never created; operator can force-enable.
os.environ.setdefault("MEM0_TELEMETRY", "false")

from mem0 import Memory # Deferred import to keep startup lightweight

base_url = os.getenv("MEM0_OPENAI_COMPAT_BASE_URL")
Expand Down Expand Up @@ -175,7 +185,18 @@ def cleanup(self, run_config: RunConfig) -> None:
self._memory.delete_all(user_id=self._user_id(run_config))
except Exception:
# Cleanup should never break the main benchmark flow.
return
pass
# Release qdrant local-mode file locks even while this instance is
# still referenced (the grouped runner keeps the last provider for
# version_info), or the next instance cannot open its stores.
for store_attr in ("vector_store", "_telemetry_vector_store"):
client = getattr(getattr(self._memory, store_attr, None), "client", None)
if client is not None and hasattr(client, "close"):
try:
client.close()
except Exception:
pass
self._memory = None

def version_info(self) -> dict[str, str]:
metadata: dict[str, str] = {
Expand Down
58 changes: 58 additions & 0 deletions tests/providers/test_mem0_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,61 @@ def test_infer_env_flag(self, monkeypatch):
assert Mem0LocalProvider()._infer is False
monkeypatch.delenv("MEM0_INFER")
assert Mem0LocalProvider()._infer is False


class TestTelemetryLockAvoidance:
def test_telemetry_disabled_before_import(self, monkeypatch):
import os

from basic_memory_benchmarks.providers.mem0_local import Mem0LocalProvider

monkeypatch.setenv("MEM0_OPENAI_COMPAT_BASE_URL", "http://localhost:1/v1")
monkeypatch.delenv("MEM0_TELEMETRY", raising=False)
provider = Mem0LocalProvider()
# _ensure_memory sets the env var before importing mem0; construction
# against the dead endpoint may fail later, which is fine here.
try:
provider._ensure_memory(
__import__("basic_memory_benchmarks.models", fromlist=["RunConfig"]).RunConfig(
run_id="t", dataset_id="t", dataset_path="t", corpus_dir="t", queries_path="t"
)
)
except Exception:
pass
assert os.environ["MEM0_TELEMETRY"] == "false"

def test_cleanup_closes_clients_and_drops_memory(self):
from basic_memory_benchmarks.providers.mem0_local import Mem0LocalProvider

class FakeClient:
closed = False

def close(self):
self.closed = True

class FakeStore:
def __init__(self):
self.client = FakeClient()

class FakeMemory:
def __init__(self):
self.vector_store = FakeStore()
self._telemetry_vector_store = FakeStore()

def delete_all(self, user_id):
pass

provider = Mem0LocalProvider()
memory = FakeMemory()
provider._memory = memory

from basic_memory_benchmarks.models import RunConfig

provider.cleanup(
RunConfig(
run_id="t", dataset_id="t", dataset_path="t", corpus_dir="t", queries_path="t"
)
)
assert memory.vector_store.client.closed
assert memory._telemetry_vector_store.client.closed
assert provider._memory is None
Loading