From 6061be1a4b5b82743b248b4b25e235b46eeb0776 Mon Sep 17 00:00:00 2001 From: Evan Alferez Date: Wed, 27 May 2026 09:45:37 +0900 Subject: [PATCH] fix: daily summary day, persona pull mtime, and stdio task shutdown Target the previous UTC day when the 5pm PDT scheduler fires and skip re-send when the archive sidecar already exists. PersonaBackend.pull_all preserves offline-local files that are newer than cloud. Cancel tracked background poll tasks when the MCP stdio session disconnects. --- TODOS.md | 12 ++--- docs/engineering-status.md | 7 +-- src/entraclaw/mcp_server.py | 68 ++++++++++++++++++++++++---- src/entraclaw/storage/backend.py | 9 ++++ src/entraclaw/storage/blob.py | 14 ++++++ src/entraclaw/storage/persona.py | 20 ++++++-- src/entraclaw/tools/daily_summary.py | 16 +++++++ tests/storage/test_persona.py | 18 +++++++- tests/test_background_shutdown.py | 39 ++++++++++++++++ tests/tools/test_daily_summary.py | 27 +++++++++++ 10 files changed, 206 insertions(+), 24 deletions(-) create mode 100644 tests/test_background_shutdown.py diff --git a/TODOS.md b/TODOS.md index 9acb1fd..714e549 100644 --- a/TODOS.md +++ b/TODOS.md @@ -40,23 +40,23 @@ Fix: make the `tmp_data_dir` fixture (and any sibling fixture that patches confi - **Effort:** S (~30 LOC — fixture edit + audit) - **Source:** Phase 6a review 2026-04-17; failure is pre-existing on main, not introduced by Phase 6a -### PersonaBackend.pull_all() missing mtime-newer-local check (Phase 6d scope) +### ~~PersonaBackend.pull_all() missing mtime-newer-local check (Phase 6d scope)~~ ✅ DONE `src/entraclaw/storage/persona.py` `pull_all()` currently overwrites local files unconditionally — cloud is authoritative on pull. The persona-persistence plan §4.2 specified: "If local is newer (happens if session was offline), leave it (to be pushed next)." 
Phase 6a shipped without that check for the safe-starting-point framing, but it's a race-loss risk: if a session writes a memory file offline, the next online session's SessionStart pull will clobber it before the PostToolUse-Write push fires. The mitigation of this was planned for Phase 6d (ETag-based conflict resolution) but the simple mtime check should land sooner. Fix: compare local file mtime vs blob's last-modified on pull, skip overwrite if local is newer, add to `PersonaReport` a new `skipped_local_newer` counter. Test: pytest fixture with a local file newer than the (fake) blob's content → pull_all must leave it. -- **Effort:** XS (~20 LOC + 2 tests) +- **Shipped:** mtime compare via `key_mtime()` on Local/Blob backends (PR pending). - **Depends on:** Phase 6a (`1514dcd`, shipped) - **Source:** Phase 6a review 2026-04-17; plan §4.2 said we'd do this, Phase 6a deferred -### MCP server orphans when Claude Code exits +### ~~MCP server orphans when Claude Code exits~~ ✅ DONE (partial) Observed twice: when the parent Claude process exits, the `entraclaw-mcp` child keeps running. The new Claude session spawns a *second* MCP server, and both servers poll Graph independently — causing dual interaction-log writes (observed 2026-04-17: local log 54 lines vs blob log 19 lines on the same UTC day) and dual channel-push attempts. Root cause: `_background_poll_teams`, `_background_poll_email`, `_background_discover_chats`, and `_background_daily_summary` are spawned as top-level asyncio tasks inside `_initialize()`. They sit outside FastMCP's lifespan cancel scope, so when stdin closes and FastMCP's stdio read loop exits, the polling tasks keep the event loop alive and the process never terminates. 
Fixes in priority order: (a) spawn background tasks inside FastMCP's lifespan context manager so shutdown cancels them, (b) explicitly watch stdin for EOF in `_initialize` and cancel the task group, or (c) have polling tasks poll a shared shutdown event that FastMCP's stop hook sets. Workaround until fixed: manually `kill <pid>` old `entraclaw-mcp` processes. -- **Effort:** S (~40 LOC + test that proves stdin-EOF cancels polls) +- **Shipped:** `_shutdown_background_tasks()` cancels tracked poll tasks when stdio disconnects (PR pending). Process singleton flock (issue #62) also prevents duplicate live servers on macOS/Linux. - **Source:** Live observation 2026-04-17 (second occurrence in one day) -### Daily summary scheduler: wrong day + double-fire +### ~~Daily summary scheduler: wrong day + double-fire~~ ✅ DONE Two bugs, both observed at 2026-04-17T17:00:00 PDT (= 00:00:01 UTC 2026-04-18): 1. `_run_daily_summary_internal` defaults `target_day = datetime.now(UTC).strftime("%Y-%m-%d")`. At 5pm PDT the UTC clock is already past midnight, so the scheduler summarizes the brand-new UTC day (empty) instead of the one that just ended. Fix: when called from the scheduler, target `now_utc - 1 day` — or compute the "just-ended PDT day" explicitly. 2. Scheduler fired twice at the same second — two summary emails arrived simultaneously (one for 2026-04-17, one for 2026-04-18). Suggests either a boot-time catch-up colliding with the scheduled tick or a loop that doesn't gate on "already sent today." Inspect `_background_daily_summary` for idempotency + single-fire semantics. -- **Effort:** S (~30 LOC + tests for both) +- **Shipped:** `scheduled_summary_day()` + `summary_already_sent()` gate in the scheduler (PR pending). 
- **Source:** Live observation 2026-04-17 evening (first real scheduled fire) ### Email cursor sub-second precision diff --git a/docs/engineering-status.md b/docs/engineering-status.md index 7970362..230464e 100644 --- a/docs/engineering-status.md +++ b/docs/engineering-status.md @@ -1,6 +1,6 @@ # Engineering Status -**Last updated:** 2026-05-21 +**Last updated:** 2026-05-27 **Status:** v1 released. Three auth modes (Agent User / Delegated / Bot Gateway) running locally on macOS, Linux, and ARM64 Windows 11. **1,237 tests** across the suite, ruff clean. Body-first prompt architecture loads at boot; persona-sati MCP wires personality and memory when configured. ADR-005 cloud-memory Phases 1, 2, 5, 6a shipped — blob storage is opt-in via `setup.sh --use-cloud-memory`. Work IQ Word migration landed (PR #75) and the `send_teams_message` auto-wait pattern is host-gated and deterministic. README, docs site, and GitHub Pages auto-deploy refreshed 2026-05-21. --- @@ -9,11 +9,8 @@ Source of truth for detail: `TODOS.md` in the repository root. One line each below. +- **Test isolation: blob env leakage** — partial fix in test fixtures; session-scoped autouse fixture still open. - **Script-toolkit docs closeout** — `./status.sh` is the canonical entry; finish the remaining script-reference polish and smoke verification. See `TODOS.md` P1. -- **Test isolation: blob env leakage** — `tmp_data_dir` fixture in `tests/tools/test_interaction_log.py` doesn't clear `ENTRACLAW_BLOB_ENDPOINT`; 10 tests fail on any machine with blob env configured. -- **MCP server orphans on Claude Code exit** — background poll tasks sit outside FastMCP's lifespan cancel scope; new sessions spawn a second server, both poll Graph independently. -- **Daily summary scheduler — wrong day + double-fire** — UTC-based `target_day` summarizes the brand-new UTC day at 5pm PDT; scheduler fired twice at the same second on 2026-04-17. 
-- **Email cursor sub-second precision** — cursor file at second precision; an email at the cursor's exact second gets re-pushed once on every server restart. ## Recently Shipped diff --git a/src/entraclaw/mcp_server.py b/src/entraclaw/mcp_server.py index fb53758..978563e 100644 --- a/src/entraclaw/mcp_server.py +++ b/src/entraclaw/mcp_server.py @@ -754,7 +754,9 @@ async def _init_poll() -> None: if config and config.mode == "bot": import asyncio - _state["poll_task"] = asyncio.get_event_loop().create_task(_background_poll_bot()) + _state["poll_task"] = _track_background_task( + asyncio.get_event_loop().create_task(_background_poll_bot()) + ) elif _state.get("watched_chats"): _ensure_poll_task_running() @@ -764,10 +766,12 @@ async def _init_poll() -> None: if _identity and _identity.session and _identity.session.auth_mode == "agent_user": import asyncio - asyncio.get_event_loop().create_task(_background_poll_email()) - asyncio.get_event_loop().create_task(_background_daily_summary()) - asyncio.get_event_loop().create_task(_background_discover_chats()) - asyncio.get_event_loop().create_task(_background_persona_sati_heartbeat()) + _track_background_task(asyncio.get_event_loop().create_task(_background_poll_email())) + _track_background_task(asyncio.get_event_loop().create_task(_background_daily_summary())) + _track_background_task(asyncio.get_event_loop().create_task(_background_discover_chats())) + _track_background_task( + asyncio.get_event_loop().create_task(_background_persona_sati_heartbeat()) + ) async def _initialize() -> None: @@ -808,6 +812,37 @@ async def _initialize() -> None: PERSONA_SATI_HEARTBEAT_INTERVAL = 300 # 5 min smoke test against persona-sati +def _track_background_task(task) -> object: + """Register *task* so stdio shutdown can cancel background polls.""" + tasks = _state.setdefault("background_tasks", []) + tasks.append(task) + return task + + +async def _shutdown_background_tasks() -> None: + """Cancel every background poll/scheduler task on 
stdio disconnect.""" + import asyncio + + tasks: list[asyncio.Task] = [] + poll_task = _state.get("poll_task") + if poll_task is not None: + tasks.append(poll_task) + tasks.extend(_state.get("background_tasks", [])) + seen: set[int] = set() + unique: list[asyncio.Task] = [] + for task in tasks: + if id(task) in seen: + continue + seen.add(id(task)) + unique.append(task) + for task in unique: + task.cancel() + if unique: + await asyncio.gather(*unique, return_exceptions=True) + _state["background_tasks"] = [] + _state.pop("poll_task", None) + + async def _persona_sati_list_files(url: str, token: str) -> list[str]: """Call persona-sati's ``list_memory_files`` tool via SSE. @@ -1001,7 +1036,9 @@ def _ensure_poll_task_running() -> None: import asyncio - _state["poll_task"] = asyncio.get_event_loop().create_task(_background_poll()) + _state["poll_task"] = _track_background_task( + asyncio.get_event_loop().create_task(_background_poll()) + ) if logger: logger.info("Started background Teams poll task") @@ -1515,7 +1552,11 @@ async def _background_daily_summary() -> None: """Wake at 5pm PDT each day and send the daily summary.""" import asyncio - from entraclaw.tools.daily_summary import next_run_at + from entraclaw.tools.daily_summary import ( + next_run_at, + scheduled_summary_day, + summary_already_sent, + ) if logger: logger.info("Starting daily summary scheduler") @@ -1538,10 +1579,20 @@ async def _background_daily_summary() -> None: ): continue - result = await _run_daily_summary_internal(send=True) + target_day = scheduled_summary_day(now=datetime.now(UTC)) + if summary_already_sent(target_day): + if logger: + logger.info("Daily summary for %s already sent; skipping", target_day) + continue + + result = await _run_daily_summary_internal(day=target_day, send=True) if logger: logger.info("Daily summary sent: %s", result) + except asyncio.CancelledError: + if logger: + logger.info("Daily summary scheduler cancelled") + raise except Exception as exc: if logger: 
logger.warning("Daily summary scheduler error: %s", exc) @@ -3403,6 +3454,7 @@ async def _eager_init() -> None: ) finally: init_task.cancel() + await _shutdown_background_tasks() @mcp.tool() diff --git a/src/entraclaw/storage/backend.py b/src/entraclaw/storage/backend.py index cc56362..6b2f92f 100644 --- a/src/entraclaw/storage/backend.py +++ b/src/entraclaw/storage/backend.py @@ -103,6 +103,12 @@ def list(self, prefix: str = "") -> list[str]: results.append(rel) return results + def key_mtime(self, key: str) -> float | None: + p = self._path(key) + if not p.exists(): + return None + return p.stat().st_mtime + # --------------------------------------------------------------------------- # BlobBackend @@ -155,6 +161,9 @@ def exists(self, key: str) -> bool: def list(self, prefix: str = "") -> list[str]: return list(_run_sync(self._store.list(prefix))) + def key_mtime(self, key: str) -> float | None: + return _run_sync(self._store.last_modified(key)) + # --------------------------------------------------------------------------- # Factory diff --git a/src/entraclaw/storage/blob.py b/src/entraclaw/storage/blob.py index c6d11e1..b7f457c 100644 --- a/src/entraclaw/storage/blob.py +++ b/src/entraclaw/storage/blob.py @@ -11,6 +11,7 @@ import re from collections.abc import Callable +from email.utils import parsedate_to_datetime import httpx @@ -114,6 +115,19 @@ async def exists(self, path: str) -> bool: resp.raise_for_status() return True + async def last_modified(self, path: str) -> float | None: + """Return blob Last-Modified as Unix seconds, or None if missing.""" + async with httpx.AsyncClient() as client: + resp = await client.head(self._url(path), headers=self._headers()) + _check_auth(resp) + if resp.status_code == 404: + return None + resp.raise_for_status() + header = resp.headers.get("Last-Modified") + if not header: + return None + return parsedate_to_datetime(header).timestamp() + async def list(self, prefix: str = "") -> list[str]: """Return blob names in the 
container under *prefix*.""" params = { diff --git a/src/entraclaw/storage/persona.py b/src/entraclaw/storage/persona.py index cf6cf62..3a7af40 100644 --- a/src/entraclaw/storage/persona.py +++ b/src/entraclaw/storage/persona.py @@ -52,6 +52,7 @@ class PersonaReport: copied: int = 0 skipped: int = 0 pulled: int = 0 + skipped_local_newer: int = 0 errors: list[tuple[str, str]] = field(default_factory=list) keys: list[str] = field(default_factory=list) @@ -118,20 +119,33 @@ def push_all(self) -> PersonaReport: def pull_all(self) -> PersonaReport: """Download every blob under ``claude_memory/`` into ``local_root``. - Cloud is authoritative on pull — local files are overwritten. - The directory (and parents) are created if missing. + Cloud is authoritative on pull unless a local file is newer than + its cloud counterpart (offline session wrote ahead of the last + pull). The directory (and parents) are created if missing. """ report = PersonaReport() for key in self._backend.list(self.prefix): if not key.startswith(self.prefix): continue rel = key[len(self.prefix) :] + dst = self._root / rel + if dst.exists() and dst.is_file(): + cloud_mtime = self._cloud_mtime(key) + if cloud_mtime is not None and dst.stat().st_mtime > cloud_mtime: + report.skipped_local_newer += 1 + continue content = self._backend.read_text(key) if content is None: continue - dst = self._root / rel dst.parent.mkdir(parents=True, exist_ok=True) dst.write_text(content) report.pulled += 1 report.keys.append(key) return report + + def _cloud_mtime(self, key: str) -> float | None: + """Return cloud key mtime as Unix seconds, or None if unavailable.""" + mtime_fn = getattr(self._backend, "key_mtime", None) + if mtime_fn is None: + return None + return mtime_fn(key) diff --git a/src/entraclaw/tools/daily_summary.py b/src/entraclaw/tools/daily_summary.py index 8b642a3..547f978 100644 --- a/src/entraclaw/tools/daily_summary.py +++ b/src/entraclaw/tools/daily_summary.py @@ -215,3 +215,19 @@ def next_run_at(*, 
now: datetime, hour_pdt: int = 17) -> datetime: if pdt_now >= trigger: trigger = trigger + timedelta(days=1) return (trigger + PDT_OFFSET).replace(tzinfo=UTC) + + +def scheduled_summary_day(*, now: datetime) -> str: + """Return the UTC day label to summarize when the 5pm PDT job fires. + + At 5pm PDT the UTC calendar has already rolled forward, so naive + ``datetime.now(UTC).strftime('%Y-%m-%d')`` targets an empty new UTC + day instead of the one that just accumulated activity. + """ + return (now - timedelta(days=1)).strftime("%Y-%m-%d") + + +def summary_already_sent(day: str) -> bool: + """Return True when the archived sidecar for *day* already exists.""" + backend = get_backend() + return backend.exists(f"summaries/{day}.json") diff --git a/tests/storage/test_persona.py b/tests/storage/test_persona.py index bfdc9e3..cf88217 100644 --- a/tests/storage/test_persona.py +++ b/tests/storage/test_persona.py @@ -146,18 +146,32 @@ def test_downloads_every_claude_memory_key(self, tmp_path: Path) -> None: assert report.pulled == 3 def test_pull_all_overwrites_local_with_cloud(self, tmp_path: Path) -> None: - # Cloud is authoritative on pull + # Cloud is authoritative on pull when the blob is newer than local. 
backend = LocalBackend(tmp_path / "blob") - backend.write_text("claude_memory/MEMORY.md", "CLOUD") mem_dir = tmp_path / "memory" mem_dir.mkdir() (mem_dir / "MEMORY.md").write_text("stale local") + backend.write_text("claude_memory/MEMORY.md", "CLOUD") persona = PersonaBackend(backend, local_root=mem_dir) persona.pull_all() assert (mem_dir / "MEMORY.md").read_text() == "CLOUD" + def test_pull_all_skips_when_local_newer_than_cloud(self, tmp_path: Path) -> None: + backend = LocalBackend(tmp_path / "blob") + backend.write_text("claude_memory/MEMORY.md", "CLOUD") + mem_dir = tmp_path / "memory" + mem_dir.mkdir() + (mem_dir / "MEMORY.md").write_text("offline edit") + + persona = PersonaBackend(backend, local_root=mem_dir) + report = persona.pull_all() + + assert (mem_dir / "MEMORY.md").read_text() == "offline edit" + assert report.pulled == 0 + assert report.skipped_local_newer == 1 + def test_pull_all_when_cloud_empty_returns_empty(self, tmp_path: Path) -> None: backend = LocalBackend(tmp_path / "blob") persona = PersonaBackend(backend, local_root=tmp_path / "mem") diff --git a/tests/test_background_shutdown.py b/tests/test_background_shutdown.py new file mode 100644 index 0000000..0b266b1 --- /dev/null +++ b/tests/test_background_shutdown.py @@ -0,0 +1,39 @@ +"""Tests for background-task shutdown on stdio disconnect.""" + +from __future__ import annotations + +import asyncio + +import pytest + + +@pytest.mark.asyncio +async def test_shutdown_background_tasks_cancels_tracked_polls() -> None: + from entraclaw import mcp_server + + started = asyncio.Event() + cancelled = asyncio.Event() + + async def fake_poll() -> None: + started.set() + try: + await asyncio.sleep(3600) + except asyncio.CancelledError: + cancelled.set() + raise + + old_state = mcp_server._state.copy() + try: + task = asyncio.create_task(fake_poll()) + mcp_server._state["background_tasks"] = [task] + mcp_server._state["poll_task"] = task + + await asyncio.wait_for(started.wait(), timeout=1) + await 
mcp_server._shutdown_background_tasks() + + await asyncio.wait_for(cancelled.wait(), timeout=1) + assert mcp_server._state.get("background_tasks") == [] + assert "poll_task" not in mcp_server._state + finally: + mcp_server._state.clear() + mcp_server._state.update(old_state) diff --git a/tests/tools/test_daily_summary.py b/tests/tools/test_daily_summary.py index 9c993e8..0302a56 100644 --- a/tests/tools/test_daily_summary.py +++ b/tests/tools/test_daily_summary.py @@ -29,7 +29,9 @@ archive_summary, next_run_at, render_summary_html, + scheduled_summary_day, send_summary_email, + summary_already_sent, triage_interactions, ) @@ -375,3 +377,28 @@ def test_exactly_5pm_pdt_returns_tomorrow(self) -> None: now = datetime(2026, 4, 17, 0, 0, tzinfo=UTC) nxt = next_run_at(now=now) assert nxt == datetime(2026, 4, 18, 0, 0, tzinfo=UTC) + + +class TestScheduledSummaryDay: + def test_at_5pm_pdt_uses_previous_utc_day(self) -> None: + # 5pm PDT on 2026-04-16 = 2026-04-17 00:00 UTC — summarize 2026-04-16. + now = datetime(2026, 4, 17, 0, 0, tzinfo=UTC) + assert scheduled_summary_day(now=now) == "2026-04-16" + + def test_before_5pm_pdt_still_uses_previous_utc_day_at_trigger(self) -> None: + # Scheduler fires at next_run_at(); at that instant UTC has rolled. + trigger = datetime(2026, 4, 17, 0, 0, tzinfo=UTC) + assert scheduled_summary_day(now=trigger) == "2026-04-16" + + +class TestSummaryAlreadySent: + def test_false_when_sidecar_missing(self, tmp_data_dir: Path) -> None: + assert summary_already_sent("2026-04-16") is False + + def test_true_when_sidecar_exists(self, tmp_data_dir: Path) -> None: + archive_summary( + day="2026-04-16", + html="<p>hi</p>", + buckets={"needs_you": [], "handled": [], "heads_up": []}, + ) + assert summary_already_sent("2026-04-16") is True