From 1a6ef76eb21648490b789e2501b5e916c36bca27 Mon Sep 17 00:00:00 2001 From: Ramon Bartl Date: Tue, 9 Jun 2026 23:37:11 +0200 Subject: [PATCH] Refactor and tighten the cloud-sync broadcast plumbing surfaced by the post-feature audit Five small wins from the same review pass: 1. Single BROADCAST_RESOURCES constant in kaisho/cron/scheduler.py replaces the three previous listings of ('clocks', 'inbox', 'kanban', 'notes'). Drift in any one of them silently regresses the kanban-vs-tasks no-op trap; one source of truth makes that trap permanent. 2. Drop the WS pending-resources set and the _drain_and_broadcast_pending function. The poller's _broadcast_sync_changes now fires every cycle (after PR #162 removed the pulled+deleted==0 gate), so the WS pending-set was doing the same job twice. Removing it keeps every WS-triggered sync to one broadcast pass and trims ~40 lines plus the module-level set. 3. _broadcast_sync_changes wraps each broadcast in its own try/except so one failing resource doesn't abort the rest of the loop. The drain function used to handle failures this way; aligning the surviving function matches the policy and lights up partial failures in the warning log. 4. APScheduler's BackgroundScheduler is tz-aware; the naive datetime.now() previously passed to next_run_time emitted a deprecation warning on every boot and outright refused to schedule on some tzlocal versions. Use datetime.now(_scheduler.timezone) so the kick-off datetime always matches. 5. The board snooze filter and the deadline badge computed today via toISOString().slice(0, 10), which is UTC. A user in PDT (UTC-7) at 22:00 local sees the next day's UTC date, so their snooze expires a day late and their deadline badge fires off by one. Use toLocaleDateString('en-CA') which produces canonical YYYY-MM-DD in *local* time. Tests in tests/test_cloud_ws_broadcast.py reshaped to match the simpler post-refactor contract: events trigger _schedule_ws_sync, the debounced sync calls _run_cloud_sync, _broadcast_sync_changes blanket-broadcasts BROADCAST_RESOURCES and survives a single-resource failure. 699 tests pass; flake8 clean. --- CHANGELOG.md | 18 ++ .../src/components/kanban/KanbanBoard.tsx | 9 +- .../src/components/kanban/TaskDateBadges.tsx | 7 +- kaisho/cron/scheduler.py | 138 ++++++-------- tests/test_cloud_ws_broadcast.py | 168 +++++++++--------- 5 files changed, 172 insertions(+), 168 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c174ba3..f9c6a2cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ ## Unreleased +- Refactor the cloud-WS broadcast plumbing from the + post-feature audit. Single `BROADCAST_RESOURCES` + constant replaces three duplicate listings of + `("clocks", "inbox", "kanban", "notes")`. Drop the WS + pending-resources set + `_drain_and_broadcast_pending` + since `_broadcast_sync_changes` now fires + unconditionally inside `_run_cloud_sync` — two + mechanisms for the same effect collapsed to one. + Per-iteration `try/except` in the broadcast loop so + one failing resource doesn't abort the rest. Pass the + scheduler's own timezone to `next_run_time` so + APScheduler stops emitting the naive-datetime + warning. And the snooze filter + deadline badge use + `toLocaleDateString("en-CA")` instead of + `toISOString().slice(0, 10)` so users in negative-UTC + timezones no longer see their snoozes expire / + deadlines fire a day late. + - Fix three correctness regressions from the scheduling + sync work. (1) A cloud-pull update for a task whose wire payload omits `scheduled`/`deadline` diff --git a/frontend/src/components/kanban/KanbanBoard.tsx b/frontend/src/components/kanban/KanbanBoard.tsx index 9e9adb89..bc73fda7 100644 --- a/frontend/src/components/kanban/KanbanBoard.tsx +++ b/frontend/src/components/kanban/KanbanBoard.tsx @@ -356,7 +356,14 @@ export function KanbanBoard() { // Hide tasks whose ``scheduled`` date is still in the // future — they reappear on the day. The count is shown // in a toolbar pill so the user can still see them. - const todayStr = new Date().toISOString().slice(0, 10); + // + // ``toISOString()`` returns UTC, which trips negative-UTC + // timezones: a user in PDT (UTC−7) at 22:00 local sees + // the next day's UTC date and their snooze "expires" a + // day late. ``en-CA`` produces the canonical + // ``YYYY-MM-DD`` shape in *local* time, which is what + // we actually want. + const todayStr = new Date().toLocaleDateString("en-CA"); const snoozed = rawTasks.filter( (t) => t.scheduled && t.scheduled > todayStr, ); diff --git a/frontend/src/components/kanban/TaskDateBadges.tsx b/frontend/src/components/kanban/TaskDateBadges.tsx index 7a3ffff6..71147d54 100644 --- a/frontend/src/components/kanban/TaskDateBadges.tsx +++ b/frontend/src/components/kanban/TaskDateBadges.tsx @@ -36,7 +36,12 @@ const DEADLINE_ACK_EVENT = "kaisho:deadline-acked"; const DEADLINE_URGENCY_DAYS = 3; function todayIso(): string { - return new Date().toISOString().slice(0, 10); + // Local-time ``YYYY-MM-DD``. ``toISOString`` gives UTC, + // which trips negative-UTC timezones: a user in PDT + // (UTC−7) at 22:00 local sees the next day's UTC date + // and their scheduled badge surfaces / their deadline + // badge fires a day off. + return new Date().toLocaleDateString("en-CA"); } /** Inclusive day-difference between two ``YYYY-MM-DD`` diff --git a/kaisho/cron/scheduler.py b/kaisho/cron/scheduler.py index 6816c420..2f4a253b 100644 --- a/kaisho/cron/scheduler.py +++ b/kaisho/cron/scheduler.py @@ -211,35 +211,31 @@ def sync_backup_job() -> None: _ws_sync_pending = False _ws_sync_lock = threading.Lock() -# Resources whose React Query cache must be invalidated -# after the next WS-triggered sync completes. Accumulated -# under ``_ws_sync_lock`` so concurrent cloud-WS events -# can stack their refresh hints without races. +# Frontend resource keys the React side knows how to +# invalidate (see ``frontend/src/hooks/useWebSocket.ts`` +# ``RESOURCE_TO_QUERY``). The tasks query is routed via +# the ``kanban`` key there, so ``tasks`` (the obvious +# name) would be a silent no-op — that exact trap got +# missed for months, hence the comment-and-constant. +# Single source of truth so the periodic-poller broadcast +# and any future per-resource trigger stay in lock-step. +BROADCAST_RESOURCES = ("clocks", "inbox", "kanban", "notes") + +# Cloud-WS event names that warrant a debounced sync. +# Mapped to the resource purely for documentation / for +# any consumer that needs the affinity; the broadcast +# itself is now blanket via ``BROADCAST_RESOURCES`` so +# every WS-triggered cycle refreshes everything that +# matters. # -# Drained inside ``_debounced_sync`` once the sync cycle -# has actually written the new rows to local SQL — the -# order matters: broadcasting before the pull lands would -# cause React Query to refetch the still-stale cache. -_ws_pending_resources: set[str] = set() - -# Map cloud-WS event names onto the ``resource`` keys the -# desktop frontend understands in -# ``frontend/src/hooks/useWebSocket.ts``. ``tasks:changed`` -# maps to ``kanban`` because RESOURCE_TO_QUERY only routes -# the ``kanban`` key to the tasks React Query. +# ``timer:started`` covers two cases that both need a +# pull: a brand-new timer started on another device, and +# a paused entry resumed on another device (the cloud +# emits ``timer:started`` for resume because the entry's +# end is cleared). _WS_EVENT_TO_RESOURCE = { "entries:changed": "clocks", "entries:deleted": "clocks", - # ``timer:started`` covers two cases that both need a - # pull: a brand-new timer started on another device, - # and a paused entry resumed on another device (the - # cloud emits ``timer:started`` for resume because the - # entry's end is cleared). Without the pull, the - # immediate ``clocks`` broadcast fires above but the - # frontend refetches local data that is still in the - # pre-resume / pre-start state and the running-timer - # card stays empty until the 5-minute poller catches - # up. "timer:started": "clocks", "timer:stopped": "clocks", "inbox:changed": "inbox", @@ -248,43 +244,15 @@ def sync_backup_job() -> None: } -def _drain_and_broadcast_pending() -> None: - """Broadcast a refresh hint for each resource that had - cloud-WS activity during the debounce window. - - Called from ``_debounced_sync`` on the success path so - the local SQL has the new rows by the time the frontend - invalidates its queries and refetches. - """ - with _ws_sync_lock: - resources = list(_ws_pending_resources) - _ws_pending_resources.clear() - if not resources: - return - from ..api.ws.manager import broadcast_sync - for resource in resources: - try: - broadcast_sync({ - "resource": resource, - "type": "cloud:refresh", - }) - except Exception: # noqa: BLE001 - _ws_log.warning( - "Failed to broadcast %s refresh", - resource, exc_info=True, - ) - - def _debounced_sync() -> None: """Run a sync if one is pending, with dedup. Waits 2 seconds to coalesce rapid events into a - single sync cycle. Logs errors instead of swallowing. - - On success, drains the pending-resources set and - broadcasts a refresh hint per resource so React Query - invalidates its cache after the new rows have been - written to local SQL. + single sync cycle, then calls ``_run_cloud_sync`` — + which in turn fires ``_broadcast_sync_changes`` on + success so the frontend's React Query cache lands + *after* the new rows are in local SQL. Errors are + logged, not raised. """ global _ws_sync_pending time.sleep(2) @@ -298,8 +266,6 @@ def _debounced_sync() -> None: _ws_log.warning( "WS-triggered sync failed", exc_info=True, ) - return - _drain_and_broadcast_pending() def _schedule_ws_sync() -> None: @@ -325,15 +291,20 @@ def _on_cloud_ws_event( ) -> None: """Handle real-time events from the cloud WebSocket. - Timer events are broadcast to the local WebSocket for - instant UI updates. Data-change events record which - resource needs refreshing and schedule a debounced - sync cycle; the broadcast for those fires once the - sync has actually pulled the new rows. + Timer events fire an immediate ``clocks`` broadcast so + the running-timer card flashes the new state without + waiting for the 2-second sync debounce — the data + might still be stale when the frontend refetches, but + the visual cue is worth it. + + Any event in ``_WS_EVENT_TO_RESOURCE`` schedules a + debounced sync; the eventual ``_run_cloud_sync`` call + will broadcast all resources via + ``_broadcast_sync_changes`` once the pull lands. """ _ws_log.info("Cloud WS event: %s", event) - # Timer events: broadcast to desktop frontend + # Timer events: immediate broadcast for instant UI cue if event in ("timer:started", "timer:stopped"): try: from ..api.ws.manager import broadcast_sync @@ -348,13 +319,10 @@ def _on_cloud_ws_event( exc_info=True, ) - # Data changes: record the resource and schedule a - # debounced background sync. The broadcast happens - # in ``_debounced_sync`` after the pull lands. - resource = _WS_EVENT_TO_RESOURCE.get(event) - if resource is not None: - with _ws_sync_lock: - _ws_pending_resources.add(resource) + # Data changes trigger a debounced sync; the broadcast + # piggybacks on ``_broadcast_sync_changes`` once the + # cycle writes new rows to local SQL. + if event in _WS_EVENT_TO_RESOURCE: _schedule_ws_sync() @@ -413,19 +381,17 @@ def _broadcast_sync_changes(result: dict) -> None: cycle the gate quietly swallowed. """ from ..api.ws.manager import broadcast_sync - try: - for resource in ( - "clocks", "inbox", "kanban", "notes", - ): + for resource in BROADCAST_RESOURCES: + try: broadcast_sync({ "resource": resource, "type": "sync:updated", }) - except Exception: # noqa: BLE001 - _ws_log.warning( - "Failed to broadcast sync changes", - exc_info=True, - ) + except Exception: # noqa: BLE001 + _ws_log.warning( + "Failed to broadcast %s sync", + resource, exc_info=True, + ) def _run_cloud_sync() -> None: @@ -536,11 +502,17 @@ def build_scheduler(jobs_file: Path) -> BackgroundScheduler: # device while the desktop was offline). The cloud # WebSocket only delivers events from the moment it # connects, so it cannot fill that gap on its own. + # + # APScheduler's BackgroundScheduler is tz-aware; + # feeding it a naive ``datetime.now()`` triggers + # ``PytzUsageWarning`` and, on some platforms, refuses + # to schedule. Use the scheduler's own timezone so the + # kick-off datetime always matches. _scheduler.add_job( _run_cloud_sync, "interval", minutes=5, - next_run_time=datetime.now(), + next_run_time=datetime.now(_scheduler.timezone), id="__cloud_sync__", name="Cloud Sync", replace_existing=True, diff --git a/tests/test_cloud_ws_broadcast.py b/tests/test_cloud_ws_broadcast.py index 42c2e6c2..e3414889 100644 --- a/tests/test_cloud_ws_broadcast.py +++ b/tests/test_cloud_ws_broadcast.py @@ -4,10 +4,12 @@ cloud scheduled a background sync but never told the local React frontend to invalidate its queries, so the row kept rendering from the existing cache until the app was -restarted. +restarted. Plus the v2.5.2 audit cleanup that removed the +redundant pending-set in favour of ``_broadcast_sync_changes`` +firing for every cycle. These tests stub out the actual sync so we exercise only the -event-routing / drain-and-broadcast wiring. +event-routing / broadcast wiring. """ import threading @@ -18,12 +20,10 @@ @pytest.fixture(autouse=True) def _reset_pending_state(): - """Clear module-level WS state between tests so order + """Reset the debounce flag between tests so order doesn't matter.""" - scheduler._ws_pending_resources.clear() scheduler._ws_sync_pending = False yield - scheduler._ws_pending_resources.clear() scheduler._ws_sync_pending = False @@ -40,28 +40,27 @@ def fake_broadcast(message): return captured -def test_entries_changed_records_pending_clocks(monkeypatch): - """``entries:changed`` must accumulate ``clocks`` in the - pending set so the post-sync drain broadcasts it.""" - # Block _schedule_ws_sync from actually spawning the - # debounce thread — we test the drain step separately. +def test_entries_changed_schedules_sync(monkeypatch): + """``entries:changed`` triggers a debounced sync.""" + called = [] monkeypatch.setattr( - scheduler, "_schedule_ws_sync", lambda: None, + scheduler, "_schedule_ws_sync", + lambda: called.append(True), ) scheduler._on_cloud_ws_event("entries:changed", {}) - assert "clocks" in scheduler._ws_pending_resources + assert called == [True] def test_timer_started_schedules_sync(monkeypatch): - """A ``timer:started`` from the cloud (e.g. iPhone - starting a brand-new timer or resuming a paused entry) - must schedule a sync. Without it, the immediate - ``clocks`` broadcast invalidates the local query and - the frontend refetches the stale pre-start state — the - running-timer card stays empty until the 5-minute - poller catches up.""" + """A ``timer:started`` from the cloud (iPhone starting + a brand-new timer or resuming a paused entry) must + schedule a sync. Without it, the desktop sees only + the immediate clocks broadcast and refetches stale + local state until the 5-minute poller catches up.""" + called = [] monkeypatch.setattr( - scheduler, "_schedule_ws_sync", lambda: None, + scheduler, "_schedule_ws_sync", + lambda: called.append(True), ) # The timer-event broadcast at the top of the handler # imports broadcast_sync; stub it so this test stays @@ -69,87 +68,69 @@ def test_timer_started_schedules_sync(monkeypatch): import kaisho.api.ws.manager as mgr monkeypatch.setattr(mgr, "broadcast_sync", lambda _: None) scheduler._on_cloud_ws_event("timer:started", {}) - assert "clocks" in scheduler._ws_pending_resources + assert called == [True] -def test_tasks_changed_maps_to_kanban(monkeypatch): - """``tasks:changed`` must map to ``kanban`` because the - frontend's RESOURCE_TO_QUERY only routes ``kanban`` to - the tasks React Query — broadcasting ``tasks`` would be - a no-op.""" +def test_tasks_changed_schedules_sync(monkeypatch): + """``tasks:changed`` must still route through the + event map even though the broadcast is now blanket + rather than per-resource.""" + called = [] monkeypatch.setattr( - scheduler, "_schedule_ws_sync", lambda: None, + scheduler, "_schedule_ws_sync", + lambda: called.append(True), ) scheduler._on_cloud_ws_event("tasks:changed", {}) - assert scheduler._ws_pending_resources == {"kanban"} + assert called == [True] -def test_drain_broadcasts_each_resource_once(monkeypatch): - """Multiple events of the same type during one debounce - window should result in a single broadcast per resource, - not one per event.""" - captured = _capture_broadcasts(monkeypatch) - scheduler._ws_pending_resources.update({"clocks", "inbox"}) - scheduler._drain_and_broadcast_pending() - resources = {m["resource"] for m in captured} - assert resources == {"clocks", "inbox"} - assert all(m["type"] == "cloud:refresh" for m in captured) - # The set is drained, so a second call is a no-op. - captured.clear() - scheduler._drain_and_broadcast_pending() - assert captured == [] - - -def test_drain_no_pending_does_not_broadcast(monkeypatch): - captured = _capture_broadcasts(monkeypatch) - scheduler._drain_and_broadcast_pending() - assert captured == [] +def test_unknown_event_does_not_schedule_sync(monkeypatch): + """Events the map doesn't know about are no-ops on the + sync-trigger path.""" + called = [] + monkeypatch.setattr( + scheduler, "_schedule_ws_sync", + lambda: called.append(True), + ) + scheduler._on_cloud_ws_event("noise:event", {}) + assert called == [] -def test_debounced_sync_broadcasts_after_pull(monkeypatch): - """End-to-end: a WS event schedules a sync, the sync - runs, and the broadcast lands AFTER the pull (so the - frontend refetches against fresh local SQL, not the - stale rows the gate would otherwise show).""" +def test_debounced_sync_runs_cloud_sync(monkeypatch): + """End-to-end: a WS event schedules a sync; the sync + runs. The broadcast itself is fired from inside + _run_cloud_sync → _broadcast_sync_changes, which has + its own test below.""" order = [] def fake_run_cloud_sync(): order.append("sync") - captured = _capture_broadcasts(monkeypatch) monkeypatch.setattr( scheduler, "_run_cloud_sync", fake_run_cloud_sync, ) monkeypatch.setattr(scheduler.time, "sleep", lambda _: None) - scheduler._on_cloud_ws_event("entries:changed", {}) - # The schedule started a real thread — wait for it. for t in threading.enumerate(): if t.name == "cloud-ws-sync": t.join(timeout=2) - order.append("broadcast" if captured else "missing") - assert order == ["sync", "broadcast"] - assert {m["resource"] for m in captured} == {"clocks"} + assert order == ["sync"] -def test_broadcast_sync_changes_uses_kanban_not_tasks( +def test_broadcast_sync_changes_covers_all_resources( monkeypatch, ): - """The poller's ``_broadcast_sync_changes`` must - broadcast ``resource: "kanban"`` for the tasks query. - The frontend's RESOURCE_TO_QUERY only routes the - ``kanban`` key to the tasks React Query, so the - previous ``tasks`` payload was a silent no-op.""" + """``_broadcast_sync_changes`` must broadcast every + resource in ``BROADCAST_RESOURCES`` -- the single + source of truth that prevents the kanban / tasks + silent-no-op trap from coming back.""" captured = _capture_broadcasts(monkeypatch) scheduler._broadcast_sync_changes( {"pulled_up": 1, "pulled_del": 0}, ) resources = {m["resource"] for m in captured} - assert "kanban" in resources, ( - "broadcast must use the 'kanban' resource key so " - "the frontend's RESOURCE_TO_QUERY actually " - "invalidates the tasks query" - ) + assert resources == set(scheduler.BROADCAST_RESOURCES) + assert "kanban" in resources assert "tasks" not in resources @@ -159,30 +140,52 @@ def test_broadcast_sync_changes_fires_when_counts_zero( """The old ``pulled+deleted == 0`` gate suppressed legitimate refreshes when the sync cycle returned zero counts (cursor races, push-lock contention, - partial-success cycles). The gate is gone: the - function must broadcast every cycle, trusting that an - occasional empty refetch is cheaper than the user - staring at stale data.""" + partial-success cycles). The function must broadcast + every cycle, trusting that an occasional empty + refetch is cheaper than the user staring at stale + data.""" captured = _capture_broadcasts(monkeypatch) scheduler._broadcast_sync_changes( {"pulled_up": 0, "pulled_del": 0}, ) resources = {m["resource"] for m in captured} - assert resources == { - "clocks", "inbox", "kanban", "notes", - } + assert resources == set(scheduler.BROADCAST_RESOURCES) + + +def test_broadcast_sync_changes_continues_on_single_failure( + monkeypatch, +): + """A broadcast that raises for one resource must not + abort the rest of the loop. Mirrors the per-iteration + try/except policy.""" + sent = [] + + def flaky_broadcast(message): + if message["resource"] == "clocks": + raise RuntimeError("flaky") + sent.append(message["resource"]) + + import kaisho.api.ws.manager as mgr + monkeypatch.setattr( + mgr, "broadcast_sync", flaky_broadcast, + ) + scheduler._broadcast_sync_changes({}) + # Every resource other than the one that raised + # still got a broadcast. + assert set(sent) == ( + set(scheduler.BROADCAST_RESOURCES) - {"clocks"} + ) def test_failed_sync_does_not_broadcast(monkeypatch): - """If the sync raises, the pending set must stay intact - so the next attempt still has the resource recorded — - and the frontend must NOT receive a refresh hint pointing - at stale local data.""" + """If ``_run_cloud_sync`` raises, the debounce wrapper + swallows it cleanly — no broadcast, but also no + crashed thread.""" + captured = _capture_broadcasts(monkeypatch) def boom(): raise RuntimeError("sync exploded") - captured = _capture_broadcasts(monkeypatch) monkeypatch.setattr(scheduler, "_run_cloud_sync", boom) monkeypatch.setattr(scheduler.time, "sleep", lambda _: None) @@ -191,4 +194,3 @@ def boom(): if t.name == "cloud-ws-sync": t.join(timeout=2) assert captured == [] - assert "clocks" in scheduler._ws_pending_resources