diff --git a/prompts/anatomy/identity-and-tools.md b/prompts/anatomy/identity-and-tools.md index 77f8a52..935b8e1 100644 --- a/prompts/anatomy/identity-and-tools.md +++ b/prompts/anatomy/identity-and-tools.md @@ -51,6 +51,26 @@ report back via Teams. automatically. - **`whoami`** — Check identity and connection status. - **`audit_log`** — Record an action before performing it. +- **`bootstrap_body_state`** — One-call index of today's operational + state: counts, top chats, open promises, cursor freshness. Call at + session start to land continuity in the first turn. Index only — + full message content is in `read_interactions`. +- **`read_interactions`** — Query your own interaction log with + structured filters (chat_id, sender, action, direction, since, + limit). Defaults to the last 24 h; can reach back up to 7 days. + +### Body-side observation discipline (pre-send check) + +Before every outbound send — `send_teams_message`, `send_email`, +`send_card`, `share_file` — call +`read_interactions(chat_id=, since=<24h ago>, limit=5)` and +scan the returned entries. If your draft repeats something you already +sent to this chat today, revise. This is the body-side analogue of +persona-sati's `observe` discipline — same cheap-not-precious posture. +The lookup is local (sub-10 ms in the common case), so the cost +budget is small even when several sends happen in one turn. Scope +is intentionally narrow: outbound publishing only. Reads, list calls, +and audit entries do not need a pre-call observe. ### Files (SharePoint / OneDrive) authorization diff --git a/src/entrabot/mcp_server.py b/src/entrabot/mcp_server.py index 30605b3..b8f3f61 100644 --- a/src/entrabot/mcp_server.py +++ b/src/entrabot/mcp_server.py @@ -3875,6 +3875,83 @@ async def _call(token: str) -> list: # noqa: ARG001 — token unused return json.dumps([p.to_entry() for p in promises], indent=2) +@mcp.tool() +async def read_interactions( + chat_id: str = "", + sender: str = "", + action: str = "", + direction: str = "", + since: str = "", + limit: int = 10, +) -> str: + """Query the agent's own interaction log — body-side analogue of recall. + + Every inbound + outbound message the agent handles is appended to + ``interactions/.jsonl`` by the MCP server. This tool reads + that log with structured filters so the model can answer "did I + already say this?" / "what did the sponsor ping me about earlier?" + without re-hitting Graph. + + Default window is the last 24 h; ``since`` may reach back up to 7 + days. Use BEFORE every outbound send (``send_teams_message``, + ``send_email``, ``send_card``, ``share_file``) with + ``chat_id=`` to avoid repeating yourself. + + Args: + chat_id: Teams chat ID. For outbound entries this matches + ``recipient``; for inbound, ``metadata.chat_id``. Empty + string = no filter. + sender: Exact sender (case-insensitive). Empty = no filter. + action: Exact action name, e.g. ``"send_teams_message"``. + Empty = no filter. + direction: ``"inbound"`` or ``"outbound"``. Empty = no filter. + since: ISO 8601 timestamp. Default = now − 24 h. Entries at or + before this cutoff are excluded. + limit: Max entries to return (default 10). + + Returns: + JSON array of raw interaction-log entries, most-recent first. + On validation failure: ``{"error": "..."}``. + """ + from entrabot.tools.read_interactions import read_interactions as _read + + try: + entries = _read( + chat_id=chat_id or None, + sender=sender or None, + action=action or None, + direction=direction or None, + since=since or None, + limit=limit, + ) + except ValueError as exc: + return json.dumps({"error": str(exc)}) + return json.dumps(entries, indent=2) + + +@mcp.tool() +async def bootstrap_body_state() -> str: + """Single-packet snapshot of body-side operational state. + + Counterpart to persona-sati's ``bootstrap_session``. Returns an + INDEX (not content) of today's activity, the most-active chats, + every open promise, and watched-chat cursor freshness. Call at + session start to land operational continuity in the first turn + without chaining multiple read tools. + + Full message content stays in ``read_interactions`` — bootstrap + is the catalog, ``read_interactions`` is the read. + + Returns: + JSON object: ``today_counts``, ``top_chats_today``, + ``open_promises``, ``cursor_freshness``, ``watched_chat_count``, + ``generated_at``. + """ + from entrabot.tools.body_bootstrap import bootstrap_body_state as _bootstrap + + return json.dumps(_bootstrap(), indent=2) + + @mcp.tool() async def resolve_promise(promise_id: str, resolution: str) -> str: """Mark a promise resolved. diff --git a/src/entrabot/tools/body_bootstrap.py b/src/entrabot/tools/body_bootstrap.py new file mode 100644 index 0000000..899ea9e --- /dev/null +++ b/src/entrabot/tools/body_bootstrap.py @@ -0,0 +1,208 @@ +"""Body-side bootstrap — single packet of operational state for session-start. + +Issue #20: counterpart to persona-sati's ``bootstrap_session``. Returns +an INDEX of the agent's recent operational activity (counts, top chats, +open promises, cursor freshness) so the model has continuity at the top +of a turn without having to call multiple read tools. + +Key design constraint: this is an INDEX, not content. Full interaction +summaries do not appear in the payload — :func:`read_interactions` +serves that on demand. Keeping bootstrap small means it can land in +context without dominating it. +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import UTC, datetime + +from entrabot.config import get_config +from entrabot.tools import chat_cursors +from entrabot.tools.interaction_log import _interaction_key +from entrabot.tools.promises import list_promises +from entrabot.tools.read_interactions import _entry_chat_id, _load_day + +logger = logging.getLogger("entrabot.tools.body_bootstrap") + +_DESCRIPTION_PREVIEW_LEN = 80 +_TOP_CHATS_LIMIT = 5 + + +def _today_entries() -> list[dict]: + """Load today's interaction JSONL via the configured backend.""" + today = datetime.now(UTC).strftime("%Y-%m-%d") + # Use _load_day so corrupt-line handling matches read_interactions. + # (_load_day reads _interaction_key(day) via get_backend().) + _ = _interaction_key # keep import path explicit for grep + return _load_day(today) + + +def _summarize_today(entries: list[dict]) -> dict: + by_action: dict[str, int] = {} + by_channel: dict[str, int] = {} + inbound = 0 + outbound = 0 + for e in entries: + direction = e.get("direction") + if direction == "inbound": + inbound += 1 + elif direction == "outbound": + outbound += 1 + action = e.get("action") + if action: + by_action[action] = by_action.get(action, 0) + 1 + channel = e.get("channel") + if channel: + by_channel[channel] = by_channel.get(channel, 0) + 1 + return { + "total": len(entries), + "inbound": inbound, + "outbound": outbound, + "by_action": by_action, + "by_channel": by_channel, + } + + +def _top_chats(entries: list[dict]) -> list[dict]: + """Top chats by interaction count today; ties broken by recency.""" + by_chat: dict[str, dict] = {} + for e in entries: + cid = _entry_chat_id(e) + if not cid: + continue + ts = e.get("ts") or "" + sender = e.get("sender") or "" + slot = by_chat.setdefault( + cid, + {"chat_id": cid, "interaction_count": 0, "last_activity": "", "last_sender": ""}, + ) + slot["interaction_count"] += 1 + if ts > slot["last_activity"]: + slot["last_activity"] = ts + slot["last_sender"] = sender + ordered = sorted( + by_chat.values(), + key=lambda s: (s["interaction_count"], s["last_activity"]), + reverse=True, + ) + return ordered[:_TOP_CHATS_LIMIT] + + +def _open_promises_index() -> list[dict]: + """Return ALL open promises as compact index entries (no top-N cap).""" + + def _drive() -> list: + return asyncio.run(list_promises(open_only=True)) + + try: + asyncio.get_running_loop() + except RuntimeError: + promises = _drive() + else: + # Running inside an event loop (e.g. async test context). Drive the + # coroutine in a worker thread so we don't deadlock — and so we + # don't leak an un-awaited coroutine by letting asyncio.run raise + # after constructing the call. + from concurrent.futures import ThreadPoolExecutor + + with ThreadPoolExecutor(max_workers=1) as ex: + promises = ex.submit(_drive).result() + out: list[dict] = [] + for p in promises: + desc = p.description or "" + preview = desc[:_DESCRIPTION_PREVIEW_LEN] + out.append( + { + "id": p.id, + "chat_id": p.chat_id, + "description_preview": preview, + "created_at": p.created_at, + "due_by": p.due_by, + } + ) + return out + + +def _cursor_freshness() -> dict: + """Summarize watched-chat cursor health.""" + from entrabot.storage.backend import get_backend + + backend = get_backend() + keys = [k for k in backend.list(prefix="chat_cursors/") if k.endswith(".json")] + cursors_present = 0 + cursors_stale = 0 + timestamps: list[str] = [] + for key in keys: + raw = backend.read_text(key) + if raw is None: + continue + try: + import json + + payload = json.loads(raw) + except (ValueError, TypeError): + continue + if not isinstance(payload, dict): + continue + cursors_present += 1 + last_ts = payload.get("last_ts") + if chat_cursors.is_stale(last_ts): + cursors_stale += 1 + if last_ts: + timestamps.append(last_ts) + return { + "watched_chat_count": _count_watched_chats(), + "cursors_present": cursors_present, + "cursors_stale": cursors_stale, + "oldest_cursor_ts": min(timestamps) if timestamps else None, + "newest_cursor_ts": max(timestamps) if timestamps else None, + } + + +def _count_watched_chats() -> int: + """Read the persisted watched_chats file; missing → 0.""" + cfg = get_config() + f = cfg.data_dir / "watched_chats" + if not f.is_file(): + return 0 + return sum(1 for line in f.read_text().splitlines() if line.strip()) + + +def _now_iso() -> str: + return datetime.now(UTC).isoformat() + + +def bootstrap_body_state() -> dict: + """Return a single packet of body-side state for session-start. + + Mirrors persona-sati's ``bootstrap_session`` shape: one call, one + JSON object the model can scan in a single read. Index only — + full interaction content stays in :func:`read_interactions`. + + Returns: + ``today_counts`` — totals, inbound/outbound, by_action, by_channel + for entries on today's (UTC) interaction log file. + ``top_chats_today`` — up to 5 chats by interaction count today; + ties broken by most-recent activity. Each entry: chat_id, + interaction_count, last_activity, last_sender. + ``open_promises`` — every open promise (no top-N cap, since + commitments are durable). Each entry: id, chat_id, + description_preview, created_at, due_by. + ``cursor_freshness`` — watched_chat_count, cursors_present, + cursors_stale (older than 24 h), oldest_cursor_ts, + newest_cursor_ts. + ``watched_chat_count`` — count from the persisted watched_chats + file (mirror of cursor_freshness.watched_chat_count for + top-level convenience). + ``generated_at`` — when the packet was assembled. + """ + today_entries = _today_entries() + return { + "today_counts": _summarize_today(today_entries), + "top_chats_today": _top_chats(today_entries), + "open_promises": _open_promises_index(), + "cursor_freshness": _cursor_freshness(), + "watched_chat_count": _count_watched_chats(), + "generated_at": _now_iso(), + } diff --git a/src/entrabot/tools/read_interactions.py b/src/entrabot/tools/read_interactions.py new file mode 100644 index 0000000..7d09cf5 --- /dev/null +++ b/src/entrabot/tools/read_interactions.py @@ -0,0 +1,191 @@ +"""Body-side query over the interaction log — chronological filter only. + +Issue #20: the entrabot MCP server logs every inbound/outbound interaction +to ``interactions/.jsonl`` via :mod:`entrabot.tools.interaction_log`, +but the model has no read path into its own operational history. This +module is that read path. + +Read-only. Routes through :class:`entrabot.storage.backend.MemoryBackend` +so both ``LocalBackend`` and ``BlobBackend`` work. v1 is chronological + +structured filters; no embeddings, no scoring, no caching. JSONL files +are small (a day's worth is typically <100 KB) and re-reading is cheap. + +Day-file window: defaults to "today + yesterday" (24h window). When +``since`` reaches further back, additional day files are loaded up to +:data:`_MAX_DAYS_SCAN` (7) to keep the read bounded. Anything older +than that requires a follow-up that raises the cap intentionally. +""" + +from __future__ import annotations + +import json +import logging +from datetime import UTC, datetime, timedelta + +from entrabot.storage.backend import get_backend +from entrabot.tools.interaction_log import _interaction_key + +logger = logging.getLogger("entrabot.tools.read_interactions") + +_VALID_DIRECTIONS = {"inbound", "outbound"} +_DEFAULT_SINCE_HOURS = 24 +_DEFAULT_LIMIT = 10 +_MAX_DAYS_SCAN = 7 # hard cap to keep the read bounded + + +def _parse_since(since: str | None) -> datetime: + """Return the UTC cutoff. Default = now - 24h.""" + if since is None: + return datetime.now(UTC) - timedelta(hours=_DEFAULT_SINCE_HOURS) + try: + dt = datetime.fromisoformat(since.replace("Z", "+00:00")) + except (ValueError, AttributeError) as exc: + raise ValueError(f"since must be an ISO 8601 timestamp, got {since!r}") from exc + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return dt + + +def _days_to_scan(cutoff: datetime, now: datetime) -> list[str]: + """Day strings (YYYY-MM-DD) from today back to *cutoff*, hard-capped. + + Always includes today + yesterday (matches the issue's default window + even when cutoff is within today). Extends backwards if cutoff is + older, never exceeding :data:`_MAX_DAYS_SCAN`. + + Both ``cutoff`` and ``now`` are normalized to UTC before extracting + calendar dates because interaction logs are keyed by UTC day. A + ``since`` value supplied in a non-UTC offset whose offset-local date + differs from its UTC date would otherwise skip the earliest required + day file. + """ + today = now.astimezone(UTC).date() + cutoff_date = cutoff.astimezone(UTC).date() + span_days = (today - cutoff_date).days + 1 # inclusive of both ends + span_days = max(span_days, 2) # always today + yesterday + if span_days > _MAX_DAYS_SCAN: + logger.warning( + "read_interactions cutoff would scan %d day files; capping at %d", + span_days, + _MAX_DAYS_SCAN, + ) + span_days = _MAX_DAYS_SCAN + return [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(span_days)] + + +def _load_day(day: str) -> list[dict]: + """Read the day's JSONL via the configured backend; corrupt lines skipped.""" + raw = get_backend().read_text(_interaction_key(day)) + if raw is None: + return [] + entries: list[dict] = [] + for line in raw.splitlines(): + line = line.strip() + if not line: + continue + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + logger.warning( + "skipping corrupt line in interactions/%s.jsonl: %s", + day, + line[:80], + ) + return entries + + +def _entry_chat_id(entry: dict) -> str | None: + """Pull the chat_id off an entry regardless of direction. + + Outbound entries store the destination in ``recipient``; inbound + entries (Teams pushes) store the chat in ``metadata.chat_id``. This + mirrors :func:`entrabot.tools.daily_summary._counterparty`. + """ + direction = entry.get("direction") + if direction == "outbound": + return entry.get("recipient") + meta = entry.get("metadata") or {} + return meta.get("chat_id") + + +def _matches( + entry: dict, + *, + chat_id: str | None, + sender: str | None, + action: str | None, + direction: str | None, + cutoff: datetime, +) -> bool: + if direction is not None and entry.get("direction") != direction: + return False + if action is not None and entry.get("action") != action: + return False + if sender is not None: + entry_sender = (entry.get("sender") or "").lower() + if entry_sender != sender.lower(): + return False + if chat_id is not None and _entry_chat_id(entry) != chat_id: + return False + ts_raw = entry.get("ts") + if ts_raw is None: + return False + try: + ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")) + except (ValueError, AttributeError): + return False + if ts.tzinfo is None: + ts = ts.replace(tzinfo=UTC) + return ts > cutoff + + +def read_interactions( + chat_id: str | None = None, + sender: str | None = None, + action: str | None = None, + direction: str | None = None, + since: str | None = None, + limit: int = _DEFAULT_LIMIT, +) -> list[dict]: + """Return recent interaction entries matching the given filters. + + Args: + chat_id: Match the Teams chat ID. For outbound entries this is + ``recipient``; for inbound this is ``metadata.chat_id``. + sender: Exact sender match (case-insensitive — emails are + case-insensitive identities). + action: Exact match on the ``action`` field + (e.g. ``"send_teams_message"``). + direction: ``"inbound"`` or ``"outbound"``. + since: ISO 8601 timestamp. Default is now − 24 h. Entries at or + before this cutoff are excluded. + limit: Maximum entries to return (default 10). + + Returns: + Most-recent-first list of raw entry dicts (existing JSONL schema + preserved — caller sees what was written). + """ + if direction is not None and direction not in _VALID_DIRECTIONS: + raise ValueError(f"direction must be one of {_VALID_DIRECTIONS}, got {direction!r}") + if limit <= 0: + return [] + + cutoff = _parse_since(since) + now = datetime.now(UTC) + days = _days_to_scan(cutoff, now) + + collected: list[dict] = [] + for day in days: + for entry in _load_day(day): + if _matches( + entry, + chat_id=chat_id, + sender=sender, + action=action, + direction=direction, + cutoff=cutoff, + ): + collected.append(entry) + + collected.sort(key=lambda e: e.get("ts", ""), reverse=True) + return collected[:limit] diff --git a/tests/test_mcp_server_body_tools.py b/tests/test_mcp_server_body_tools.py new file mode 100644 index 0000000..6f38beb --- /dev/null +++ b/tests/test_mcp_server_body_tools.py @@ -0,0 +1,115 @@ +"""Tests confirming the two new body-side tools are registered with FastMCP. + +Issue #20: ``read_interactions`` and ``bootstrap_body_state`` must be +exposed through the MCP surface, not just available as Python imports. +These tests verify registration and the JSON-string contract the MCP +wrapper enforces. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from entrabot import mcp_server +from entrabot.tools import interaction_log as il + + +@pytest.fixture +def tmp_data_dir(tmp_path, monkeypatch): + monkeypatch.setenv("ENTRABOT_DATA_DIR", str(tmp_path)) + monkeypatch.delenv("ENTRABOT_BLOB_ENDPOINT", raising=False) + monkeypatch.delenv("ENTRABOT_BLOB_CONTAINER", raising=False) + monkeypatch.setenv("ENTRABOT_KEEP_MEMORY_LOCAL", "true") + mcp_server._state.pop("config", None) + yield tmp_path + mcp_server._state.pop("config", None) + + +class TestToolRegistration: + def test_read_interactions_is_registered(self) -> None: + assert "read_interactions" in mcp_server.mcp._tool_manager._tools + + def test_bootstrap_body_state_is_registered(self) -> None: + assert "bootstrap_body_state" in mcp_server.mcp._tool_manager._tools + + +class TestReadInteractionsThroughMCP: + @pytest.mark.asyncio + async def test_returns_json_string(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="hi") + tool = mcp_server.mcp._tool_manager._tools["read_interactions"] + out = await tool.fn() + # MCP tools return JSON strings (matches read_email, list_promises convention) + parsed = json.loads(out) + assert isinstance(parsed, list) + assert len(parsed) == 1 + assert parsed[0]["summary"] == "hi" + + @pytest.mark.asyncio + async def test_filters_passed_through(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:A@thread.v2", + summary="to A", + ) + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:B@thread.v2", + summary="to B", + ) + tool = mcp_server.mcp._tool_manager._tools["read_interactions"] + out = await tool.fn(chat_id="19:A@thread.v2") + parsed = json.loads(out) + assert len(parsed) == 1 + assert parsed[0]["recipient"] == "19:A@thread.v2" + + @pytest.mark.asyncio + async def test_invalid_direction_returns_error_json(self, tmp_data_dir: Path) -> None: + """Validation errors come back as JSON, not raw exceptions.""" + tool = mcp_server.mcp._tool_manager._tools["read_interactions"] + out = await tool.fn(direction="sideways") + parsed = json.loads(out) + assert isinstance(parsed, dict) + assert "error" in parsed + + +class TestBootstrapBodyStateThroughMCP: + @pytest.mark.asyncio + async def test_returns_json_packet(self, tmp_data_dir: Path) -> None: + tool = mcp_server.mcp._tool_manager._tools["bootstrap_body_state"] + out = await tool.fn() + parsed = json.loads(out) + # All documented top-level keys present + for key in ( + "today_counts", + "top_chats_today", + "open_promises", + "cursor_freshness", + "watched_chat_count", + "generated_at", + ): + assert key in parsed + + @pytest.mark.asyncio + async def test_reflects_logged_interactions(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + recipient="19:X@unq.gbl.spaces", + summary="hi", + action="send_teams_message", + ) + tool = mcp_server.mcp._tool_manager._tools["bootstrap_body_state"] + out = await tool.fn() + parsed = json.loads(out) + assert parsed["today_counts"]["total"] == 1 + assert parsed["today_counts"]["outbound"] == 1 + assert parsed["today_counts"]["by_action"]["send_teams_message"] == 1 diff --git a/tests/tools/test_body_bootstrap.py b/tests/tools/test_body_bootstrap.py new file mode 100644 index 0000000..f35a66f --- /dev/null +++ b/tests/tools/test_body_bootstrap.py @@ -0,0 +1,399 @@ +"""Tests for bootstrap_body_state — the body-side counterpart to bootstrap_session. + +Returns a single packet with today's interaction counts, the most active +chats, all open promises, cursor freshness, and watched chat count. The +goal is "index, not content" — full message content stays in +``read_interactions``; this is what lands in the model's bootstrap turn. +""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime, timedelta +from pathlib import Path +from unittest.mock import patch + +import pytest + +from entrabot.tools import chat_cursors +from entrabot.tools import interaction_log as il +from entrabot.tools import promises as pr +from entrabot.tools.body_bootstrap import bootstrap_body_state + + +@pytest.fixture +def tmp_data_dir(tmp_path, monkeypatch): + """Point storage at a temp dir and force LocalBackend.""" + monkeypatch.setenv("ENTRABOT_DATA_DIR", str(tmp_path)) + monkeypatch.delenv("ENTRABOT_BLOB_ENDPOINT", raising=False) + monkeypatch.delenv("ENTRABOT_BLOB_CONTAINER", raising=False) + monkeypatch.setenv("ENTRABOT_KEEP_MEMORY_LOCAL", "true") + return tmp_path + + +def _log_at(ts: datetime, **kwargs) -> None: + with patch.object(il, "_now", return_value=ts): + il.log_interaction(**kwargs) + + +# --------------------------------------------------------------------------- +# Empty / sensible defaults +# --------------------------------------------------------------------------- +class TestEmptyState: + def test_empty_storage_returns_zero_counts(self, tmp_data_dir: Path) -> None: + result = bootstrap_body_state() + assert result["today_counts"]["total"] == 0 + assert result["today_counts"]["inbound"] == 0 + assert result["today_counts"]["outbound"] == 0 + assert result["today_counts"]["by_action"] == {} + assert result["today_counts"]["by_channel"] == {} + assert result["top_chats_today"] == [] + assert result["open_promises"] == [] + assert result["watched_chat_count"] == 0 + + def test_cursor_freshness_zero_when_no_cursors(self, tmp_data_dir: Path) -> None: + result = bootstrap_body_state() + cf = result["cursor_freshness"] + assert cf["watched_chat_count"] == 0 + assert cf["cursors_present"] == 0 + assert cf["cursors_stale"] == 0 + assert cf["oldest_cursor_ts"] is None + assert cf["newest_cursor_ts"] is None + + def test_generated_at_is_iso_utc(self, tmp_data_dir: Path) -> None: + result = bootstrap_body_state() + assert "generated_at" in result + # Must parse as ISO 8601 + datetime.fromisoformat(result["generated_at"].replace("Z", "+00:00")) + + +# --------------------------------------------------------------------------- +# today_counts +# --------------------------------------------------------------------------- +class TestTodayCounts: + def test_counts_inbound_and_outbound(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="in1") + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="in2") + il.log_interaction(channel="terminal", direction="outbound", sender="agent", summary="out1") + result = bootstrap_body_state() + assert result["today_counts"]["total"] == 3 + assert result["today_counts"]["inbound"] == 2 + assert result["today_counts"]["outbound"] == 1 + + def test_by_action_counts_actions(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + summary="s1", + action="send_teams_message", + ) + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + summary="s2", + action="send_teams_message", + ) + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + summary="c1", + action="send_card", + ) + result = bootstrap_body_state() + assert result["today_counts"]["by_action"] == { + "send_teams_message": 2, + "send_card": 1, + } + + def test_by_channel_counts_channels(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="teams_dm", direction="outbound", sender="agent", summary="a") + il.log_interaction(channel="teams_dm", direction="inbound", sender="u", summary="b") + il.log_interaction(channel="email", direction="inbound", sender="u", summary="c") + result = bootstrap_body_state() + assert result["today_counts"]["by_channel"] == { + "teams_dm": 2, + "email": 1, + } + + def test_yesterdays_entries_not_counted_as_today(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + _log_at( + now - timedelta(hours=30), + channel="terminal", + direction="inbound", + sender="u", + summary="ancient", + ) + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="now") + result = bootstrap_body_state() + assert result["today_counts"]["total"] == 1 + + +# --------------------------------------------------------------------------- +# top_chats_today +# --------------------------------------------------------------------------- +class TestTopChatsToday: + def test_sorts_by_interaction_count_desc(self, tmp_data_dir: Path) -> None: + # Chat A: 3 interactions; Chat B: 2; Chat C: 1 + for _ in range(3): + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:A@thread.v2", + summary="a", + ) + for _ in range(2): + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:B@thread.v2", + summary="b", + ) + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:C@thread.v2", + summary="c", + ) + result = bootstrap_body_state() + ids = [c["chat_id"] for c in result["top_chats_today"]] + assert ids == ["19:A@thread.v2", "19:B@thread.v2", "19:C@thread.v2"] + counts = [c["interaction_count"] for c in result["top_chats_today"]] + assert counts == [3, 2, 1] + + def test_returns_at_most_5(self, tmp_data_dir: Path) -> None: + for i in range(8): + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient=f"19:C{i}@thread.v2", + summary=f"to {i}", + ) + result = bootstrap_body_state() + assert len(result["top_chats_today"]) == 5 + + def test_includes_inbound_chats_via_metadata(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_group", + direction="inbound", + sender="brandon@x.com", + summary="hi", + metadata={"chat_id": "19:X@thread.v2"}, + ) + result = bootstrap_body_state() + assert len(result["top_chats_today"]) == 1 + assert result["top_chats_today"][0]["chat_id"] == "19:X@thread.v2" + + def test_includes_last_activity_and_last_sender(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + earlier_ts = now - timedelta(minutes=10) + latest_ts = now - timedelta(minutes=1) + _log_at( + earlier_ts, + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:A@thread.v2", + summary="earlier", + ) + _log_at( + latest_ts, + channel="teams_group", + direction="inbound", + sender="brandon@x.com", + summary="latest", + metadata={"chat_id": "19:A@thread.v2"}, + ) + result = bootstrap_body_state() + assert len(result["top_chats_today"]) == 1 + top = result["top_chats_today"][0] + assert top["interaction_count"] == 2 + assert top["last_sender"] == "brandon@x.com" + # last_activity must reflect the NEWER of the two entries, not + # the earlier one — regression for the recency selection in + # _top_chats. + last = datetime.fromisoformat(top["last_activity"].replace("Z", "+00:00")) + assert last == latest_ts + assert last > earlier_ts + + def test_excludes_entries_with_no_chat_id(self, tmp_data_dir: Path) -> None: + """Terminal sends have no chat_id and shouldn't pollute top_chats.""" + il.log_interaction(channel="terminal", direction="outbound", sender="agent", summary="cli") + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:X@thread.v2", + summary="real", + ) + result = bootstrap_body_state() + assert len(result["top_chats_today"]) == 1 + assert result["top_chats_today"][0]["chat_id"] == "19:X@thread.v2" + + def test_ties_broken_by_recency(self, tmp_data_dir: Path) -> None: + """Equal counts → most-recent activity wins.""" + now = datetime.now(UTC) + # Chat A: one old entry + _log_at( + now - timedelta(hours=5), + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:A@thread.v2", + summary="a", + ) + # Chat B: one fresh entry + _log_at( + now - timedelta(minutes=1), + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:B@thread.v2", + summary="b", + ) + result = bootstrap_body_state() + ids = [c["chat_id"] for c in result["top_chats_today"]] + # Same count (1 each) → B is more recent → B first + assert ids[0] == "19:B@thread.v2" + + +# --------------------------------------------------------------------------- +# open_promises — ALL open promises (no top-N cap) +# --------------------------------------------------------------------------- +class TestOpenPromises: + def test_no_promises_returns_empty(self, tmp_data_dir: Path) -> None: + result = bootstrap_body_state() + assert result["open_promises"] == [] + + def test_returns_all_open_promises(self, tmp_data_dir: Path) -> None: + async def _seed() -> None: + for i in range(12): + await pr.add_promise( + chat_id=f"19:C{i}@thread.v2", + description=f"Do thing {i}", + ) + + asyncio.run(_seed()) + result = bootstrap_body_state() + # ALL open promises, not top-N + assert len(result["open_promises"]) == 12 + + def test_promise_shape_has_required_fields(self, tmp_data_dir: Path) -> None: + async def _seed() -> None: + await pr.add_promise( + chat_id="19:A@thread.v2", + description="The description text here is fairly long to test preview", + due_by="2026-12-31T00:00:00+00:00", + ) + + asyncio.run(_seed()) + result = bootstrap_body_state() + assert len(result["open_promises"]) == 1 + p = result["open_promises"][0] + assert "id" in p + assert p["chat_id"] == "19:A@thread.v2" + assert "description_preview" in p + assert "created_at" in p + assert p["due_by"] == "2026-12-31T00:00:00+00:00" + + def test_due_by_null_when_unset(self, tmp_data_dir: Path) -> None: + async def _seed() -> None: + await pr.add_promise(chat_id="19:A@thread.v2", description="d") + + asyncio.run(_seed()) + result = bootstrap_body_state() + assert result["open_promises"][0]["due_by"] is None + + +# --------------------------------------------------------------------------- +# cursor_freshness +# --------------------------------------------------------------------------- +class TestCursorFreshness: + def test_counts_present_cursors(self, tmp_data_dir: Path) -> None: + recent = (datetime.now(UTC) - timedelta(minutes=10)).strftime("%Y-%m-%dT%H:%M:%SZ") + chat_cursors.save_cursor( + "19:A@thread.v2", {"last_ts": recent, "seen_ids_tail": [], "bootstrapped": True} + ) + chat_cursors.save_cursor( + "19:B@thread.v2", {"last_ts": recent, "seen_ids_tail": [], "bootstrapped": True} + ) + result = bootstrap_body_state() + cf = result["cursor_freshness"] + assert cf["cursors_present"] == 2 + assert cf["cursors_stale"] == 0 + + def test_distinguishes_stale_from_fresh(self, tmp_data_dir: Path) -> None: + recent = (datetime.now(UTC) - timedelta(minutes=10)).strftime("%Y-%m-%dT%H:%M:%SZ") + stale = ( + datetime.now(UTC) - timedelta(seconds=chat_cursors.CURSOR_STALENESS_SECONDS + 3600) + ).strftime("%Y-%m-%dT%H:%M:%SZ") + chat_cursors.save_cursor( + "19:fresh@thread.v2", {"last_ts": recent, "seen_ids_tail": [], "bootstrapped": True} + ) + chat_cursors.save_cursor( + "19:stale@thread.v2", {"last_ts": stale, "seen_ids_tail": [], "bootstrapped": True} + ) + result = bootstrap_body_state() + cf = result["cursor_freshness"] + assert cf["cursors_present"] == 2 + assert cf["cursors_stale"] == 1 + + def test_oldest_and_newest_cursor_ts(self, tmp_data_dir: Path) -> None: + old = (datetime.now(UTC) - timedelta(hours=5)).strftime("%Y-%m-%dT%H:%M:%SZ") + new = (datetime.now(UTC) - timedelta(minutes=1)).strftime("%Y-%m-%dT%H:%M:%SZ") + chat_cursors.save_cursor( + "19:A@thread.v2", {"last_ts": old, "seen_ids_tail": [], "bootstrapped": True} + ) + chat_cursors.save_cursor( + "19:B@thread.v2", {"last_ts": new, "seen_ids_tail": [], "bootstrapped": True} + ) + result = bootstrap_body_state() + cf = result["cursor_freshness"] + assert cf["oldest_cursor_ts"] == old + assert cf["newest_cursor_ts"] == new + + +# --------------------------------------------------------------------------- +# watched_chat_count +# --------------------------------------------------------------------------- +class TestWatchedChatCount: + def test_counts_persisted_watched_chats(self, tmp_data_dir: Path) -> None: + """watched_chat_count reads from the persisted watched_chats file.""" + (tmp_data_dir / "watched_chats").write_text( + "19:A@thread.v2\n19:B@thread.v2\n19:C@unq.gbl.spaces\n" + ) + result = bootstrap_body_state() + assert result["watched_chat_count"] == 3 + + def test_skips_blank_lines_in_watched_chats(self, tmp_data_dir: Path) -> None: + (tmp_data_dir / "watched_chats").write_text("19:A@thread.v2\n\n19:B@thread.v2\n\n") + result = bootstrap_body_state() + assert result["watched_chat_count"] == 2 + + +# --------------------------------------------------------------------------- +# Indexes only — no message content +# --------------------------------------------------------------------------- +class TestNoContentLeak: + def test_message_summaries_not_in_payload(self, tmp_data_dir: Path) -> None: + """Bootstrap is INDEX. Full summaries don't belong here.""" + unique = "AAAAA-SECRET-SENTINEL-VALUE-XYZ" + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + recipient="19:X@unq.gbl.spaces", + summary=unique, + action="send_teams_message", + ) + result = bootstrap_body_state() + import json + + assert unique not in json.dumps(result) diff --git a/tests/tools/test_read_interactions.py b/tests/tools/test_read_interactions.py new file mode 100644 index 0000000..3f3675b --- /dev/null +++ b/tests/tools/test_read_interactions.py @@ -0,0 +1,520 @@ +"""Tests for read_interactions — chronological filter over the interaction log. + +The body-side analogue of persona-sati's ``recall``: lets the model query +its own operational history (interaction_log) before speaking. Cheap, not +precious. Chronological + structured filters, no semantic scoring. + +Storage path goes through MemoryBackend so BlobBackend works in cloud +mode. JSONL on-disk schema is the existing ``interactions/.jsonl`` +shape — this module is read-only and must not change writes. +""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from pathlib import Path +from unittest.mock import patch + +import pytest + +from entrabot.tools import interaction_log as il +from entrabot.tools.read_interactions import read_interactions + + +@pytest.fixture +def tmp_data_dir(tmp_path, monkeypatch): + """Point storage at a temp directory and force LocalBackend.""" + monkeypatch.setenv("ENTRABOT_DATA_DIR", str(tmp_path)) + monkeypatch.delenv("ENTRABOT_BLOB_ENDPOINT", raising=False) + monkeypatch.delenv("ENTRABOT_BLOB_CONTAINER", raising=False) + monkeypatch.setenv("ENTRABOT_KEEP_MEMORY_LOCAL", "true") + return tmp_path + + +def _log_at(ts: datetime, **kwargs) -> None: + """Write one interaction at a specific UTC timestamp.""" + with patch.object(il, "_now", return_value=ts): + il.log_interaction(**kwargs) + + +# --------------------------------------------------------------------------- +# Basic shape + default behavior +# --------------------------------------------------------------------------- +class TestReadInteractionsBasic: + def test_empty_storage_returns_empty_list(self, tmp_data_dir: Path) -> None: + assert read_interactions() == [] + + def test_returns_list_of_dicts_in_original_schema(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent@x.com", + recipient="19:chat@unq.gbl.spaces", + summary="hi", + action="send_teams_message", + ) + results = read_interactions() + assert len(results) == 1 + entry = results[0] + assert entry["channel"] == "teams_dm" + assert entry["direction"] == "outbound" + assert entry["sender"] == "agent@x.com" + assert entry["recipient"] == "19:chat@unq.gbl.spaces" + assert entry["summary"] == "hi" + assert entry["action"] == "send_teams_message" + # Existing schema preserved — id + ts always present + assert "id" in entry + assert "ts" in entry + + def test_sort_order_is_most_recent_first(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + _log_at( + now - timedelta(hours=3), + channel="terminal", + direction="inbound", + sender="u", + summary="oldest", + ) + _log_at( + now - timedelta(hours=2), + channel="terminal", + direction="inbound", + sender="u", + summary="middle", + ) + _log_at( + now - timedelta(hours=1), + channel="terminal", + direction="inbound", + sender="u", + summary="newest", + ) + results = read_interactions() + summaries = [e["summary"] for e in results] + assert summaries == ["newest", "middle", "oldest"] + + +# --------------------------------------------------------------------------- +# limit +# --------------------------------------------------------------------------- +class TestLimit: + def test_limit_default_is_10(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + for i in range(15): + _log_at( + now - timedelta(minutes=i), + channel="terminal", + direction="inbound", + sender="u", + summary=f"m{i}", + ) + results = read_interactions() + assert len(results) == 10 + # Most-recent-first, so m0..m9 (m0 is newest) + assert [e["summary"] for e in results] == [f"m{i}" for i in range(10)] + + def test_limit_honored_when_explicit(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + for i in range(5): + _log_at( + now - timedelta(minutes=i), + channel="terminal", + direction="inbound", + sender="u", + summary=f"m{i}", + ) + results = read_interactions(limit=2) + assert len(results) == 2 + assert [e["summary"] for e in results] == ["m0", "m1"] + + def test_limit_zero_returns_empty(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="x") + assert read_interactions(limit=0) == [] + + +# --------------------------------------------------------------------------- +# Filters +# --------------------------------------------------------------------------- +class TestChatIdFilter: + def test_matches_outbound_recipient(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:A@thread.v2", + summary="to A", + ) + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:B@thread.v2", + summary="to B", + ) + results = read_interactions(chat_id="19:A@thread.v2") + assert len(results) == 1 + assert results[0]["summary"] == "to A" + + def test_matches_inbound_metadata_chat_id(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_group", + direction="inbound", + sender="brandon@x.com", + summary="from A", + metadata={"chat_id": "19:A@thread.v2"}, + ) + il.log_interaction( + channel="teams_group", + direction="inbound", + sender="brandon@x.com", + summary="from B", + metadata={"chat_id": "19:B@thread.v2"}, + ) + results = read_interactions(chat_id="19:A@thread.v2") + assert len(results) == 1 + assert results[0]["summary"] == "from A" + + def test_matches_mixed_directions_for_same_chat(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:A@thread.v2", + summary="sent", + ) + il.log_interaction( + channel="teams_group", + direction="inbound", + sender="brandon@x.com", + summary="received", + metadata={"chat_id": "19:A@thread.v2"}, + ) + results = read_interactions(chat_id="19:A@thread.v2") + assert {e["summary"] for e in results} == {"sent", "received"} + + +class TestSenderFilter: + def test_matches_exact_sender(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="email", direction="inbound", sender="alice@x.com", summary="a") + il.log_interaction(channel="email", direction="inbound", sender="bob@x.com", summary="b") + results = read_interactions(sender="alice@x.com") + assert len(results) == 1 + assert results[0]["summary"] == "a" + + def test_sender_match_is_case_insensitive(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="email", direction="inbound", sender="Alice@X.com", summary="a") + results = read_interactions(sender="alice@x.com") + assert len(results) == 1 + assert results[0]["summary"] == "a" + + +class TestActionFilter: + def test_matches_exact_action(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + summary="m1", + action="send_teams_message", + ) + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + summary="m2", + action="send_card", + ) + results = read_interactions(action="send_card") + assert len(results) == 1 + assert results[0]["summary"] == "m2" + + def test_action_filter_skips_entries_with_no_action(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="no action") + il.log_interaction( + channel="teams_dm", + direction="outbound", + sender="agent", + summary="has action", + action="send_teams_message", + ) + results = read_interactions(action="send_teams_message") + assert len(results) == 1 + assert results[0]["summary"] == "has action" + + +class TestDirectionFilter: + def test_inbound_only(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="in") + il.log_interaction(channel="terminal", direction="outbound", sender="agent", summary="out") + results = read_interactions(direction="inbound") + assert len(results) == 1 + assert results[0]["summary"] == "in" + + def test_outbound_only(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="in") + il.log_interaction(channel="terminal", direction="outbound", sender="agent", summary="out") + results = read_interactions(direction="outbound") + assert len(results) == 1 + assert results[0]["summary"] == "out" + + def test_invalid_direction_raises(self, tmp_data_dir: Path) -> None: + with pytest.raises(ValueError, match="direction"): + read_interactions(direction="sideways") + + +# --------------------------------------------------------------------------- +# since — chronological window +# --------------------------------------------------------------------------- +class TestSinceFilter: + def test_default_since_is_24h_ago(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + _log_at( + now - timedelta(hours=2), + channel="terminal", + direction="inbound", + sender="u", + summary="recent", + ) + _log_at( + now - timedelta(hours=48), + channel="terminal", + direction="inbound", + sender="u", + summary="too old", + ) + results = read_interactions() + summaries = [e["summary"] for e in results] + assert "recent" in summaries + assert "too old" not in summaries + + def test_explicit_since_includes_older_entries(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + _log_at( + now - timedelta(hours=48), + channel="terminal", + direction="inbound", + sender="u", + summary="48h ago", + ) + since = (now - timedelta(hours=72)).isoformat() + results = read_interactions(since=since) + assert len(results) == 1 + assert results[0]["summary"] == "48h ago" + + def test_since_excludes_entries_at_or_before_cutoff(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + cutoff = now - timedelta(hours=2) + _log_at( + cutoff - timedelta(seconds=1), + channel="terminal", + direction="inbound", + sender="u", + summary="just before", + ) + _log_at( + cutoff + timedelta(seconds=1), + channel="terminal", + direction="inbound", + sender="u", + summary="just after", + ) + results = read_interactions(since=cutoff.isoformat()) + summaries = [e["summary"] for e in results] + assert "just after" in summaries + assert "just before" not in summaries + + def test_day_boundary_crossover(self, tmp_data_dir: Path) -> None: + """24h window must read today's AND yesterday's file.""" + now = datetime.now(UTC) + # Force one entry 18h ago (definitely yesterday in UTC) + _log_at( + now - timedelta(hours=18), + channel="terminal", + direction="inbound", + sender="u", + summary="yesterday-ish", + ) + _log_at( + now - timedelta(minutes=5), + channel="terminal", + direction="inbound", + sender="u", + summary="today", + ) + results = read_interactions() + summaries = [e["summary"] for e in results] + assert "today" in summaries + assert "yesterday-ish" in summaries + + def test_since_with_non_utc_offset_scans_correct_utc_day( + self, tmp_data_dir: Path + ) -> None: + """Regression for PR #21 review (medium): cutoff.date() in a + non-UTC offset must not skip the earliest required UTC day file. + + Construct a `since` whose offset-local date is one day LATER + than its UTC date, with an entry living on the UTC day. If + `_days_to_scan` uses the un-normalized date, the day file the + entry lives in is skipped and the entry is silently lost. + """ + from datetime import timezone + + now_utc = datetime.now(UTC) + # Entry: 3 days ago at 22:00 UTC → lands in that day's file. + entry_ts = (now_utc - timedelta(days=3)).replace( + hour=22, minute=0, second=0, microsecond=0 + ) + _log_at( + entry_ts, + channel="terminal", + direction="inbound", + sender="u", + summary="deep-past-utc-day", + ) + + # Cutoff: 1h before the entry in UTC, expressed as +12:00 — the + # offset rotates the calendar date forward into the next day. + cutoff_utc = entry_ts - timedelta(hours=1) + cutoff_in_offset = cutoff_utc.astimezone(timezone(timedelta(hours=12))) + # Sanity: this construction actually exposes the bug condition. + assert cutoff_in_offset.date() > cutoff_utc.date() + + results = read_interactions(since=cutoff_in_offset.isoformat()) + summaries = [e["summary"] for e in results] + assert "deep-past-utc-day" in summaries + + def test_seven_day_cap_does_not_scan_further(self, tmp_data_dir: Path, caplog) -> None: + """Pass since=10d ago — we cap at 7 day files. 10d-old entry NOT returned.""" + now = datetime.now(UTC) + _log_at( + now - timedelta(days=10), + channel="terminal", + direction="inbound", + sender="u", + summary="too-old-cap", + ) + _log_at( + now - timedelta(days=3), + channel="terminal", + direction="inbound", + sender="u", + summary="within-cap", + ) + since = (now - timedelta(days=10)).isoformat() + results = read_interactions(since=since) + summaries = [e["summary"] for e in results] + assert "within-cap" in summaries + assert "too-old-cap" not in summaries + + def test_invalid_since_raises(self, tmp_data_dir: Path) -> None: + with pytest.raises(ValueError, match="since"): + read_interactions(since="not-a-timestamp") + + +# --------------------------------------------------------------------------- +# Filter composition +# --------------------------------------------------------------------------- +class TestFilterComposition: + def test_chat_id_plus_direction_plus_sender(self, tmp_data_dir: Path) -> None: + il.log_interaction( + channel="teams_group", + direction="inbound", + sender="brandon@x.com", + summary="match", + metadata={"chat_id": "19:A@thread.v2"}, + ) + il.log_interaction( + channel="teams_group", + direction="outbound", + sender="agent", + recipient="19:A@thread.v2", + summary="wrong direction", + ) + il.log_interaction( + channel="teams_group", + direction="inbound", + sender="bob@x.com", + summary="wrong sender", + metadata={"chat_id": "19:A@thread.v2"}, + ) + il.log_interaction( + channel="teams_group", + direction="inbound", + sender="brandon@x.com", + summary="wrong chat", + metadata={"chat_id": "19:B@thread.v2"}, + ) + results = read_interactions( + chat_id="19:A@thread.v2", + direction="inbound", + sender="brandon@x.com", + ) + assert len(results) == 1 + assert results[0]["summary"] == "match" + + def test_all_filters_plus_limit_plus_since(self, tmp_data_dir: Path) -> None: + now = datetime.now(UTC) + # 5 matching entries spread over 5 hours + for i in range(5): + _log_at( + now - timedelta(hours=i), + channel="teams_dm", + direction="outbound", + sender="agent", + recipient="19:C@unq.gbl.spaces", + summary=f"m{i}", + action="send_teams_message", + ) + # An older non-matching entry + _log_at( + now - timedelta(hours=10), + channel="teams_dm", + direction="inbound", + sender="u", + summary="no", + metadata={"chat_id": "19:C@unq.gbl.spaces"}, + ) + results = read_interactions( + chat_id="19:C@unq.gbl.spaces", + direction="outbound", + action="send_teams_message", + since=(now - timedelta(hours=3)).isoformat(), + limit=10, + ) + # since cuts at 3h → entries 0,1,2 (3h-old is at cutoff, excluded) + summaries = [e["summary"] for e in results] + assert summaries == ["m0", "m1", "m2"] + + +# --------------------------------------------------------------------------- +# Resilience +# --------------------------------------------------------------------------- +class TestResilience: + def test_missing_day_file_is_handled_gracefully(self, tmp_data_dir: Path) -> None: + """No entries today; only one yesterday's file — must not crash.""" + now = datetime.now(UTC) + _log_at( + now - timedelta(hours=18), + channel="terminal", + direction="inbound", + sender="u", + summary="y", + ) + # Today's file may or may not exist depending on now; the only + # invariant we need is that read_interactions doesn't choke on a + # missing file. + results = read_interactions() + assert any(e["summary"] == "y" for e in results) + + def test_corrupt_line_is_skipped(self, tmp_data_dir: Path) -> None: + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="good") + day = datetime.now(UTC).strftime("%Y-%m-%d") + log_file = tmp_data_dir / "interactions" / f"{day}.jsonl" + with open(log_file, "a") as fh: + fh.write("not-json\n") + il.log_interaction(channel="terminal", direction="inbound", sender="u", summary="after") + results = read_interactions() + summaries = [e["summary"] for e in results] + assert "good" in summaries + assert "after" in summaries + assert "not-json" not in str(results)