diff --git a/.gitignore b/.gitignore index 9120beb..6e4202f 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ benchmarks/datasets/longmemeval/longmemeval_s.json benchmarks/datasets/longmemeval/longmemeval_s.provenance.json benchmarks/datasets/locomo-audit/ benchmarks/.mem0-qdrant/ +benchmarks/datasets/convomem/ diff --git a/README.md b/README.md index fd9943c..5a9ba75 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,31 @@ session-id prefix and per-turn `has_answer` flags. The converter remaps all session ids to neutral positional ids (`-s012`) and drops turn flags, so ingested corpora carry no evidence markers. +## ConvoMem (sampled) + +ConvoMem (Salesforce, Apache-2.0) ships ~75K QA pairs as pre-mixed test cases +— each a self-contained haystack of conversations plus questions — which map +1:1 onto grouped mode. The full dataset is multi-GB, so fetching is selective: +batch files are indexed with cheap HTTP Range tail-probes and only files +matching the requested context sizes are downloaded. The probe index +(`index.json`) records every file including the ones not downloaded, so the +selection is auditable. + +```bash +uv run bm-bench datasets fetch --dataset convomem --context-sizes 10,30 +uv run bm-bench convert convomem --sample-per-stratum 25 --seed 42 +``` + +Sampling is stratified by (category, contextSize) with a fixed seed; +`sampling.json` records the seed, per-stratum population, and sample counts — +a published number states exactly which slice of ConvoMem it covers. Note +`--sample-per-stratum` counts *cases* (haystacks); larger-context cases carry +multiple questions each, all sharing one ingested group corpus. + +Anti-leakage: raw conversations carry `containsEvidence`/`model_name` fields; +rendered docs include neither and conversation ids are remapped to neutral +positional ids. + ## Basic Memory source policy By default this project tracks Basic Memory from `main`. diff --git a/justfile b/justfile index 9f8e5a8..ffa0b56 100644 --- a/justfile +++ b/justfile @@ -65,6 +65,14 @@ bench-convert-longmemeval-dev: bench-prepare-longmemeval: bench-fetch-longmemeval bench-convert-longmemeval +bench-fetch-convomem: + uv run bm-bench datasets fetch --dataset convomem --context-sizes 10,30 + +bench-convert-convomem: + uv run bm-bench convert convomem --sample-per-stratum 25 --seed 42 + +bench-prepare-convomem: bench-fetch-convomem bench-convert-convomem + bench-fetch-locomo-audit: uv run bm-bench datasets fetch --dataset locomo-audit diff --git a/src/basic_memory_benchmarks/cli.py b/src/basic_memory_benchmarks/cli.py index 939bf11..5f53968 100644 --- a/src/basic_memory_benchmarks/cli.py +++ b/src/basic_memory_benchmarks/cli.py @@ -12,6 +12,8 @@ from basic_memory_benchmarks.converters.locomo_to_corpus import convert_locomo_to_corpus from basic_memory_benchmarks.converters.longmemeval_to_corpus import convert_longmemeval_to_corpus from basic_memory_benchmarks.datasets.locomo import LOCOMO_URL, fetch_locomo_dataset +from basic_memory_benchmarks.converters.convomem_to_corpus import convert_convomem_to_corpus +from basic_memory_benchmarks.datasets.convomem import fetch_convomem_batches from basic_memory_benchmarks.datasets.locomo_audit import fetch_locomo_audit_corrections from basic_memory_benchmarks.datasets.longmemeval import ( LONGMEMEVAL_S_URL, @@ -42,6 +44,9 @@ def datasets_fetch( dataset: str = typer.Option("locomo", "--dataset"), output: Path | None = typer.Option(None, "--output"), url: str | None = typer.Option(None, "--url"), + context_sizes: str = typer.Option( + "10,30", "--context-sizes", help="convomem only: batch context sizes to download" + ), ) -> None: if dataset == "locomo": resolved_output = output or Path("benchmarks/datasets/locomo/locomo10.json") @@ -54,8 +59,14 @@ def datasets_fetch( elif dataset == "locomo-audit": resolved_output = output or Path("benchmarks/datasets/locomo-audit/corrections.json") provenance = fetch_locomo_audit_corrections(output_path=resolved_output) + elif dataset == "convomem": + resolved_output = output or Path("benchmarks/datasets/convomem") + sizes = tuple(int(s.strip()) for s in context_sizes.split(",") if s.strip()) + provenance = fetch_convomem_batches(output_dir=resolved_output, context_sizes=sizes) else: - raise typer.BadParameter("Supported datasets: locomo, longmemeval-s, locomo-audit") + raise typer.BadParameter( + "Supported datasets: locomo, longmemeval-s, locomo-audit, convomem" + ) console.print(f"Downloaded {dataset} to [cyan]{resolved_output}[/cyan]") console.print(f"SHA256: [green]{provenance.checksum_sha256}[/green]") @@ -101,6 +112,27 @@ def convert_longmemeval( console.print(f"Queries: [cyan]{queries_path}[/cyan] ({query_count})") +@convert_app.command("convomem") +def convert_convomem( + batches_dir: Path = typer.Option(Path("benchmarks/datasets/convomem"), "--batches-dir"), + output_dir: Path = typer.Option(Path("benchmarks/generated/convomem"), "--output-dir"), + sample_per_stratum: int = typer.Option(25, "--sample-per-stratum"), + seed: int = typer.Option(42, "--seed"), + context_sizes: str = typer.Option("10,30", "--context-sizes"), +) -> None: + sizes = tuple(int(s.strip()) for s in context_sizes.split(",") if s.strip()) + groups_dir, queries_path, doc_count, query_count = convert_convomem_to_corpus( + batches_dir=batches_dir, + output_dir=output_dir, + sample_per_stratum=sample_per_stratum, + seed=seed, + context_sizes=sizes, + ) + console.print(f"Groups: [cyan]{groups_dir}[/cyan] ({doc_count} docs)") + console.print(f"Queries: [cyan]{queries_path}[/cyan] ({query_count})") + console.print(f"Sampling manifest: [cyan]{output_dir / 'sampling.json'}[/cyan]") + + @run_app.command("retrieval") def run_retrieval_command( providers: str = typer.Option("bm-local,mem0-local", "--providers"), diff --git a/src/basic_memory_benchmarks/converters/convomem_to_corpus.py b/src/basic_memory_benchmarks/converters/convomem_to_corpus.py new file mode 100644 index 0000000..9700b21 --- /dev/null +++ b/src/basic_memory_benchmarks/converters/convomem_to_corpus.py @@ -0,0 +1,160 @@ +"""Convert sampled ConvoMem pre-mixed test cases into grouped benchmark corpora. + +Sampling is stratified by (category, contextSize) with a fixed seed, and the +exact sample composition is written to ``sampling.json`` so a published number +can state precisely which slice of ConvoMem it covers. + +Anti-leakage: conversations carry ``containsEvidence`` and ``model_name`` +fields in the raw data; rendered docs include neither, and conversation ids +are remapped to neutral positional ids. +""" + +from __future__ import annotations + +import json +import random +from pathlib import Path + +from basic_memory_benchmarks.datasets.convomem import load_convomem_batches + +DATASET_ID = "convomem" + +# Directory names -> benchmark category labels used in reports. +CATEGORY_LABELS: dict[str, str] = { + "user_evidence": "user_facts", + "assistant_facts_evidence": "assistant_facts", + "changing_evidence": "knowledge_update", + "abstention_evidence": "abstention", + "preference_evidence": "preference", + "implicit_connection_evidence": "implicit_connection", +} + + +def _render_conversation_doc(doc_id: str, messages: list[dict]) -> str: + lines: list[str] = [ + "---", + f"title: {doc_id}", + "type: note", + f"source_doc_id: {doc_id}", + f"dataset_id: {DATASET_ID}", + "---", + "", + f"# {doc_id}", + "", + "## Conversation", + ] + for message in messages: + speaker = str(message.get("speaker", "unknown")).capitalize() + text = str(message.get("text", "")).strip() + if not text: + continue + lines.append(f"- **{speaker}:** {' '.join(text.split())}") + return "\n".join(lines).rstrip() + "\n" + + +def convert_convomem_to_corpus( + batches_dir: Path, + output_dir: Path, + sample_per_stratum: int = 25, + seed: int = 42, + context_sizes: tuple[int, ...] | None = None, +) -> tuple[Path, Path, int, int]: + """Sample cases per (category, contextSize) stratum and emit grouped corpora. + + Returns: + groups_dir, queries_path, doc_count, query_count + """ + strata: dict[tuple[str, int], list[tuple[str, int, dict]]] = {} + for category, file_name, cases in load_convomem_batches(batches_dir): + for case_index, case in enumerate(cases): + context_size = int(case.get("contextSize") or 0) + if context_sizes is not None and context_size not in context_sizes: + continue + strata.setdefault((category, context_size), []).append((file_name, case_index, case)) + + if not strata: + raise ValueError( + f"No ConvoMem cases matched context sizes {context_sizes} in {batches_dir}" + ) + + groups_dir = output_dir / "groups" + groups_dir.mkdir(parents=True, exist_ok=True) + + rng = random.Random(seed) + all_queries: list[dict] = [] + sampling_manifest: dict[str, dict] = {} + doc_count = 0 + + for (category, context_size), members in sorted(strata.items()): + sample_size = min(sample_per_stratum, len(members)) + # Sort first so sampling is deterministic regardless of load order. + members.sort(key=lambda member: (member[0], member[1])) + sampled = rng.sample(members, sample_size) + label = CATEGORY_LABELS.get(category, category) + sampling_manifest[f"{label}/cs{context_size}"] = { + "population": len(members), + "sampled": sample_size, + } + + for file_name, case_index, case in sorted(sampled, key=lambda m: (m[0], m[1])): + batch_tag = file_name.rsplit("__", 1)[-1].removesuffix(".json") + group_id = f"{label}-cs{context_size}-{batch_tag}-{case_index:04d}" + docs_dir = groups_dir / group_id / "docs" + docs_dir.mkdir(parents=True, exist_ok=True) + + doc_id_by_conversation_id: dict[str, str] = {} + for conv_index, conversation in enumerate(case.get("conversations") or []): + doc_id = f"{group_id}-c{conv_index:03d}" + raw_id = str(conversation.get("id") or f"conv-{conv_index}") + doc_id_by_conversation_id[raw_id] = doc_id + (docs_dir / f"{doc_id}.md").write_text( + _render_conversation_doc(doc_id, conversation.get("messages") or []), + encoding="utf-8", + ) + doc_count += 1 + + for query_index, evidence in enumerate(case.get("evidenceItems") or []): + ground_truth: list[str] = [] + for evidence_conversation in evidence.get("conversations") or []: + raw_id = str(evidence_conversation.get("id") or "") + mapped = doc_id_by_conversation_id.get(raw_id) + # Abstention evidence references conversations that are + # intentionally absent from the haystack; skip those. + if mapped is not None: + ground_truth.append(mapped) + + all_queries.append( + { + "id": f"{group_id}-q{query_index}", + "query": str(evidence.get("question", "")).strip(), + "category": label, + "group": group_id, + "ground_truth": sorted(ground_truth), + "expected_answer": str(evidence.get("answer", "")).strip() or None, + "metadata": { + "dataset_id": DATASET_ID, + "context_size": context_size, + "abstention": label == "abstention", + "domain": str(evidence.get("category", "")), + }, + } + ) + + queries_path = output_dir / "queries.json" + queries_path.write_text(json.dumps(all_queries, indent=2), encoding="utf-8") + + sampling_path = output_dir / "sampling.json" + sampling_path.write_text( + json.dumps( + { + "seed": seed, + "sample_per_stratum": sample_per_stratum, + "context_sizes": sorted(context_sizes) if context_sizes else "all", + "strata": sampling_manifest, + }, + indent=2, + ), + encoding="utf-8", + ) + + return groups_dir, queries_path, doc_count, len(all_queries) diff --git a/src/basic_memory_benchmarks/datasets/convomem.py b/src/basic_memory_benchmarks/datasets/convomem.py new file mode 100644 index 0000000..01a54de --- /dev/null +++ b/src/basic_memory_benchmarks/datasets/convomem.py @@ -0,0 +1,169 @@ +"""ConvoMem (Salesforce) dataset utilities. + +ConvoMem ships ~75K QA pairs as pre-mixed test cases on HuggingFace +(Salesforce/ConvoMem, Apache-2.0): each case is a self-contained haystack of +conversations (evidence + filler) plus its questions, organized as +``core_benchmark/pre_mixed_testcases//_evidence/batched_*.json``. +Cases map 1:1 onto the harness's grouped runner mode. + +The full dataset is multi-GB (a single context-size-300 batch is ~850MB), so +fetching is selective: batch files within a directory are ordered by case +context size, and the last few KB of each file contain its final case's +``contextSize``. A cheap HTTP Range tail-probe indexes every file without +downloading it; only files matching the requested context sizes are fetched. +""" + +from __future__ import annotations + +import json +import re +import time +from pathlib import Path + +import httpx + +from basic_memory_benchmarks.models import DatasetProvenance +from basic_memory_benchmarks.utils import sha256_file, utc_now_iso + +CONVOMEM_REPO = "Salesforce/ConvoMem" +CONVOMEM_LICENSE_NOTE = "ConvoMem by Salesforce AI Research (Apache-2.0); see dataset card." +_BASE_PATH = "core_benchmark/pre_mixed_testcases" +_RESOLVE = f"https://huggingface.co/datasets/{CONVOMEM_REPO}/resolve/main" +_TREE = f"https://huggingface.co/api/datasets/{CONVOMEM_REPO}/tree/main" + +# Benchmark categories and their lowest evidence level (per the dataset README; +# 'changing' requires >= 2 evidence items). +CATEGORY_EVIDENCE_LEVELS: dict[str, int] = { + "user_evidence": 1, + "assistant_facts_evidence": 1, + "changing_evidence": 2, + "abstention_evidence": 1, + "preference_evidence": 1, + "implicit_connection_evidence": 1, +} + +DEFAULT_CONTEXT_SIZES = (10, 30) +_TAIL_PROBE_BYTES = 4096 +_CONTEXT_SIZE_PATTERN = re.compile(r'"contextSize":\s*(\d+)') + + +def _get_with_retry(client: httpx.Client, url: str, **kwargs) -> httpx.Response: + """GET with retries: rapid probe sequences trip transient CDN resets.""" + last_error: Exception | None = None + for attempt in range(4): + try: + response = client.get(url, follow_redirects=True, **kwargs) + response.raise_for_status() + return response + except (httpx.TransportError, httpx.HTTPStatusError) as exc: + last_error = exc + time.sleep(0.5 * (attempt + 1)) + raise RuntimeError(f"GET {url} failed after retries: {last_error}") + + +def _tail_context_size(client: httpx.Client, repo_path: str) -> int | None: + """Read the final case's contextSize via an HTTP Range request.""" + response = _get_with_retry( + client, + f"{_RESOLVE}/{repo_path}", + headers={"Range": f"bytes=-{_TAIL_PROBE_BYTES}"}, + ) + matches = _CONTEXT_SIZE_PATTERN.findall(response.text) + return int(matches[-1]) if matches else None + + +def _list_batch_files(client: httpx.Client, category: str, level: int) -> list[str]: + response = _get_with_retry(client, f"{_TREE}/{_BASE_PATH}/{category}/{level}_evidence") + return sorted( + item["path"] + for item in response.json() + if isinstance(item, dict) and item.get("path", "").endswith(".json") + ) + + +def fetch_convomem_batches( + output_dir: Path, + context_sizes: tuple[int, ...] = DEFAULT_CONTEXT_SIZES, + categories: dict[str, int] | None = None, +) -> DatasetProvenance: + """Index every batch file by tail-probe and download only matching ones. + + Downloaded files are stored flat as ``____.json``; + an ``index.json`` records the full probe results (including files NOT + downloaded) so the selection itself is auditable. + """ + categories = categories or CATEGORY_EVIDENCE_LEVELS + output_dir.mkdir(parents=True, exist_ok=True) + wanted = set(context_sizes) + + index: list[dict] = [] + downloaded: list[Path] = [] + with httpx.Client(timeout=120) as client: + for category, level in categories.items(): + for repo_path in _list_batch_files(client, category, level): + # Throttle probes; HF's CDN resets connections on rapid bursts. + time.sleep(0.2) + tail_size = _tail_context_size(client, repo_path) + record = { + "repo_path": repo_path, + "category": category, + "evidence_level": level, + "tail_context_size": tail_size, + "downloaded": tail_size in wanted, + } + index.append(record) + if tail_size not in wanted: + continue + local_name = f"{category}__{level}__{Path(repo_path).name}" + local_path = output_dir / local_name + with client.stream( + "GET", f"{_RESOLVE}/{repo_path}", follow_redirects=True + ) as response: + response.raise_for_status() + with local_path.open("wb") as file: + for chunk in response.iter_bytes(): + file.write(chunk) + record["local_file"] = local_name + record["sha256"] = sha256_file(local_path) + downloaded.append(local_path) + + index_path = output_dir / "index.json" + index_path.write_text( + json.dumps( + {"context_sizes": sorted(wanted), "files": index}, + indent=2, + ), + encoding="utf-8", + ) + + provenance = DatasetProvenance( + dataset_id="convomem", + source_url=f"https://huggingface.co/datasets/{CONVOMEM_REPO}", + checksum_sha256=sha256_file(index_path), + license_note=CONVOMEM_LICENSE_NOTE, + fetched_at_utc=utc_now_iso(), + ) + index_path.with_suffix(".provenance.json").write_text( + json.dumps(provenance.model_dump(mode="json"), indent=2), + encoding="utf-8", + ) + if not downloaded: + raise RuntimeError( + f"No ConvoMem batch files matched context sizes {sorted(wanted)}; " + "see index.json for the probe results" + ) + return provenance + + +def load_convomem_batches(batches_dir: Path) -> list[tuple[str, str, list[dict]]]: + """Load downloaded batches as (category, local_file_name, cases) tuples.""" + results: list[tuple[str, str, list[dict]]] = [] + for path in sorted(batches_dir.glob("*__*__*.json")): + category = path.name.split("__", 1)[0] + payload = json.loads(path.read_text(encoding="utf-8")) + if not isinstance(payload, list): + raise ValueError(f"ConvoMem batch must be a list of cases: {path}") + results.append((category, path.name, payload)) + if not results: + raise FileNotFoundError(f"No ConvoMem batch files found in {batches_dir}") + return results diff --git a/tests/converters/test_convomem_converter.py b/tests/converters/test_convomem_converter.py new file mode 100644 index 0000000..f4cdcd2 --- /dev/null +++ b/tests/converters/test_convomem_converter.py @@ -0,0 +1,182 @@ +"""Tests for the ConvoMem stratified sampler/converter.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from basic_memory_benchmarks.converters.convomem_to_corpus import ( + convert_convomem_to_corpus, +) + + +def _case( + context_size: int, + conversation_ids: list[str], + evidence_conv_ids: list[str], + question: str = "What did I say?", + answer: str = "You said the thing.", +) -> dict: + return { + "contextSize": context_size, + "conversations": [ + { + "id": conv_id, + "containsEvidence": conv_id in evidence_conv_ids, + "model_name": "gemini-2.5-pro", + "messages": [ + {"speaker": "user", "text": f"hello from conversation {index}"}, + {"speaker": "assistant", "text": "hi there"}, + ], + } + for index, conv_id in enumerate(conversation_ids) + ], + "evidenceItems": [ + { + "question": question, + "answer": answer, + "category": "Personal Life", + "conversations": [{"id": conv_id} for conv_id in evidence_conv_ids], + "message_evidences": [], + } + ], + } + + +def _write_batch(batches_dir: Path, category: str, level: int, name: str, cases: list[dict]): + batches_dir.mkdir(parents=True, exist_ok=True) + (batches_dir / f"{category}__{level}__{name}.json").write_text( + json.dumps(cases), encoding="utf-8" + ) + + +class TestConvertConvomem: + def test_grouped_output_and_ground_truth(self, tmp_path): + batches = tmp_path / "batches" + _write_batch( + batches, + "user_evidence", + 1, + "batched_000", + [_case(2, ["conv-a", "conv-b"], ["conv-b"])], + ) + + groups_dir, queries_path, doc_count, query_count = convert_convomem_to_corpus( + batches_dir=batches, + output_dir=tmp_path / "out", + sample_per_stratum=10, + context_sizes=(2,), + ) + + assert doc_count == 2 + assert query_count == 1 + queries = json.loads(queries_path.read_text()) + query = queries[0] + assert query["category"] == "user_facts" + assert query["group"].startswith("user_facts-cs2-batched_000-") + # conv-b was index 1 -> doc id suffix c001. + assert query["ground_truth"] == [f"{query['group']}-c001"] + assert query["metadata"]["context_size"] == 2 + assert query["metadata"]["abstention"] is False + + def test_leakage_fields_scrubbed(self, tmp_path): + batches = tmp_path / "batches" + _write_batch( + batches, + "user_evidence", + 1, + "batched_000", + [_case(2, ["conv-a", "conv-b"], ["conv-b"])], + ) + groups_dir, _, _, _ = convert_convomem_to_corpus( + batches_dir=batches, output_dir=tmp_path / "out", context_sizes=(2,) + ) + + corpus_text = "".join(path.read_text() for path in groups_dir.rglob("*.md")) + assert "containsEvidence" not in corpus_text + assert "gemini" not in corpus_text + assert "conv-a" not in corpus_text # raw conversation ids remapped + + def test_sampling_is_deterministic(self, tmp_path): + batches = tmp_path / "batches" + cases = [_case(2, [f"conv-{i}-a", f"conv-{i}-b"], [f"conv-{i}-b"]) for i in range(20)] + _write_batch(batches, "user_evidence", 1, "batched_000", cases) + + ids_by_run = [] + for run in range(2): + out = tmp_path / f"out{run}" + _, queries_path, _, _ = convert_convomem_to_corpus( + batches_dir=batches, + output_dir=out, + sample_per_stratum=5, + seed=42, + context_sizes=(2,), + ) + ids_by_run.append([q["id"] for q in json.loads(queries_path.read_text())]) + + assert ids_by_run[0] == ids_by_run[1] + assert len(ids_by_run[0]) == 5 + + def test_different_seed_changes_sample(self, tmp_path): + batches = tmp_path / "batches" + cases = [_case(2, [f"conv-{i}-a", f"conv-{i}-b"], [f"conv-{i}-b"]) for i in range(20)] + _write_batch(batches, "user_evidence", 1, "batched_000", cases) + + samples = [] + for seed in (42, 43): + out = tmp_path / f"out-seed{seed}" + _, queries_path, _, _ = convert_convomem_to_corpus( + batches_dir=batches, + output_dir=out, + sample_per_stratum=5, + seed=seed, + context_sizes=(2,), + ) + samples.append({q["id"] for q in json.loads(queries_path.read_text())}) + assert samples[0] != samples[1] + + def test_stratification_and_manifest(self, tmp_path): + batches = tmp_path / "batches" + _write_batch( + batches, + "user_evidence", + 1, + "batched_000", + [_case(2, ["a1", "a2"], ["a2"]), _case(4, ["b1", "b2", "b3", "b4"], ["b1"])], + ) + _write_batch( + batches, + "abstention_evidence", + 1, + "batched_000", + [_case(2, ["c1", "c2"], [])], + ) + + out = tmp_path / "out" + _, queries_path, _, query_count = convert_convomem_to_corpus( + batches_dir=batches, output_dir=out, context_sizes=(2, 4) + ) + + assert query_count == 3 + manifest = json.loads((out / "sampling.json").read_text()) + assert manifest["seed"] == 42 + assert manifest["strata"]["user_facts/cs2"] == {"population": 1, "sampled": 1} + assert manifest["strata"]["user_facts/cs4"] == {"population": 1, "sampled": 1} + assert manifest["strata"]["abstention/cs2"] == {"population": 1, "sampled": 1} + + queries = json.loads(queries_path.read_text()) + abstention = [q for q in queries if q["category"] == "abstention"][0] + # Abstention evidence may reference conversations absent from the + # haystack; ground truth is simply empty then. + assert abstention["ground_truth"] == [] + assert abstention["metadata"]["abstention"] is True + + def test_context_size_filter_excludes(self, tmp_path): + batches = tmp_path / "batches" + _write_batch(batches, "user_evidence", 1, "batched_000", [_case(2, ["a1"], ["a1"])]) + with pytest.raises(ValueError, match="No ConvoMem cases matched"): + convert_convomem_to_corpus( + batches_dir=batches, output_dir=tmp_path / "out", context_sizes=(30,) + )