diff --git a/.coverage b/.coverage new file mode 100644 index 0000000..3dfa2b2 Binary files /dev/null and b/.coverage differ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a7e59f1..f90a74c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,13 +30,13 @@ jobs: accept-flake-config = true - name: Run tests - run: nix develop . --command pytest -q + run: nix develop . --command pytest -q --cov=commutecompass --cov-report=term-missing --cov-fail-under=80 - name: Lint run: nix develop . --command ruff check . - name: Type check - run: nix develop . --command mypy src + run: nix develop . --command mypy src tests build: name: check, build, push diff --git a/AGENTS.md b/AGENTS.md index a8a494a..ccf7d32 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -106,8 +106,14 @@ commutecompass --config examples/config.toml tomorrow --dry-run 6. **Ping firing contract** - Use `store.claim_ping(id, now)` (atomic 0→1 transition) before sending a notification, not `mark_fired` after. This is the race - protection against overlapping poll cycles. Send failures are - logged but NOT retried — claim-and-send is one-shot by design. + protection against overlapping poll cycles. + - On send **failure** of an actionable ping (`prep`/`leave`), the poll + loop hands the row back with `store.release_ping(id)` (atomic 1→0, + bumps `send_attempts`) so a later poll re-fires it. Re-fire is bounded: + only within `_SEND_RETRY_GRACE_SECONDS` of the scheduled `fire_at` (a + stale alarm is worse than none) and only up to `_MAX_SEND_ATTEMPTS` + (a broken notifier can never storm). Other kinds, and pings past the + grace window or attempt cap, stay fired (give up) — never retried. 7. **OpenClaw stdout protocol** - `notify.StdoutNotifier` escapes any body line that exactly matches diff --git a/README.md b/README.md index 1dd090d..99646df 100644 --- a/README.md +++ b/README.md @@ -21,14 +21,18 @@ A self-hosted [Python](https://www.python.org/) service that pulls events from G - [`tomorrow`](./src/commutecompass/jobs/) — push tomorrow's earliest prep time to the configured HA script (pull-model alarm) - [`plan`](./src/commutecompass/planner.py) — replan a single event (debug) - [`digest-preview`](./src/commutecompass/cli.py) — print today's digest from cache without sending +- [`status`](./src/commutecompass/cli.py) — show next event, pending pings, and job heartbeats (`--json` for machine output) - [`adjust EVENT_ID --add-prep N`](./src/commutecompass/cli.py) — shift a plan's prep time by N minutes -- [`config show`](./src/commutecompass/cli.py) / [`config set KEY VALUE`](./src/commutecompass/cli.py) — view or edit allowlisted config fields +- [`snooze`](./src/commutecompass/cli.py) / [`mute`](./src/commutecompass/cli.py) / [`unmute`](./src/commutecompass/cli.py) / [`undo`](./src/commutecompass/cli.py) — adjust or suppress an event's pings +- [`config show`](./src/commutecompass/cli.py) / [`config set KEY VALUE`](./src/commutecompass/cli.py) / [`config unset KEY`](./src/commutecompass/cli.py) / [`config reset`](./src/commutecompass/cli.py) — view or edit allowlisted config fields +- [`geocode-cache`](./src/commutecompass/cli.py) — inspect or clear the geocode cache +- [`mta-alerts`](./src/commutecompass/cli.py) — print current MTA alerts - [`test-notify`](./src/commutecompass/notify.py) — emit a test message via the configured notifier - [`where`](./src/commutecompass/cli.py) — print the latest stored current location ## Configuration -See [`examples/config.toml`](./examples/) and [`examples/env.example`](./examples/) for the full configuration schema. Architecture and implementation notes live in [`plan.md`](./plan.md). +See [`examples/config.toml`](./examples/) and [`examples/env.example`](./examples/) for the full configuration schema. Architecture and implementation notes live in [`AGENTS.md`](./AGENTS.md). ## OpenClaw integration diff --git a/examples/config.toml b/examples/config.toml index 186663c..8ae2006 100644 --- a/examples/config.toml +++ b/examples/config.toml @@ -25,6 +25,22 @@ poll_interval_seconds = 60 quiet_hours_start = "22:00" quiet_hours_end = "07:00" +[weather] +# Pad the departure buffer when rain/snow is expected around the commute, using +# the free keyless Open-Meteo forecast. Disabled by default. +enabled = false +rain_buffer_minutes = 10 +snow_buffer_minutes = 20 +precip_probability_threshold = 50 + +[monitoring] +# Optional dead-man's-switch. poll_staleness_minutes controls how long the poll +# loop may be silent before the morning digest warns the timer is dead. Set +# heartbeat_url to a healthchecks.io-style URL to get an off-host alert when the +# per-minute poll stops pinging it entirely. +poll_staleness_minutes = 15 +# heartbeat_url = "https://hc-ping.com/your-uuid-here" + [paths] venues_file = "/etc/commutecompass/known_venues.yaml" db_path = "/var/lib/commutecompass/state.db" diff --git a/flake.nix b/flake.nix index 7f75f07..f9689ef 100644 --- a/flake.nix +++ b/flake.nix @@ -23,6 +23,7 @@ ps: with ps; [ pip pytest + pytest-cov pydantic click pyyaml diff --git a/nix/package.nix b/nix/package.nix index aa93a7e..1ba3860 100644 --- a/nix/package.nix +++ b/nix/package.nix @@ -2,7 +2,9 @@ python3Packages.buildPythonApplication rec { pname = "commutecompass"; - version = "0.1.0"; + # Single source of truth: read the version from pyproject.toml so it never + # drifts from the Python package metadata. + version = (lib.importTOML ../pyproject.toml).project.version; format = "pyproject"; src = ./..; diff --git a/pyproject.toml b/pyproject.toml index 3dad375..3c36d02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,3 +38,11 @@ mypy_path = ["src"] [tool.pytest.ini_options] pythonpath = ["src"] + +[tool.coverage.run] +source = ["commutecompass"] +branch = true + +[tool.coverage.report] +# CI enforces a floor via --cov-fail-under; current coverage is ~87%. +show_missing = true diff --git a/src/commutecompass/cli.py b/src/commutecompass/cli.py index ca36674..31276df 100644 --- a/src/commutecompass/cli.py +++ b/src/commutecompass/cli.py @@ -135,7 +135,7 @@ def oauth(ctx: click.Context) -> None: token_path=token_path, ) client.authorize_interactive() - click.echo("OAuth授权完成。Token已保存。") + click.echo("OAuth authorization complete. Token saved.") # ─────────── init-db ────────────────────────────────────────────────────────── @@ -740,7 +740,11 @@ def config_show(ctx: click.Context, as_json: bool) -> None: @click.pass_context def config_set(ctx: click.Context, key: str, value: str) -> None: """Set an allowlisted config field. KEY uses dotted form (e.g. prep.prep_minutes).""" - from commutecompass.config import ConfigSetError, update_config_field + from commutecompass.config import ( + EXTERNALLY_SCHEDULED_KEYS, + ConfigSetError, + update_config_field, + ) config_path: Path = ctx.obj["config_path"] try: @@ -752,6 +756,12 @@ def config_set(ctx: click.Context, key: str, value: str) -> None: click.echo(f"Error writing {config_path}: {exc}", err=True) sys.exit(EXIT_CONFIG) click.echo(f"{key} = {coerced!r}") + if key in EXTERNALLY_SCHEDULED_KEYS: + click.echo( + f"Note: {key} is not read at runtime — the schedule is set by your " + "systemd timer or cron entry. Update that to change when the job runs.", + err=True, + ) @config.command(name="unset") diff --git a/src/commutecompass/config.py b/src/commutecompass/config.py index 62f54d9..19f6582 100644 --- a/src/commutecompass/config.py +++ b/src/commutecompass/config.py @@ -134,6 +134,52 @@ class NotifyConfig(BaseModel): mode: Literal["stdout", "telegram"] = "stdout" +class WeatherConfig(BaseModel): + """Weather-aware buffer: pad departure when rain/snow is expected. + + Uses the free Open-Meteo forecast API (no key). When precipitation is + likely around the commute window, extra minutes are subtracted from the + leave time so the alarm fires earlier. + """ + + enabled: bool = False + forecast_url: str = "https://api.open-meteo.com/v1/forecast" + # Extra minutes added to the buffer when rain / snow is expected. + rain_buffer_minutes: int = Field(default=10, ge=0, le=120) + snow_buffer_minutes: int = Field(default=20, ge=0, le=240) + # Minimum precipitation probability (%) before the rain buffer applies. + precip_probability_threshold: int = Field(default=50, ge=0, le=100) + + @field_validator("forecast_url") + @classmethod + def _validate_forecast_url(cls, v: str) -> str: + if v and not (v.startswith("http://") or v.startswith("https://")): + raise ValueError(f"weather.forecast_url must start with http(s)://, got {v!r}") + return v.rstrip("/") + + +class MonitoringConfig(BaseModel): + """Dead-man's-switch / heartbeat configuration. + + ``heartbeat_url`` is an optional healthchecks.io-style endpoint that the + poll job pings on every successful run; the external service alerts when the + pings stop (i.e. the per-minute timer died). ``poll_staleness_minutes`` is + the threshold past which the morning digest flags that poll has not run. + """ + + heartbeat_url: Optional[str] = None + poll_staleness_minutes: int = Field(default=15, ge=1, le=24 * 60) + + @field_validator("heartbeat_url") + @classmethod + def _validate_heartbeat_url(cls, v: Optional[str]) -> Optional[str]: + if v and not (v.startswith("http://") or v.startswith("https://")): + raise ValueError( + f"monitoring.heartbeat_url must start with http(s)://, got {v!r}" + ) + return v + + class Config(BaseModel): origin: Origin calendars: list[CalendarSpec] @@ -146,6 +192,8 @@ class Config(BaseModel): mode_overrides: list[ModeOverride] = [] home_assistant: HomeAssistantConfig = HomeAssistantConfig() notify: NotifyConfig = NotifyConfig() + monitoring: MonitoringConfig = MonitoringConfig() + weather: WeatherConfig = WeatherConfig() # Loaded from env, not TOML: google_maps_api_key: str = "" google_oauth_client_secret_json: str = "" @@ -338,6 +386,15 @@ def _coerce_bool(value: str) -> bool: } +# Keys that the app itself never reads at runtime — an external scheduler +# (systemd timer, cron) drives when `morning`/`poll` run, so editing these in +# TOML records intent but does NOT change the schedule. The CLI warns when one +# is set so a chat user isn't misled into thinking it took effect. +EXTERNALLY_SCHEDULED_KEYS: frozenset[str] = frozenset( + {"scheduling.morning_run_time", "scheduling.poll_interval_seconds"} +) + + class ConfigSetError(Exception): """Raised by ``update_config_field`` for an invalid key or value.""" diff --git a/src/commutecompass/format.py b/src/commutecompass/format.py index 694cf8e..b780761 100644 --- a/src/commutecompass/format.py +++ b/src/commutecompass/format.py @@ -282,6 +282,12 @@ def _format_plan_summary(plan: Plan) -> str: if plan.route: lines.append(f" {escape_md(_route_summary(plan.route))}") + if plan.weather_buffer_minutes > 0 and plan.weather_reason: + emoji = "❄️" if plan.weather_reason == "snow" else "🌧️" + lines.append( + f" {emoji} {escape_md(f'+{plan.weather_buffer_minutes} min for {plan.weather_reason}')}" + ) + lines.append("") return "\n".join(lines) @@ -393,7 +399,10 @@ def _route_summary(route: Route) -> str: else: transfer_suffix = "" - return f"{mode_label} ({total_min} min{transfer_suffix})" + # Mark routes that came from cache/estimate rather than live routing. + estimate_suffix = ", estimated" if route.approximate else "" + + return f"{mode_label} ({total_min} min{transfer_suffix}{estimate_suffix})" def _route_summary_detailed(route: Route) -> str: diff --git a/src/commutecompass/jobs/morning.py b/src/commutecompass/jobs/morning.py index 6ab8b94..2f4230d 100644 --- a/src/commutecompass/jobs/morning.py +++ b/src/commutecompass/jobs/morning.py @@ -16,10 +16,11 @@ import logging import uuid +from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING -from commutecompass.calendar_client import CalendarClient +from commutecompass.calendar_client import AuthError, CalendarClient from commutecompass.config import Config from commutecompass.format import format_digest, format_leave_ping, format_prep_ping from commutecompass.mta import fetch_alerts @@ -59,6 +60,7 @@ def run(config: Config) -> None: # noqa: C901 token_path=config.paths.oauth_token_path, ) events: list[Event] = [] + auth_failed = False try: events = calendar_client.fetch_events( calendars=[ @@ -68,6 +70,13 @@ def run(config: Config) -> None: # noqa: C901 start=today_start, end=today_end, ) + except AuthError as exc: + # An expired/invalid OAuth token degrades to "no events" — but unlike a + # transient API blip it will NOT fix itself, so surface it loudly in the + # digest footer instead of letting the user think their day is empty. + logger.error("Calendar auth failed — re-auth needed: %s", exc) + auth_failed = True + events = [] except Exception as exc: logger.error("Failed to fetch calendar events: %s", exc) # Continue with empty events list — digest will reflect no events @@ -192,16 +201,10 @@ def run(config: Config) -> None: # noqa: C901 except Exception as exc: logger.warning("Failed to fetch MTA alerts: %s", exc) - # Filter to those affecting today's planned routes - from commutecompass.llm import OpencodeGoClient + # Filter to those affecting today's planned routes. Reuse the llm_client + # built above for planning rather than constructing a second identical one. from commutecompass.mta import select_actionable_alerts - llm_client = OpencodeGoClient( - endpoint=config.opencode_go.endpoint, - token=config.opencode_go_token, - model=config.opencode_go.model, - ) - affecting_alerts: list[Alert] = [] for plan in plans: if plan.route and plan.leave_at: @@ -222,7 +225,10 @@ def run(config: Config) -> None: # noqa: C901 ) # ── 6. Build and send digest ────────────────────────────────────────────── - ops_notes = _operations_notes(plans, all_alerts) + poll_stale = _poll_heartbeat_stale(store, config, _now) + ops_notes = _operations_notes( + plans, all_alerts, auth_failed=auth_failed, poll_stale=poll_stale + ) digest = format_digest(plans, affecting_alerts, operations_notes=ops_notes) notifier = build_notifier(config) sent = notifier.send(digest) @@ -231,13 +237,20 @@ def run(config: Config) -> None: # noqa: C901 else: logger.warning("morning job: digest send failed") + # Record morning's own heartbeat and ping the external dead-man's-switch. + store.record_job_success("morning", _now) + if config.monitoring.heartbeat_url: + from commutecompass.monitoring import ping_heartbeat + + ping_heartbeat(config.monitoring.heartbeat_url) + # ── 7. Log structured summary ──────────────────────────────────────────── unresolved = sum(1 for p in plans if p.error == "location_unresolved") no_route = sum(1 for p in plans if p.error == "no_route") too_imminent = sum(1 for p in plans if p.error == "too_imminent") logger.info( "morning_run_summary: events=%d plans=%d unresolved=%d no_route=%d " - "too_imminent=%d alerts=%d digest_sent=%s", + "too_imminent=%d alerts=%d digest_sent=%s auth_failed=%s", len(events), len(plans), unresolved, @@ -245,18 +258,51 @@ def run(config: Config) -> None: # noqa: C901 too_imminent, len(affecting_alerts), sent, + auth_failed, ) -def _operations_notes(plans: list[Plan], all_alerts: list[Alert]) -> list[str]: +def _poll_heartbeat_stale(store: Store, config: Config, now: datetime) -> bool: + """True if the poll loop has not completed within the staleness threshold. + + Poll runs every minute, so by morning a healthy poll heartbeat is seconds + old. A stale (or missing) heartbeat means the per-minute timer is dead and + no leave/prep alarms will fire today — worth shouting about in the digest. + """ + from datetime import timedelta + + last = store.get_job_heartbeat("poll") + if last is None: + return True + threshold = timedelta(minutes=config.monitoring.poll_staleness_minutes) + return (now - last) > threshold + + +def _operations_notes( + plans: list[Plan], + all_alerts: list[Alert], + *, + auth_failed: bool = False, + poll_stale: bool = False, +) -> list[str]: """Build the "Operations:" footer items for the morning digest. Surfaces degraded-service signals that today would only land in stderr: - MTA feeds that went silent after retries, plans whose location couldn't - be resolved, and plans that were stored with "too_imminent" / "no_route". + calendar auth that needs re-running, a dead poll timer, MTA feeds that went + silent after retries, plans whose location couldn't be resolved, and plans + that were stored with "too_imminent" / "no_route". """ notes: list[str] = [] + # Calendar auth failure first — without it the whole digest is empty and the + # user would otherwise have no idea their token lapsed. + if auth_failed: + notes.append("Calendar auth expired — re-run `commutecompass oauth`") + + # A dead poll timer means no alarms will fire today. + if poll_stale: + notes.append("Poll loop has not run recently — alarms may not fire (check the timer)") + # Per-feed MTA failures reported by fetch_alerts (set as an attribute). failed_feeds: list[str] = getattr(fetch_alerts, "last_failed_systems", []) for system in failed_feeds: diff --git a/src/commutecompass/jobs/poll.py b/src/commutecompass/jobs/poll.py index 2991b01..fa02808 100644 --- a/src/commutecompass/jobs/poll.py +++ b/src/commutecompass/jobs/poll.py @@ -24,6 +24,15 @@ # Only applied when the caller did not inject a fetch_alerts_fn (tests bypass). _MTA_CACHE_TTL_SECONDS = 180 +# Re-fire policy for actionable pings whose send failed transiently. The claim +# already consumed the row; on failure we hand it back (release_ping) so the +# next poll re-attempts — but only for these kinds, only within the grace window +# of their scheduled time (a stale alarm is worse than none), and only up to the +# attempt cap so a persistently-broken notifier can never storm. +_RETRYABLE_PING_KINDS = frozenset({"prep", "leave"}) +_MAX_SEND_ATTEMPTS = 5 +_SEND_RETRY_GRACE_SECONDS = 15 * 60 + # Module-level memo: (captured_at, (subway_url, lirr_url, bus_url), alerts). _alerts_cache: "tuple[datetime, tuple[str, str, str], list[Alert]] | None" = None @@ -194,11 +203,32 @@ def run( if sent_ok: logger.info("Fired ping %s (%s)", ping.id, ping.kind) else: - logger.warning( - "Send failed for claimed ping %s (%s) — not retrying", - ping.id, - ping.kind, + # The claim already set fired=1. For actionable pings still within + # their grace window and under the attempt cap, hand the row back so + # the next poll retries; otherwise leave it fired (give up) so a + # broken notifier can't storm or deliver a stale alarm. + attempt = ping.send_attempts + 1 + within_grace = (now - ping.fire_at).total_seconds() <= _SEND_RETRY_GRACE_SECONDS + retryable = ( + ping.kind in _RETRYABLE_PING_KINDS + and attempt < _MAX_SEND_ATTEMPTS + and within_grace ) + if retryable and _store.release_ping(ping.id): + logger.warning( + "Send failed for %s ping %s (attempt %d/%d) — released for retry", + ping.kind, + ping.id, + attempt, + _MAX_SEND_ATTEMPTS, + ) + else: + logger.warning( + "Send failed for claimed ping %s (%s) after %d attempt(s) — giving up", + ping.id, + ping.kind, + attempt, + ) # Additive HA alarm: fire AFTER the primary send attempt regardless of # its outcome (claim already consumed the row). An HA outage cannot @@ -352,6 +382,15 @@ def run( _store.cancel_pings(plan.event.id) _schedule_pings_for_plan(new_plan, _store, now) + # ── Phase 6: heartbeat ──────────────────────────────────────────────────── + # Record that poll completed, and ping the external dead-man's-switch (if + # configured) — the per-minute poll is the natural liveness signal. + _store.record_job_success("poll", now) + if config.monitoring.heartbeat_url: + from commutecompass.monitoring import ping_heartbeat + + ping_heartbeat(config.monitoring.heartbeat_url) + def _location_update_significant(old_plan: Plan, new_plan: Plan) -> bool: """Stricter check used only for Phase 5 location-driven updates. diff --git a/src/commutecompass/llm.py b/src/commutecompass/llm.py index 09e56be..fe01647 100644 --- a/src/commutecompass/llm.py +++ b/src/commutecompass/llm.py @@ -11,6 +11,7 @@ import httpx from commutecompass.models import ResolvedLocation +from commutecompass.retry import retry if TYPE_CHECKING: from commutecompass.models import Alert, Route @@ -69,37 +70,12 @@ def resolve_location(self, raw: str, hints: dict[str, Any]) -> Optional[Resolved return location def _call(self, raw: str, hints: dict[str, Any]) -> str: - """Make the chat completion request and return the content string.""" - payload: dict[str, Any] = { - "model": self.model, - "messages": [ - {"role": "system", "content": _SYSTEM_PROMPT}, - {"role": "user", "content": raw}, - ], - "temperature": 0.0, - } - headers: dict[str, str] = { - "Authorization": f"Bearer {self.token}", - "Content-Type": "application/json", - } - with httpx.Client(timeout=8.0) as client: - resp = client.post(self.endpoint, json=payload, headers=headers) - resp.raise_for_status() - data = resp.json() - if not isinstance(data, dict): - return "" - # OpenAI-compatible chat completion shape - choices = data.get("choices") - if not isinstance(choices, list) or not choices: - return "" - first = choices[0] - if not isinstance(first, dict): - return "" - message = first.get("message") - if not isinstance(message, dict): - return "" - content = message.get("content") - return content if isinstance(content, str) else "" + """Make the resolution chat completion request and return the content. + + ``hints`` is accepted for API stability but the location prompt is + self-contained, so it is not currently threaded into the request. + """ + return self._chat_completion(_SYSTEM_PROMPT, raw, timeout_seconds=8.0) def _chat_completion(self, system_prompt: str, user_content: str, *, timeout_seconds: float = 8.0) -> str: payload: dict[str, Any] = { @@ -114,10 +90,18 @@ def _chat_completion(self, system_prompt: str, user_content: str, *, timeout_sec "Authorization": f"Bearer {self.token}", "Content-Type": "application/json", } - with httpx.Client(timeout=timeout_seconds) as client: - resp = client.post(self.endpoint, json=payload, headers=headers) - resp.raise_for_status() - data = resp.json() + + def _do_request() -> object: + # Retry transient blips (timeouts / network / 5xx / 429); a single + # flaky response should not cost the whole resolution. Non-transient + # errors (4xx, bad JSON) raise straight through to the caller, which + # logs and returns None. + with httpx.Client(timeout=timeout_seconds) as client: + resp = client.post(self.endpoint, json=payload, headers=headers) + resp.raise_for_status() + return resp.json() + + data = retry(_do_request, attempts=2, label="opencode-go") if not isinstance(data, dict): return "" choices = data.get("choices") diff --git a/src/commutecompass/models.py b/src/commutecompass/models.py index ff206bb..6d831e0 100644 --- a/src/commutecompass/models.py +++ b/src/commutecompass/models.py @@ -197,6 +197,11 @@ class TransitLeg(BaseModel): arrive_at: datetime duration_seconds: int summary: str + # Structured boarding/alighting stop names. Kept separately from ``summary`` + # so consumers (e.g. MTA alert relevance) don't have to re-parse the + # human-readable string, which breaks on stop names containing "to"/"and". + departure_stop: Optional[str] = None + arrival_stop: Optional[str] = None class Route(BaseModel): @@ -207,6 +212,11 @@ class Route(BaseModel): transfers: int = 0 fare_estimate_cents: Optional[int] = None raw_provider_payload: Optional[dict[str, Any]] = None + # True when the route did not come from a live Directions response — either + # a previously-cached route reused during an API outage, or a coarse + # distance/speed estimate. Surfaced in the digest so the user knows the + # timing is best-effort rather than schedule-accurate. + approximate: bool = False class Plan(BaseModel): @@ -215,6 +225,11 @@ class Plan(BaseModel): leave_at: Optional[datetime] = None prep_at: Optional[datetime] = None error: Optional[str] = None + # Extra minutes folded into the buffer for expected precipitation, plus a + # short reason ("rain"/"snow") for display. Zero when weather is disabled + # or the forecast is clear. + weather_buffer_minutes: int = 0 + weather_reason: Optional[str] = None class Alert(BaseModel): @@ -236,6 +251,10 @@ class PingEntry(BaseModel): fired: bool = False fired_at: Optional[datetime] = None message: str + # Number of send attempts that have already failed for this ping. Used by + # the poll loop to bound cross-tick re-fire of actionable pings whose send + # failed transiently (see ``Store.release_ping``). + send_attempts: int = 0 class CurrentLocation(BaseModel): diff --git a/src/commutecompass/monitoring.py b/src/commutecompass/monitoring.py new file mode 100644 index 0000000..a0524fc --- /dev/null +++ b/src/commutecompass/monitoring.py @@ -0,0 +1,39 @@ +"""Dead-man's-switch heartbeat. + +A self-hosted alarm has a silent failure mode: if the per-minute poll timer +stops firing, the user just stops getting notifications and has no way to know. +The internal `job_heartbeat` table lets the morning digest report that poll went +stale, and an optional external healthchecks.io-style URL provides an off-host +safety net that alerts when the pings stop entirely. +""" + +from __future__ import annotations + +import logging + +import httpx + +from commutecompass.retry import retry + +logger = logging.getLogger(__name__) + + +def ping_heartbeat(url: str, *, timeout: float = 5.0) -> bool: + """GET a healthcheck URL to signal liveness. Returns True on 2xx. + + Failures are swallowed (logged at debug): a monitoring blip must never break + the job whose health it is reporting. + """ + if not url: + return False + + def _do() -> None: + with httpx.Client(timeout=timeout) as client: + client.get(url).raise_for_status() + + try: + retry(_do, attempts=2, label="heartbeat") + return True + except Exception as exc: # pragma: no cover - exercised via swallow path + logger.debug("heartbeat ping failed for %s: %s", url, exc) + return False diff --git a/src/commutecompass/mta.py b/src/commutecompass/mta.py index add9fd0..312a644 100644 --- a/src/commutecompass/mta.py +++ b/src/commutecompass/mta.py @@ -2,6 +2,7 @@ from __future__ import annotations +import hashlib import logging import re from datetime import datetime @@ -126,14 +127,14 @@ def _fetch_feed(url: str, system: str, client: httpx.Client) -> list[Alert]: alerts: list[Alert] = [] for entity in feed.entity: if entity.HasField("alert"): - parsed = _parse_alert(entity.alert, system) + parsed = _parse_alert(entity.alert, system, entity_id=entity.id) if parsed: alerts.append(parsed) return alerts -def _parse_alert(gtfs_alert: GtfsAlert, system: str) -> Optional[Alert]: +def _parse_alert(gtfs_alert: GtfsAlert, system: str, *, entity_id: str = "") -> Optional[Alert]: """Map a GTFS-RT Alert proto into our Alert model.""" if not gtfs_alert.informed_entity: return None @@ -186,12 +187,20 @@ def _parse_alert(gtfs_alert: GtfsAlert, system: str) -> Optional[Alert]: if translations: url = translations[0].text if translations[0].text else None - # Generate stable alert id from affected routes + start time of first period - first_period = active_periods[0] if active_periods else (None, None) - id_base = f"{system}:{','.join(sorted(affected_routes)) if affected_routes else 'unknown'}" - if first_period[0]: - id_base += f":{first_period[0].strftime('%Y%m%d%H%M')}" - alert_id = id_base[:128] + # Prefer the feed's own entity id — it is the canonical stable identifier. + # Fall back to a derived id (routes + first-period start) only when the feed + # omits it, and disambiguate that fallback with a short hash of the alert + # text so two distinct alerts sharing routes + start minute don't collide + # into one ledger entry (which would suppress the second). + if entity_id: + alert_id = f"{system}:{entity_id}"[:128] + else: + first_period = active_periods[0] if active_periods else (None, None) + id_base = f"{system}:{','.join(sorted(affected_routes)) if affected_routes else 'unknown'}" + if first_period[0]: + id_base += f":{first_period[0].strftime('%Y%m%d%H%M')}" + text_digest = hashlib.sha1(f"{header}\n{description}".encode()).hexdigest()[:8] + alert_id = f"{id_base}:{text_digest}"[:128] return Alert( id=alert_id, @@ -361,16 +370,17 @@ def _build_route_context(route: Route) -> tuple[set[str], set[str]]: line_ids.add(leg.line.lower().strip()) if leg.headsign: stop_names.add(leg.headsign.lower().strip()) - # Extract origin/destination stop names from summary (e.g. "C from A to B") - if leg.summary: - parts = leg.summary.split(" from ") - if len(parts) >= 2: - # left side is the line; right side is "A to B" - right = parts[1] - for stop in right.replace(" to ", " ").replace(" and ", " ").split(): - stop = stop.strip(",. ") - if stop and stop not in ("to", "and"): - stop_names.add(stop.lower()) + # Use the structured boarding/alighting stops rather than re-parsing the + # human summary (which breaks on stop names containing "to"/"and"). Add + # both the whole name and its word tokens for keyword matching. + for stop in (leg.departure_stop, leg.arrival_stop): + if not stop: + continue + normalized = stop.lower().strip() + stop_names.add(normalized) + for word in re.split(r"[^\w]+", normalized): + if word: + stop_names.add(word) return stop_names, line_ids @@ -512,24 +522,26 @@ def _systems_lines_overlap(alert: Alert, route: Route) -> bool: def _line_matches(alert: Alert, leg: TransitLeg) -> bool: - """Check if a transit leg's line/route matches alert's affected_routes.""" + """Check if a transit leg's line/route matches alert's affected_routes. + + Matches on whole line designators (case-insensitive), not substrings: bare + substring matching made affected route "1" match leg lines "B41", "M15", or + a hypothetical "10". A decorated line like "C-local" still matches affected + route "C" because we also compare against the line's alphanumeric tokens. + """ if not alert.affected_routes: # No specific routes means whole system is affected return True - if leg.line: - # Direct line match - if leg.line in alert.affected_routes: - return True + if not leg.line: + return False - # Also check if any affected route is a substring of the line (route IDs - # sometimes have prefixes like "ABC" for the C line) - if leg.line: - for affected in alert.affected_routes: - if affected in leg.line: - return True + line = leg.line.upper().strip() + line_tokens = {tok for tok in re.split(r"[^A-Z0-9]+", line) if tok} + line_tokens.add(line) # also match multi-word lines as a whole - return False + affected = {route.upper().strip() for route in alert.affected_routes} + return bool(affected & line_tokens) def _time_overlaps(alert: Alert, route: Route, at_time: datetime) -> bool: diff --git a/src/commutecompass/planner.py b/src/commutecompass/planner.py index d7e19af..8f64cae 100644 --- a/src/commutecompass/planner.py +++ b/src/commutecompass/planner.py @@ -130,7 +130,7 @@ def plan_event( Returns a Plan with route and timing, or an error Plan on failure. """ from commutecompass.resolver import resolve - from commutecompass.routing import plan_route + from commutecompass.routing import estimate_route, plan_route, route_cache_key from commutecompass.geocode import geocode # Step 1: resolve location (override applied first) @@ -158,8 +158,12 @@ def geocoder(addr: str) -> Optional[GeocodeResult]: if resolved is None: return Plan(event=event, error="location_unresolved") - # Step 2: plan route + # Step 2: plan route. A live route is cached for reuse; if live routing is + # unavailable we fall back to the last good cached route, then to a coarse + # distance estimate — so an API outage degrades to "approximate" rather than + # silently producing no plan (and therefore no alarm) for the whole day. route_origin = effective_origin(config, store, override=origin_override) + cache_key = route_cache_key(route_origin) route = plan_route( origin=route_origin, @@ -168,12 +172,25 @@ def geocoder(addr: str) -> Optional[GeocodeResult]: mode=mode, api_key=config.google_maps_api_key, ) + if route is not None: + store.cache_route(cache_key, resolved.value, mode, route) + else: + route = store.get_cached_route(cache_key, resolved.value, mode) + if route is not None: + route = route.model_copy(update={"approximate": True}) + else: + route = estimate_route(route_origin, resolved, event.start, mode) if route is None: return Plan(event=event, error="no_route") - # Step 3: compute timings + # Step 3: compute timings. Add a weather buffer when precipitation is + # expected around the event so the alarm fires earlier on a rainy/snowy day. + from commutecompass.weather import weather_buffer as _weather_buffer + + wx = _weather_buffer(route_origin.lat, route_origin.lon, event.start, config.weather) + travel = timedelta(seconds=route.total_duration_seconds) - buffer = timedelta(minutes=config.prep.safety_buffer_minutes) + buffer = timedelta(minutes=config.prep.safety_buffer_minutes + wx.minutes) prep = timedelta(minutes=config.prep.prep_minutes) leave_at = event.start - travel - buffer @@ -190,6 +207,8 @@ def geocoder(addr: str) -> Optional[GeocodeResult]: leave_at=leave_at, prep_at=prep_at, error="too_imminent", + weather_buffer_minutes=wx.minutes, + weather_reason=wx.reason, ) return Plan( @@ -197,4 +216,6 @@ def geocoder(addr: str) -> Optional[GeocodeResult]: route=route, leave_at=leave_at, prep_at=prep_at, + weather_buffer_minutes=wx.minutes, + weather_reason=wx.reason, ) diff --git a/src/commutecompass/routing.py b/src/commutecompass/routing.py index 12524a8..2062db0 100644 --- a/src/commutecompass/routing.py +++ b/src/commutecompass/routing.py @@ -3,7 +3,7 @@ from __future__ import annotations import math -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Literal, Optional import httpx @@ -17,6 +17,80 @@ def _unix(dt: datetime) -> int: return int(dt.timestamp()) +def route_cache_key(origin: Origin) -> str: + """Stable cache key for an origin. + + Rounds coordinates to ~11 m (4 decimals) so jitter in GPS-derived origins + doesn't fragment the cache while still distinguishing real start points. + """ + return f"{origin.lat:.4f},{origin.lon:.4f}" + + +# Effective door-to-door speeds (km/h) for the coarse fallback estimate. These +# bake in waiting/transfers/parking, so they are deliberately well below vehicle +# cruising speed — the goal is a leave-time that is roughly right, not a schedule. +_FALLBACK_SPEED_KMH: dict[str, float] = { + "transit": 18.0, + "driving": 25.0, + "bicycling": 14.0, + "walking": 4.8, +} + + +def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """Great-circle distance between two WGS84 points, in kilometers.""" + r = 6371.0 + p1, p2 = math.radians(lat1), math.radians(lat2) + dphi = math.radians(lat2 - lat1) + dlambda = math.radians(lon2 - lon1) + a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlambda / 2) ** 2 + return 2 * r * math.asin(min(1.0, math.sqrt(a))) + + +def estimate_route( + origin: Origin, + destination: ResolvedLocation, + arrival_time: datetime, + mode: Literal["transit", "driving", "walking", "bicycling"] = "transit", +) -> Optional[Route]: + """Build a coarse distance/speed route estimate when live routing is down. + + Returns None when the destination has no coordinates (e.g. an unresolved + station name) — there is nothing to measure against, so the caller should + fall back to ``no_route`` rather than fabricate a number. The returned + route is flagged ``approximate``. + """ + if destination.lat is None or destination.lon is None: + return None + + distance_km = _haversine_km(origin.lat, origin.lon, destination.lat, destination.lon) + # Crow-flies underestimates real path length; pad by 30% as a rough detour + # factor before dividing by the mode speed. + speed_kmh = _FALLBACK_SPEED_KMH.get(mode, _FALLBACK_SPEED_KMH["transit"]) + hours = (distance_km * 1.3) / speed_kmh + duration_seconds = max(60, int(hours * 3600)) + + depart_at = arrival_time - timedelta(seconds=duration_seconds) + leg = TransitLeg( + mode=mode.upper(), # type: ignore[arg-type] + system=None, + line=None, + headsign=None, + depart_at=depart_at, + arrive_at=arrival_time, + duration_seconds=duration_seconds, + summary=f"Estimated {mode} (~{distance_km:.1f} km, live routing unavailable)", + ) + return Route( + legs=[leg], + depart_at=depart_at, + arrive_at=arrival_time, + total_duration_seconds=duration_seconds, + transfers=0, + approximate=True, + ) + + def _parse_step(step: dict[str, Any], nyc_tz: Any) -> Optional[TransitLeg]: """Parse a single step from a Directions leg into a TransitLeg. @@ -55,6 +129,8 @@ def _parse_step(step: dict[str, Any], nyc_tz: Any) -> Optional[TransitLeg]: system: Optional[str] = None line: Optional[str] = None headsign: Optional[str] = None + departure_stop_name: Optional[str] = None + arrival_stop_name: Optional[str] = None summary = "" if mode == "TRANSIT": @@ -98,6 +174,8 @@ def _parse_step(step: dict[str, Any], nyc_tz: Any) -> Optional[TransitLeg]: dep_name = departure_stop.get("name", "Unknown") arr_name = arrival_stop.get("name", "Unknown") + departure_stop_name = dep_name if dep_name != "Unknown" else None + arrival_stop_name = arr_name if arr_name != "Unknown" else None summary = f"{line or 'Transit'} from {dep_name} to {arr_name}" elif mode == "WALKING": html_inst = step.get("html_instructions", "") @@ -120,6 +198,8 @@ def _parse_step(step: dict[str, Any], nyc_tz: Any) -> Optional[TransitLeg]: arrive_at=arrive_at, duration_seconds=duration_sec, summary=summary, + departure_stop=departure_stop_name, + arrival_stop=arrival_stop_name, ) diff --git a/src/commutecompass/store.py b/src/commutecompass/store.py index 738e852..9e29fa3 100644 --- a/src/commutecompass/store.py +++ b/src/commutecompass/store.py @@ -10,7 +10,14 @@ import sqlite3 -from commutecompass.models import AdjustRow, CurrentLocation, Plan, PingEntry, ResolvedLocation +from commutecompass.models import ( + AdjustRow, + CurrentLocation, + Plan, + PingEntry, + ResolvedLocation, + Route, +) def _now_iso() -> str: @@ -45,6 +52,13 @@ def _json_serializer(obj: object) -> str: raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable") +# Bump when the on-disk schema changes. Stored in SQLite's built-in +# ``PRAGMA user_version`` so future migrations can branch on the version the DB +# was last initialised at, instead of probing every table with PRAGMA +# table_info. The existing column adds remain idempotent and run unconditionally. +SCHEMA_VERSION = 1 + + class Store: """SQLite store for plans, pings, geocode cache, and alert ledger.""" @@ -100,6 +114,14 @@ def init_schema(self) -> None: resolved_json TEXT NOT NULL, cached_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS route_cache ( + origin_key TEXT NOT NULL, + dest_value TEXT NOT NULL, + mode TEXT NOT NULL, + route_json TEXT NOT NULL, + cached_at TEXT NOT NULL, + PRIMARY KEY (origin_key, dest_value, mode) + ); CREATE TABLE IF NOT EXISTS alerts_seen ( alert_id TEXT NOT NULL, event_id TEXT NOT NULL, @@ -124,6 +146,10 @@ def init_schema(self) -> None: muted_at TEXT NOT NULL, expires_at TEXT ); + CREATE TABLE IF NOT EXISTS job_heartbeat ( + job TEXT PRIMARY KEY, + last_success TEXT NOT NULL + ); """) # Migrate adjust_log: add columns required for `undo` (single-step # restoration of the exact previous prep_at + undone flag). @@ -157,6 +183,22 @@ def init_schema(self) -> None: cl_cols = {row[1] for row in conn.execute("PRAGMA table_info(current_location)").fetchall()} if "accuracy_m" not in cl_cols: conn.execute("ALTER TABLE current_location ADD COLUMN accuracy_m REAL") + # Add send_attempts to pings: counts failed sends so the poll loop + # can bound cross-tick re-fire of actionable pings (see release_ping). + ping_cols = {row[1] for row in conn.execute("PRAGMA table_info(pings)").fetchall()} + if "send_attempts" not in ping_cols: + conn.execute( + "ALTER TABLE pings ADD COLUMN send_attempts INTEGER NOT NULL DEFAULT 0" + ) + # Stamp the schema version last, once all tables/columns exist. + # PRAGMA can't be parameterised, but SCHEMA_VERSION is a trusted int. + conn.execute(f"PRAGMA user_version = {int(SCHEMA_VERSION)}") + + def schema_version(self) -> int: + """Return the schema version stamped in the database header.""" + with self._connect() as conn: + row = conn.execute("PRAGMA user_version").fetchone() + return int(row[0]) if row else 0 # ── Plan CRUD ────────────────────────────────────────────────────────────── @@ -268,7 +310,7 @@ def pending_pings(self, before: datetime) -> list[PingEntry]: with self._connect() as conn: rows = conn.execute( """ - SELECT id, event_id, kind, fire_at, fired, fired_at, message + SELECT id, event_id, kind, fire_at, fired, fired_at, message, send_attempts FROM pings WHERE fired = 0 AND fire_at <= ? ORDER BY fire_at @@ -287,6 +329,7 @@ def pending_pings(self, before: datetime) -> list[PingEntry]: fired=bool(row[4]), fired_at=datetime.fromisoformat(row[5]) if row[5] else None, message=row[6], + send_attempts=row[7], ) ) return pings @@ -307,9 +350,11 @@ def claim_ping(self, ping_id: str, fired_at: datetime) -> bool: caller (or a previous run) already claimed it — in which case the caller MUST NOT send, to avoid duplicate notifications. - Marking happens *before* the network send, so a failed send does not - cause a retry storm: a single attempt is the contract. Observability - is provided by the caller (log + summary line). + Marking happens *before* the network send so two concurrent runners + cannot both send. If the send then fails, the caller may hand the row + back with ``release_ping`` so a later poll re-attempts it (bounded by an + attempt cap + grace window). Observability is provided by the caller + (log + summary line). """ with self._connect() as conn: cursor = conn.execute( @@ -318,6 +363,27 @@ def claim_ping(self, ping_id: str, fired_at: datetime) -> bool: ) return cursor.rowcount == 1 + def release_ping(self, ping_id: str) -> bool: + """Hand a claimed ping back to the unfired pool after a failed send. + + Atomically transitions ``fired = 1 -> 0`` and increments + ``send_attempts`` so the next poll picks the row up again. Returns True + only when the row was actually claimed (``fired = 1``); a row already + re-fired or never claimed is left untouched. The caller is responsible + for bounding re-fire (attempt cap + grace window) so this can never + storm. + """ + with self._connect() as conn: + cursor = conn.execute( + """ + UPDATE pings + SET fired = 0, fired_at = NULL, send_attempts = send_attempts + 1 + WHERE id = ? AND fired = 1 + """, + (ping_id,), + ) + return cursor.rowcount == 1 + # ── Geocode cache ─────────────────────────────────────────────────────────── def cache_geocode(self, raw: str, resolved: ResolvedLocation) -> None: @@ -356,6 +422,72 @@ def get_geocode(self, raw: str, max_age_days: int = 30) -> Optional[ResolvedLoca data = _json_loads(row[0]) return ResolvedLocation.model_validate(data) + # ── Route cache ────────────────────────────────────────────────────────────── + + def cache_route(self, origin_key: str, dest_value: str, mode: str, route: Route) -> None: + """Store the latest successful route for an (origin, dest, mode) triple. + + Used as a fallback when live routing is unavailable. Only one row per + triple is kept (latest wins); the cached travel duration is what lets + the planner still compute a leave time during an API outage. + """ + with self._connect() as conn: + conn.execute( + """ + INSERT INTO route_cache (origin_key, dest_value, mode, route_json, cached_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(origin_key, dest_value, mode) DO UPDATE SET + route_json = excluded.route_json, + cached_at = excluded.cached_at + """, + (origin_key, dest_value, mode, _json_dumps(route.model_dump()), _now_iso()), + ) + + def get_cached_route( + self, origin_key: str, dest_value: str, mode: str, max_age_days: int = 30 + ) -> Optional[Route]: + """Return the most recent cached route for the triple, if fresh enough.""" + from datetime import timedelta + + from commutecompass.timeutil import now_nyc + + cutoff = now_nyc() - timedelta(days=max_age_days) + with self._connect() as conn: + row = conn.execute( + """ + SELECT route_json FROM route_cache + WHERE origin_key = ? AND dest_value = ? AND mode = ? AND cached_at >= ? + """, + (origin_key, dest_value, mode, cutoff.isoformat()), + ).fetchone() + if row is None: + return None + return Route.model_validate(_json_loads(row[0])) + + # ── Job heartbeat ──────────────────────────────────────────────────────────── + + def record_job_success(self, job: str, at: datetime) -> None: + """Record the most recent successful completion of a named job.""" + with self._connect() as conn: + conn.execute( + """ + INSERT INTO job_heartbeat (job, last_success) + VALUES (?, ?) + ON CONFLICT(job) DO UPDATE SET last_success = excluded.last_success + """, + (job, at.isoformat()), + ) + + def get_job_heartbeat(self, job: str) -> Optional[datetime]: + """Return the last successful run time for a job, or None if never run.""" + with self._connect() as conn: + row = conn.execute( + "SELECT last_success FROM job_heartbeat WHERE job = ?", (job,) + ).fetchone() + if row is None: + return None + return datetime.fromisoformat(row[0]) + # ── Alert ledger ──────────────────────────────────────────────────────────── def mark_alert_seen(self, alert_id: str, event_id: str) -> None: diff --git a/src/commutecompass/venues.py b/src/commutecompass/venues.py index 9b72da5..f9c00f6 100644 --- a/src/commutecompass/venues.py +++ b/src/commutecompass/venues.py @@ -23,15 +23,18 @@ def _normalize(s: str) -> str: return s.strip() -def _jaccard(tokens_a: set[str], tokens_b: set[str]) -> float: - """Compute Jaccard similarity between two token sets.""" - if not tokens_a and not tokens_b: - return 1.0 - intersection = len(tokens_a & tokens_b) - union = len(tokens_a | tokens_b) - if union == 0: - return 0.0 - return intersection / union +# rapidfuzz ratios run 0-100; require a strong overlap before claiming a match. +_FUZZY_THRESHOLD = 85.0 + + +def _digit_runs(s: str) -> set[str]: + """Extract digit sequences (e.g. room/studio numbers) from a string. + + Numbers carry meaning that edit-distance smears over: 'studio 100' and + 'studio 200' are 90% similar as strings but are different rooms. Requiring + digit runs to match exactly before a fuzzy comparison keeps those apart. + """ + return set(re.findall(r"\d+", s)) class VenueEntry(BaseModel): @@ -77,12 +80,17 @@ def match(self, raw: str) -> Optional[ResolvedLocation]: Matching strategy: 1. Normalize input 2. Exact alias match (normalized) → return resolution - 3. Fuzzy token-overlap match (Jaccard >= 0.85 on whitespace-collapsed alias vs input) → return resolution + 3. Fuzzy match: edit-distance ratio >= threshold on the whitespace- + collapsed strings (so "Studio 100" and "Studio100" match), but only + when both sides have the *same* digit runs — so different room + numbers ("Studio 100" vs "Studio 200") never collide. 4. Otherwise None """ if not raw: return None + from rapidfuzz import fuzz + norm = _normalize(raw) # Step 1: exact match @@ -90,15 +98,15 @@ def match(self, raw: str) -> Optional[ResolvedLocation]: idx = self._exact[norm] return self.entries[idx].resolves_to - # Step 2: fuzzy — compare whitespace-collapsed input against stored collapsed aliases + # Step 2: fuzzy — edit-distance over whitespace-collapsed strings, gated + # on matching digit runs. Unlike the previous character-set Jaccard this + # respects order (anagrams no longer match) and room numbers. collapsed_input = re.sub(r"\s+", "", norm) + input_digits = _digit_runs(collapsed_input) for stored_collapsed, idx in self._fuzzy: - # Jaccard over character bigrams (or fallback to simple overlap ratio) - # Simple ratio: number of shared tokens / total tokens - # Build token sets by splitting on whitespace after collapsing - input_tokens = set(collapsed_input) - stored_tokens = set(stored_collapsed) - if _jaccard(input_tokens, stored_tokens) >= 0.85: + if _digit_runs(stored_collapsed) != input_digits: + continue + if fuzz.ratio(collapsed_input, stored_collapsed) >= _FUZZY_THRESHOLD: return self.entries[idx].resolves_to return None diff --git a/src/commutecompass/weather.py b/src/commutecompass/weather.py new file mode 100644 index 0000000..709cb93 --- /dev/null +++ b/src/commutecompass/weather.py @@ -0,0 +1,118 @@ +"""Weather-aware departure buffer via the Open-Meteo forecast API. + +A clear-sky commute and a snowy one need different head starts. When rain or +snow is likely around the time the user would leave, we pad the buffer so the +alarm fires earlier. Open-Meteo is free and keyless, so this needs no secret. +""" + +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Any, NamedTuple, Optional + +import httpx + +from commutecompass.config import WeatherConfig +from commutecompass.retry import retry +from commutecompass.timeutil import to_nyc + +logger = logging.getLogger(__name__) + + +class WeatherBuffer(NamedTuple): + minutes: int + reason: Optional[str] # human-readable note, e.g. "rain", or None when clear + + +_CLEAR = WeatherBuffer(0, None) + + +def weather_buffer( + lat: float, + lon: float, + at_time: datetime, + config: WeatherConfig, + *, + fetcher: Optional[Any] = None, +) -> WeatherBuffer: + """Return extra buffer minutes (and a reason) for precipitation at ``at_time``. + + Returns ``WeatherBuffer(0, None)`` when disabled, on any fetch/parse error, + or when the forecast is clear — weather is an enhancement, never a reason to + fail a plan. ``fetcher`` is injectable for tests. + """ + if not config.enabled: + return _CLEAR + + fetch = fetcher or _fetch_forecast + try: + hourly = fetch(lat, lon, config.forecast_url) + except Exception as exc: + logger.debug("weather fetch failed (lat=%s lon=%s): %s", lat, lon, exc) + return _CLEAR + + return _buffer_from_hourly(hourly, at_time, config) + + +def _fetch_forecast(lat: float, lon: float, forecast_url: str) -> dict[str, Any]: + """Fetch hourly precipitation/snowfall from Open-Meteo for the next 2 days.""" + params = { + "latitude": f"{lat:.4f}", + "longitude": f"{lon:.4f}", + "hourly": "precipitation,precipitation_probability,snowfall", + "timezone": "America/New_York", + "forecast_days": "2", + } + + def _do() -> dict[str, Any]: + with httpx.Client(timeout=8.0) as client: + resp = client.get(forecast_url, params=params) + resp.raise_for_status() + data = resp.json() + hourly = data.get("hourly") if isinstance(data, dict) else None + return hourly if isinstance(hourly, dict) else {} + + return retry(_do, attempts=2, label="open-meteo") + + +def _buffer_from_hourly( + hourly: dict[str, Any], at_time: datetime, config: WeatherConfig +) -> WeatherBuffer: + """Pick the forecast hour matching ``at_time`` and derive a buffer.""" + times = hourly.get("time") + if not isinstance(times, list) or not times: + return _CLEAR + + # Open-Meteo hour stamps are local ISO strings like "2026-06-23T08:00". + target = to_nyc(at_time).strftime("%Y-%m-%dT%H:00") + try: + idx = times.index(target) + except ValueError: + return _CLEAR + + snowfall = _at(hourly.get("snowfall"), idx) + precip = _at(hourly.get("precipitation"), idx) + prob = _at(hourly.get("precipitation_probability"), idx) + + # Snow dominates — it slows every mode the most. + if snowfall is not None and snowfall > 0: + return WeatherBuffer(config.snow_buffer_minutes, "snow") + + likely = (prob is not None and prob >= config.precip_probability_threshold) or ( + precip is not None and precip > 0 + ) + if likely: + return WeatherBuffer(config.rain_buffer_minutes, "rain") + + return _CLEAR + + +def _at(series: Any, idx: int) -> Optional[float]: + """Safely read index ``idx`` from a forecast series, as a float.""" + if not isinstance(series, list) or idx >= len(series): + return None + value = series[idx] + if isinstance(value, (int, float)): + return float(value) + return None diff --git a/tests/test_cli.py b/tests/test_cli.py index 97089fb..840e08a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -487,6 +487,18 @@ def test_set_disallowed_key_exits_nonzero( # Error message should name the allowlist assert "prep.prep_minutes" in result.output + def test_set_externally_scheduled_key_warns( + self, runner: CliRunner, tmp_path: Path + ) -> None: + """Setting an inert scheduling key still writes it but warns it does nothing.""" + p = self._toml_with_prep(tmp_path) + result = runner.invoke( + cli, ["--config", str(p), "config", "set", "scheduling.poll_interval_seconds", "30"] + ) + assert result.exit_code == 0, result.output + assert "poll_interval_seconds = 30" in p.read_text() + assert "not read at runtime" in result.output + # ─────────── adjust idempotency ──────────────────────────────────────────────── @@ -816,7 +828,7 @@ def test_snooze_shifts_prep_ping_forward( result = runner.invoke(cli, ["snooze", "evt-abc", "--minutes", "20"]) assert result.exit_code == 0, result.output - pending = store.pending_pings(now_nyc() + timedelta(hours=4)) + pending = store.pending_pings(now_nyc() + timedelta(hours=24)) prep = [p for p in pending if p.event_id == "evt-abc" and p.kind == "prep"] assert len(prep) == 1 # The new fire_at is ~20 minutes after the original (allowing ms drift). @@ -848,7 +860,7 @@ def test_snooze_skip_marks_ping_fired( result = runner.invoke(cli, ["snooze", "evt-abc", "--skip"]) assert result.exit_code == 0, result.output - pending = store.pending_pings(now_nyc() + timedelta(hours=4)) + pending = store.pending_pings(now_nyc() + timedelta(hours=24)) assert not any(p.event_id == "evt-abc" and p.kind == "prep" for p in pending) def test_snooze_requires_one_of_minutes_or_skip( @@ -897,7 +909,7 @@ def test_mute_event_sets_mute_and_cancels_pings( assert store.is_muted("evt-abc") is True from datetime import timedelta from commutecompass.timeutil import now_nyc - pending = store.pending_pings(now_nyc() + timedelta(hours=4)) + pending = store.pending_pings(now_nyc() + timedelta(hours=24)) assert not any(p.event_id == "evt-abc" for p in pending) def test_mute_today_mutes_all_plans( diff --git a/tests/test_format.py b/tests/test_format.py index 925066e..f2d3e8d 100644 --- a/tests/test_format.py +++ b/tests/test_format.py @@ -208,6 +208,28 @@ def test_format_digest_single_event(self) -> None: # Should start with today header assert "Today" in result + def test_format_digest_shows_weather_note(self) -> None: + """A weather buffer is surfaced as a per-event note in the digest.""" + event = make_event( + id="evt-wx", + title="Rehearsal", + calendar_name="Theatre", + start=datetime(2026, 5, 12, 9, 30, tzinfo=timezone.utc), + location="200 Example St", + location_value="200 Example St, New York, NY 10001", + ) + leg = TransitLeg( + mode="TRANSIT", system="MTA Subway", line="C", headsign="Fulton St", + depart_at=datetime(2026, 5, 12, 8, 15, tzinfo=timezone.utc), + arrive_at=datetime(2026, 5, 12, 9, 0, tzinfo=timezone.utc), + duration_seconds=2700, summary="C train", + ) + plan = make_plan(event, make_route([leg])).model_copy( + update={"weather_buffer_minutes": 15, "weather_reason": "rain"} + ) + result = format_digest([plan], []) + assert "15 min for rain" in result + def test_format_digest_multiple_events(self) -> None: """format_digest renders multiple plans with different calendars.""" event1 = make_event( @@ -1027,6 +1049,21 @@ def test_route_summary_omits_zero_transfer_text(self) -> None: summary = _route_summary(route) assert summary == "Subway (C) (48 min)" + def test_route_summary_marks_approximate_routes(self) -> None: + """An approximate (cached/estimated) route is labelled in the digest.""" + depart = datetime(2026, 5, 12, 8, 0, tzinfo=timezone.utc) + arrive = datetime(2026, 5, 12, 8, 48, tzinfo=timezone.utc) + legs = [ + TransitLeg( + mode="TRANSIT", system="MTA Subway", line="C", headsign="Fulton St", + depart_at=depart, arrive_at=arrive, + duration_seconds=2880, summary="C train", + ), + ] + route = Route(legs=legs, depart_at=depart, arrive_at=arrive, + total_duration_seconds=2880, transfers=0, approximate=True) + assert _route_summary(route) == "Subway (C) (48 min, estimated)" + def test_route_summary_includes_transfer_text_when_present(self) -> None: """_route_summary includes transfer text when transfers are present.""" depart = datetime(2026, 5, 12, 8, 0, tzinfo=timezone.utc) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 564be50..77f6df6 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -200,6 +200,67 @@ def sample_route() -> Route: # ─────────── Morning job tests ──────────────────────────────────────────────── +def test_morning_run_surfaces_calendar_auth_failure( + minimal_config: Config, + tmp_path: Path, +) -> None: + """An expired OAuth token must be reported in the digest, not silently empty.""" + from commutecompass.calendar_client import AuthError + + with patch("commutecompass.jobs.morning.CalendarClient") as mock_cal_class, patch( + "commutecompass.jobs.morning.fetch_alerts" + ) as mock_fetch_alerts, patch( + "commutecompass.jobs.morning.build_notifier" + ) as mock_notifier_class: + mock_cal = MagicMock() + mock_cal.fetch_events.side_effect = AuthError("token refresh failed") + mock_cal_class.return_value = mock_cal + mock_fetch_alerts.return_value = [] + mock_notifier = MagicMock() + mock_notifier.send.return_value = True + mock_notifier_class.return_value = mock_notifier + + morning_run(minimal_config) + + # Digest still sent, and it tells the user to re-auth. + mock_notifier.send.assert_called_once() + sent_text = mock_notifier.send.call_args.args[0] + assert "oauth" in sent_text.lower() + + +def _run_morning_capture_digest(config: Config) -> str: + with patch("commutecompass.jobs.morning.CalendarClient") as mock_cal_class, patch( + "commutecompass.jobs.morning.fetch_alerts" + ) as mock_fetch_alerts, patch( + "commutecompass.jobs.morning.build_notifier" + ) as mock_notifier_class: + mock_cal = MagicMock() + mock_cal.fetch_events.return_value = [] + mock_cal_class.return_value = mock_cal + mock_fetch_alerts.return_value = [] + mock_notifier = MagicMock() + mock_notifier.send.return_value = True + mock_notifier_class.return_value = mock_notifier + morning_run(config) + return str(mock_notifier.send.call_args.args[0]) + + +def test_morning_flags_dead_poll_timer(minimal_config: Config) -> None: + """A missing/stale poll heartbeat is surfaced in the digest.""" + # No poll heartbeat recorded → stale. + text = _run_morning_capture_digest(minimal_config) + assert "poll loop has not run" in text.lower() + + +def test_morning_silent_when_poll_heartbeat_fresh(minimal_config: Config) -> None: + """A fresh poll heartbeat means no dead-timer warning.""" + store = Store(minimal_config.paths.db_path) + store.init_schema() + store.record_job_success("poll", now_nyc()) + text = _run_morning_capture_digest(minimal_config) + assert "poll loop has not run" not in text.lower() + + def test_morning_run_fetches_and_plans( minimal_config: Config, tmp_path: Path, @@ -1754,68 +1815,169 @@ def planner(event: Event, **_kw: object) -> Plan: # ── Atomic claim semantics ──────────────────────────────────────────────────── -def test_poll_ping_marked_fired_even_when_primary_send_fails( - minimal_config: Config, - today_events: list[Event], - sample_route: Route, -) -> None: - """When the primary notifier returns False, the ping is still consumed. +class FlakyNotifier: + """Notifier that fails its first ``fail_times`` sends, then succeeds.""" - The claim-then-send contract is "one attempt per ping": a flaky network - must not turn a single missed send into a retry storm on every subsequent - poll cycle. - """ - now = now_nyc() + def __init__(self, fail_times: int) -> None: + self.sent: list[str] = [] + self._fail_times = fail_times + def send(self, text: str) -> bool: + self.sent.append(text) + ok = len(self.sent) > self._fail_times + return ok + + +def _seed_due_leave_plan( + config: Config, + event: Event, + route: Route, + now: "datetime", + *, + fire_offset_minutes: int = 5, + kind: str = "leave", +) -> Store: leave_at = (now + timedelta(hours=3)) - timedelta(minutes=45) plan = Plan( - event=today_events[0], - route=sample_route, + event=event, + route=route, leave_at=leave_at, prep_at=leave_at - timedelta(minutes=20), ) - due_ping = PingEntry( - id="ping-one-shot", - event_id=today_events[0].id, - kind="leave", - fire_at=now - timedelta(minutes=5), - fired=False, - message="leave now", - ) - - store = Store(minimal_config.paths.db_path) + store = Store(config.paths.db_path) store.init_schema() store.upsert_plan(plan) - store.schedule_ping(due_ping) + store.schedule_ping( + PingEntry( + id="ping-flaky", + event_id=event.id, + kind=cast("Any", kind), + fire_at=now - timedelta(minutes=fire_offset_minutes), + fired=False, + message="leave now", + ) + ) + return store - failing = SpyNotifier(return_value=False) +def _poll_at(config: Config, store: Store, plan: Plan, notifier: object, at: "datetime") -> None: poll_run( - minimal_config, + config, store=store, fetch_alerts_fn=lambda **kw: [], alerts_affecting_route_fn=lambda *a, **kw: [], - notifier=cast(TelegramNotifier, failing), + notifier=cast(TelegramNotifier, notifier), plan_event_fn=MockPlanner(plan), - now_fn=lambda: now, + now_fn=lambda: at, ha_fetch_fn=lambda *a, **kw: None, ) - # The notifier was invoked exactly once with the message. + +def test_poll_releases_leave_ping_for_retry_when_send_fails( + minimal_config: Config, + today_events: list[Event], + sample_route: Route, +) -> None: + """A leave ping whose send fails is handed back so the next poll retries. + + Losing the one notification that matters is worse than a second attempt: an + actionable ping within its grace window is re-fired on the following poll. + """ + now = now_nyc() + store = _seed_due_leave_plan(minimal_config, today_events[0], sample_route, now) + plan = store.get_plan(today_events[0].id) + assert plan is not None + + failing = SpyNotifier(return_value=False) + _poll_at(minimal_config, store, plan, failing, now) assert failing.sent == ["leave now"] + # Released back to the pending pool with the attempt recorded. + pending = store.pending_pings(before=now + timedelta(minutes=1)) + assert [p.id for p in pending] == ["ping-flaky"] + assert pending[0].send_attempts == 1 - # A second poll a minute later must NOT retry: claim already consumed. - poll_run( - minimal_config, - store=store, - fetch_alerts_fn=lambda **kw: [], - alerts_affecting_route_fn=lambda *a, **kw: [], - notifier=cast(TelegramNotifier, failing), - plan_event_fn=MockPlanner(plan), - now_fn=lambda: now + timedelta(minutes=1), - ha_fetch_fn=lambda *a, **kw: None, + # A minute later (still inside the grace window) it retries. + _poll_at(minimal_config, store, plan, failing, now + timedelta(minutes=1)) + assert failing.sent == ["leave now", "leave now"] + + +def test_poll_stops_retrying_once_send_succeeds( + minimal_config: Config, + today_events: list[Event], + sample_route: Route, +) -> None: + now = now_nyc() + store = _seed_due_leave_plan(minimal_config, today_events[0], sample_route, now) + plan = store.get_plan(today_events[0].id) + assert plan is not None + + flaky = FlakyNotifier(fail_times=1) # first send fails, second succeeds + _poll_at(minimal_config, store, plan, flaky, now) + _poll_at(minimal_config, store, plan, flaky, now + timedelta(minutes=1)) + assert flaky.sent == ["leave now", "leave now"] + + # Now fired for good — a third poll does not send again. + _poll_at(minimal_config, store, plan, flaky, now + timedelta(minutes=2)) + assert flaky.sent == ["leave now", "leave now"] + assert store.pending_pings(before=now + timedelta(minutes=5)) == [] + + +def test_poll_gives_up_after_attempt_cap( + minimal_config: Config, + today_events: list[Event], + sample_route: Route, +) -> None: + """A persistently-broken notifier must not retry forever (no storm).""" + now = now_nyc() + store = _seed_due_leave_plan(minimal_config, today_events[0], sample_route, now) + plan = store.get_plan(today_events[0].id) + assert plan is not None + + failing = SpyNotifier(return_value=False) + # Poll once per minute; after _MAX_SEND_ATTEMPTS the row is abandoned. + for i in range(8): + _poll_at(minimal_config, store, plan, failing, now + timedelta(minutes=i)) + + from commutecompass.jobs.poll import _MAX_SEND_ATTEMPTS + + assert len(failing.sent) == _MAX_SEND_ATTEMPTS + assert store.pending_pings(before=now + timedelta(minutes=10)) == [] + + +def test_poll_records_heartbeat( + minimal_config: Config, + today_events: list[Event], + sample_route: Route, +) -> None: + """Every poll run records a 'poll' heartbeat for the dead-man's-switch.""" + now = now_nyc() + store = Store(minimal_config.paths.db_path) + store.init_schema() + assert store.get_job_heartbeat("poll") is None + + _poll_at(minimal_config, store, Plan(event=today_events[0]), SpyNotifier(), now) + assert store.get_job_heartbeat("poll") == now + + +def test_poll_does_not_retry_stale_leave_ping( + minimal_config: Config, + today_events: list[Event], + sample_route: Route, +) -> None: + """Outside the grace window a failed leave send is abandoned, not re-fired.""" + now = now_nyc() + # fire_at is well past the grace window already. + store = _seed_due_leave_plan( + minimal_config, today_events[0], sample_route, now, fire_offset_minutes=30 ) + plan = store.get_plan(today_events[0].id) + assert plan is not None + + failing = SpyNotifier(return_value=False) + _poll_at(minimal_config, store, plan, failing, now) assert failing.sent == ["leave now"] + # Stale: consumed, not released. + assert store.pending_pings(before=now + timedelta(minutes=1)) == [] def test_poll_quiet_hours_leaves_unclaimed_for_later( diff --git a/tests/test_llm.py b/tests/test_llm.py index 3af6a54..a957da8 100644 --- a/tests/test_llm.py +++ b/tests/test_llm.py @@ -74,6 +74,23 @@ def test_unknown_kind_returns_none(self) -> None: ) assert result is None + def test_transient_failure_is_retried_then_succeeds(self) -> None: + """A single transient blip must not lose the resolution (retry recovers).""" + with patch("commutecompass.llm.httpx.Client") as mock_client_cls, patch( + "commutecompass.retry.time.sleep" + ): + mock_instance = mock_client_cls.return_value.__enter__.return_value + mock_instance.post.side_effect = [ + httpx.TimeoutException("blip"), + _make_response('{"kind": "address", "value": "200 Example St, NY"}'), + ] + client = _make_client() + result = client.resolve_location("200 Example St", {}) + + assert result is not None + assert result.value == "200 Example St, NY" + assert mock_instance.post.call_count == 2 + def test_fenced_json_parsed_correctly(self) -> None: fenced = """```json {"kind": "address", "value": "200 W 41st St, New York, NY 10036"} diff --git a/tests/test_monitoring.py b/tests/test_monitoring.py new file mode 100644 index 0000000..8734caa --- /dev/null +++ b/tests/test_monitoring.py @@ -0,0 +1,36 @@ +"""Tests for the heartbeat dead-man's-switch.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import httpx + +from commutecompass.monitoring import ping_heartbeat + + +def _response(status: int) -> httpx.Response: + return httpx.Response(status, request=MagicMock(spec=httpx.Request)) + + +def test_ping_heartbeat_success() -> None: + with patch("commutecompass.monitoring.httpx.Client") as mock_cls: + inst = mock_cls.return_value.__enter__.return_value + inst.get.return_value = _response(200) + assert ping_heartbeat("https://hc-ping.example/abc") is True + + +def test_ping_heartbeat_empty_url_is_noop() -> None: + with patch("commutecompass.monitoring.httpx.Client") as mock_cls: + assert ping_heartbeat("") is False + mock_cls.assert_not_called() + + +def test_ping_heartbeat_swallows_failure() -> None: + """A monitoring blip must never raise into the calling job.""" + with patch("commutecompass.monitoring.httpx.Client") as mock_cls, patch( + "commutecompass.retry.time.sleep" + ): + inst = mock_cls.return_value.__enter__.return_value + inst.get.side_effect = httpx.ConnectError("down") + assert ping_heartbeat("https://hc-ping.example/abc") is False diff --git a/tests/test_mta.py b/tests/test_mta.py index b8c456a..8afa3fb 100644 --- a/tests/test_mta.py +++ b/tests/test_mta.py @@ -129,9 +129,11 @@ def test_parses_valid_protobuf_fixture(self) -> None: # Subway feed alerts are first (system = "MTA Subway") subway_alerts = [a for a in alerts if a.id.startswith("MTA Subway")] assert len(subway_alerts) == 2, f"Expected 2 subway alerts, got {len(subway_alerts)}: {subway_alerts}" - ids = {a.id for a in subway_alerts} - assert any("C" in id_ for id_ in ids) - assert any("A" in id_ for id_ in ids) + # The two subway alerts cover the C and A lines (check the structured + # field, not the id string, which now derives from the feed entity id). + all_routes = set().union(*(a.affected_routes for a in subway_alerts)) + assert "C" in all_routes + assert "A" in all_routes def test_filters_entities_without_alerts(self) -> None: """Feed entities without alert payload are ignored.""" @@ -170,6 +172,48 @@ def test_filters_entities_without_alerts(self) -> None: assert alerts == [] + def test_distinct_alerts_same_route_and_time_get_distinct_ids(self) -> None: + """Two different alerts on the same route/time must not collapse to one id. + + Exercises the derived-id fallback (feed omits entity ids): the alert text + is hashed into the id so the ledger doesn't suppress the second alert. + """ + from google.transit.gtfs_realtime_pb2 import FeedEntity, FeedMessage + + start = int(time.time()) + + def _make_entity(header: str) -> FeedEntity: + ent = FeedEntity() + ent.id = "" # force the derived-id fallback + informed = ent.alert.informed_entity.add() + informed.route_id = "C" + period = ent.alert.active_period.add() + period.start = start + tr = ent.alert.header_text.translation.add() + tr.text = header + return ent + + feed = FeedMessage() + feed.header.gtfs_realtime_version = "2.0" + feed.header.timestamp = start + feed.entity.append(_make_entity("Signal problems at Jay St")) + feed.entity.append(_make_entity("Sick passenger at Hoyt St")) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.content = feed.SerializeToString() + mock_client = MagicMock() + mock_client.get.return_value = mock_response + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=None) + + with patch("commutecompass.mta.httpx.Client", MagicMock(return_value=mock_client)): + alerts = fetch_alerts("u", "", "", client=mock_client) + + subway = [a for a in alerts if a.id.startswith("MTA Subway")] + assert len(subway) == 2 + assert subway[0].id != subway[1].id + def test_url_construction_with_empty_strings(self) -> None: """Empty strings fall back to canonical MTA URLs.""" with patch("commutecompass.mta.httpx.Client") as mock_client_cls: @@ -591,6 +635,28 @@ def test_line_substring_match(self) -> None: # "C" does not contain "ABC" and "ABC" does not contain "C" — no match assert _systems_lines_overlap(alert, route) is False + def test_line_no_substring_overmatch(self) -> None: + """Affected route '1' must NOT match bus line 'B41' (substring regression).""" + now = make_aware(datetime.now(NYC_TZ)) + alert = alert_with_period( + "x", {"1"}, {"MTA Bus"}, + now - timedelta(hours=1), now + timedelta(hours=1), + ) + leg = subway_leg("B41", system="MTA Bus") + route = sample_route([leg]) + assert _systems_lines_overlap(alert, route) is False + + def test_line_matches_decorated_token(self) -> None: + """Affected route 'C' matches a decorated line like 'C-local'.""" + now = make_aware(datetime.now(NYC_TZ)) + alert = alert_with_period( + "x", {"C"}, {"MTA Subway"}, + now - timedelta(hours=1), now + timedelta(hours=1), + ) + leg = subway_leg("C-local") + route = sample_route([leg]) + assert _systems_lines_overlap(alert, route) is True + def test_wildcard_affected_routes(self) -> None: """Wildcard in affected_routes matches any line in system.""" now = make_aware(datetime.now(NYC_TZ)) @@ -726,13 +792,33 @@ def test_extracts_line_ids_and_stops(self) -> None: mode="TRANSIT", system="MTA Subway", line="C", headsign="Fulton St", depart_at=now, arrive_at=now + timedelta(minutes=30), duration_seconds=1800, summary="C from Jay St-MetroTech to Fulton St", + departure_stop="Jay St-MetroTech", arrival_stop="Fulton St", ), ] route = sample_route(legs) stop_names, line_ids = _build_route_context(route) assert "fulton st" in stop_names + assert "jay st-metrotech" in stop_names # whole structured stop name + assert "metrotech" in stop_names # word token assert "c" in line_ids + def test_stops_with_to_and_and_in_name_are_intact(self) -> None: + """Structured stops avoid the summary round-trip that split on to/and.""" + now = make_aware(datetime.now(NYC_TZ)) + legs = [ + TransitLeg( + mode="TRANSIT", system="LIRR", line="Babylon", + depart_at=now, arrive_at=now + timedelta(minutes=40), + duration_seconds=2400, summary="Babylon from Atlantic Terminal to Wantagh", + departure_stop="Atlantic Terminal", + arrival_stop="Forest Hills and Kew Gardens", + ), + ] + stop_names, _ = _build_route_context(sample_route(legs)) + # The whole name survives intact rather than being shredded on " and ". + assert "forest hills and kew gardens" in stop_names + assert "atlantic terminal" in stop_names + def test_empty_route(self) -> None: route = Route(legs=[], depart_at=make_aware(datetime.now(NYC_TZ)), arrive_at=make_aware(datetime.now(NYC_TZ)), total_duration_seconds=0) diff --git a/tests/test_planner.py b/tests/test_planner.py index 9dfdbe2..e8b8e18 100644 --- a/tests/test_planner.py +++ b/tests/test_planner.py @@ -255,9 +255,12 @@ def test_plan_event_no_route( resolved_location: ResolvedLocation, nyc_now: datetime, ) -> None: - """Returns error='no_route' when plan_route returns None.""" + """Returns error='no_route' only when live routing AND every fallback fail.""" + store = MagicMock() + store.get_cached_route.return_value = None with patch("commutecompass.resolver.resolve") as mock_resolve, \ - patch("commutecompass.routing.plan_route") as mock_plan_route: + patch("commutecompass.routing.plan_route") as mock_plan_route, \ + patch("commutecompass.routing.estimate_route", return_value=None): mock_resolve.return_value = resolved_location mock_plan_route.return_value = None @@ -265,7 +268,7 @@ def test_plan_event_no_route( event, config, MagicMock(spec=VenueRegistry), - MagicMock(), + store, MagicMock(spec=OpencodeGoClient), ) @@ -275,6 +278,124 @@ def test_plan_event_no_route( assert result.prep_at is None +def test_plan_event_reuses_cached_route_when_live_routing_down( + event: Event, + config: Config, + resolved_location: ResolvedLocation, + mock_route: Route, + nyc_now: datetime, +) -> None: + """When the Directions call fails, the last good cached route still plans the day.""" + store = MagicMock() + store.get_cached_route.return_value = mock_route.model_copy() + with patch("commutecompass.resolver.resolve") as mock_resolve, \ + patch("commutecompass.routing.plan_route", return_value=None), \ + patch("commutecompass.planner.now_nyc", return_value=nyc_now): + mock_resolve.return_value = resolved_location + + result = plan_event( + event, config, MagicMock(spec=VenueRegistry), store, MagicMock(spec=OpencodeGoClient) + ) + + assert result.error is None + assert result.route is not None + assert result.route.approximate is True + assert result.leave_at is not None + store.get_cached_route.assert_called_once() + + +def test_plan_event_falls_back_to_distance_estimate( + event: Event, + config: Config, + resolved_location: ResolvedLocation, + nyc_now: datetime, +) -> None: + """No live route and no cache → a coarse distance estimate keeps alarms alive.""" + store = MagicMock() + store.get_cached_route.return_value = None + with patch("commutecompass.resolver.resolve") as mock_resolve, \ + patch("commutecompass.routing.plan_route", return_value=None), \ + patch("commutecompass.planner.now_nyc", return_value=nyc_now): + mock_resolve.return_value = resolved_location # has lat/lon + + result = plan_event( + event, config, MagicMock(spec=VenueRegistry), store, MagicMock(spec=OpencodeGoClient) + ) + + assert result.error is None + assert result.route is not None + assert result.route.approximate is True + assert result.route.total_duration_seconds > 0 + + +def test_plan_event_applies_weather_buffer( + event: Event, + config: Config, + resolved_location: ResolvedLocation, + mock_route: Route, + nyc_now: datetime, +) -> None: + """When weather is enabled and rain is forecast, leave_at moves earlier.""" + config.weather.enabled = True + config.weather.rain_buffer_minutes = 15 + store = MagicMock() + with patch("commutecompass.resolver.resolve", return_value=resolved_location), \ + patch("commutecompass.routing.plan_route", return_value=mock_route), \ + patch("commutecompass.weather._fetch_forecast") as mock_fetch, \ + patch("commutecompass.planner.now_nyc", return_value=nyc_now): + # Forecast hour matching event.start (14:00) shows likely rain. + hour = event.start.strftime("%Y-%m-%dT%H:00") + mock_fetch.return_value = { + "time": [hour], + "precipitation": [1.0], + "precipitation_probability": [90], + "snowfall": [0.0], + } + with_weather = plan_event( + event, config, MagicMock(spec=VenueRegistry), store, MagicMock(spec=OpencodeGoClient) + ) + + config.weather.enabled = False + with patch("commutecompass.resolver.resolve", return_value=resolved_location), \ + patch("commutecompass.routing.plan_route", return_value=mock_route), \ + patch("commutecompass.planner.now_nyc", return_value=nyc_now): + without_weather = plan_event( + event, config, MagicMock(spec=VenueRegistry), store, MagicMock(spec=OpencodeGoClient) + ) + + assert with_weather.weather_buffer_minutes == 15 + assert with_weather.weather_reason == "rain" + assert without_weather.weather_buffer_minutes == 0 + # 15 extra buffer minutes ⇒ leave 15 minutes earlier. + assert with_weather.leave_at is not None and without_weather.leave_at is not None + delta = (without_weather.leave_at - with_weather.leave_at).total_seconds() / 60 + assert 14.5 < delta < 15.5 + + +def test_plan_event_caches_live_route( + event: Event, + config: Config, + resolved_location: ResolvedLocation, + mock_route: Route, + nyc_now: datetime, +) -> None: + """A successful live route is written to the cache for future fallback use.""" + store = MagicMock() + with patch("commutecompass.resolver.resolve") as mock_resolve, \ + patch("commutecompass.routing.plan_route", return_value=mock_route), \ + patch("commutecompass.planner.now_nyc", return_value=nyc_now): + mock_resolve.return_value = resolved_location + + result = plan_event( + event, config, MagicMock(spec=VenueRegistry), store, MagicMock(spec=OpencodeGoClient) + ) + + assert result.error is None + assert result.route is not None + assert result.route.approximate is False + store.cache_route.assert_called_once() + + def test_plan_event_mode_override( event: Event, config: Config, diff --git a/tests/test_routing.py b/tests/test_routing.py index e97df69..15fa42d 100644 --- a/tests/test_routing.py +++ b/tests/test_routing.py @@ -10,10 +10,113 @@ import pytest from commutecompass.models import Origin, ResolvedLocation, Route -from commutecompass.routing import _parse_route, _unix, plan_route +from commutecompass.routing import ( + _parse_route, + _unix, + estimate_route, + plan_route, + route_cache_key, +) from commutecompass.timeutil import NYC_TZ +# ─── Transfer counting ───────────────────────────────────────────────────────── + +def _transit_step(line: str, dur: int = 1500) -> dict[str, Any]: + return { + "travel_mode": "TRANSIT", + "duration": {"value": dur}, + "transit_details": { + "line": {"short_name": line, "vehicle": {"type": "SUBWAY"}}, + "departure_stop": {"name": "A"}, + "arrival_stop": {"name": "B"}, + }, + } + + +def _walk_step(dur: int = 300) -> dict[str, Any]: + return {"travel_mode": "WALKING", "duration": {"value": dur}} + + +def _route_from_steps(steps: list[dict[str, Any]]) -> Route: + resp = { + "status": "OK", + "routes": [ + { + "legs": [ + { + "departure_time": {"value": 1000}, + "arrival_time": {"value": 5000}, + "duration": {"value": 4000}, + "steps": steps, + } + ] + } + ], + } + route = _parse_route(resp) + assert route is not None + return route + + +def test_transfer_count_includes_walking_transfer() -> None: + """A walk between two trains is still one transfer, not zero.""" + route = _route_from_steps( + [_walk_step(), _transit_step("A"), _walk_step(), _transit_step("C"), _walk_step()] + ) + assert route.transfers == 1 + + +def test_transfer_count_single_train_is_zero() -> None: + route = _route_from_steps([_walk_step(), _transit_step("A"), _walk_step()]) + assert route.transfers == 0 + + +# ─── Fallback estimate ───────────────────────────────────────────────────────── + +def test_route_cache_key_rounds_coordinates() -> None: + a = Origin(address="a", lat=40.69501, lon=-73.98904) + b = Origin(address="b", lat=40.69499, lon=-73.98897) # within ~11m + assert route_cache_key(a) == route_cache_key(b) + far = Origin(address="c", lat=40.75, lon=-73.99) + assert route_cache_key(far) != route_cache_key(a) + + +def test_estimate_route_produces_approximate_route() -> None: + origin = Origin(address="home", lat=40.6950, lon=-73.9890) + dest = ResolvedLocation( + kind="address", value="Midtown", lat=40.7549, lon=-73.9840, source="geocode" + ) + arrival = datetime(2026, 5, 8, 14, 30, tzinfo=NYC_TZ) + + route = estimate_route(origin, dest, arrival, "transit") + assert route is not None + assert route.approximate is True + assert route.total_duration_seconds > 0 + # arrive_at is the requested time; depart_at precedes it by the estimate. + assert route.arrive_at == arrival + assert route.depart_at < arrival + + +def test_estimate_route_none_without_destination_coords() -> None: + origin = Origin(address="home", lat=40.6950, lon=-73.9890) + dest = ResolvedLocation(kind="station", value="Somewhere LIRR", source="llm") + arrival = datetime(2026, 5, 8, 14, 30, tzinfo=NYC_TZ) + assert estimate_route(origin, dest, arrival, "transit") is None + + +def test_estimate_route_slower_modes_take_longer() -> None: + origin = Origin(address="home", lat=40.6950, lon=-73.9890) + dest = ResolvedLocation( + kind="address", value="Midtown", lat=40.7549, lon=-73.9840, source="geocode" + ) + arrival = datetime(2026, 5, 8, 14, 30, tzinfo=NYC_TZ) + walking = estimate_route(origin, dest, arrival, "walking") + driving = estimate_route(origin, dest, arrival, "driving") + assert walking is not None and driving is not None + assert walking.total_duration_seconds > driving.total_duration_seconds + + # ─── Helper fixtures ─────────────────────────────────────────────────────────── @pytest.fixture diff --git a/tests/test_store.py b/tests/test_store.py index 1236837..68583fd 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -529,6 +529,81 @@ def test_get_geocode_miss(tmp_db_path: Path) -> None: assert store.get_geocode("never-cached-address") is None +# ── Route cache tests ─────────────────────────────────────────────────────────── + +def test_cache_route_round_trip(tmp_db_path: Path) -> None: + """A cached route is retrievable for the same (origin, dest, mode) triple.""" + store = Store(tmp_db_path) + store.init_schema() + route = make_route(datetime(2026, 5, 8, 14, 0, tzinfo=timezone.utc)) + + store.cache_route("40.6950,-73.9890", "Midtown", "transit", route) + got = store.get_cached_route("40.6950,-73.9890", "Midtown", "transit") + assert got is not None + assert got.total_duration_seconds == route.total_duration_seconds + # Different mode / dest / origin are cache misses. + assert store.get_cached_route("40.6950,-73.9890", "Midtown", "driving") is None + assert store.get_cached_route("40.6950,-73.9890", "Brooklyn", "transit") is None + + +def test_cache_route_keeps_latest(tmp_db_path: Path) -> None: + """Re-caching the same triple overwrites the previous route.""" + store = Store(tmp_db_path) + store.init_schema() + older = make_route(datetime(2026, 5, 8, 14, 0, tzinfo=timezone.utc)) + older.total_duration_seconds = 1000 + newer = make_route(datetime(2026, 5, 8, 14, 0, tzinfo=timezone.utc)) + newer.total_duration_seconds = 2000 + + store.cache_route("k", "d", "transit", older) + store.cache_route("k", "d", "transit", newer) + got = store.get_cached_route("k", "d", "transit") + assert got is not None + assert got.total_duration_seconds == 2000 + + +def test_get_cached_route_respects_max_age(tmp_db_path: Path) -> None: + """A stale cached route past max_age is treated as a miss.""" + store = Store(tmp_db_path) + store.init_schema() + route = make_route(datetime(2026, 5, 8, 14, 0, tzinfo=timezone.utc)) + store.cache_route("k", "d", "transit", route) + assert store.get_cached_route("k", "d", "transit", max_age_days=0) is None + + +# ── Schema version tests ──────────────────────────────────────────────────────── + +def test_schema_version_stamped(tmp_db_path: Path) -> None: + """init_schema stamps the current schema version in PRAGMA user_version.""" + from commutecompass.store import SCHEMA_VERSION + + store = Store(tmp_db_path) + assert store.schema_version() == 0 # fresh db, before init + store.init_schema() + assert store.schema_version() == SCHEMA_VERSION + # Idempotent re-init keeps the version stable. + store.init_schema() + assert store.schema_version() == SCHEMA_VERSION + + +# ── Job heartbeat tests ───────────────────────────────────────────────────────── + +def test_job_heartbeat_round_trip(tmp_db_path: Path) -> None: + """record_job_success / get_job_heartbeat round-trip; latest wins.""" + store = Store(tmp_db_path) + store.init_schema() + assert store.get_job_heartbeat("poll") is None + + t1 = datetime(2026, 5, 8, 6, 0, tzinfo=timezone.utc) + t2 = datetime(2026, 5, 8, 6, 1, tzinfo=timezone.utc) + store.record_job_success("poll", t1) + assert store.get_job_heartbeat("poll") == t1 + store.record_job_success("poll", t2) + assert store.get_job_heartbeat("poll") == t2 + # Distinct jobs are tracked independently. + assert store.get_job_heartbeat("morning") is None + + # ── Alert ledger tests ───────────────────────────────────────────────────────── def test_mark_alert_seen_and_is_alert_seen(tmp_db_path: Path) -> None: diff --git a/tests/test_venues.py b/tests/test_venues.py index 72d87c7..b34e8fc 100644 --- a/tests/test_venues.py +++ b/tests/test_venues.py @@ -71,6 +71,28 @@ def test_fuzzy_match_theater() -> None: assert result.kind == "station" +def test_fuzzy_match_collapsed_whitespace_variant() -> None: + """'Studio100' (no space) fuzzy-matches the 'Studio 100' alias.""" + registry = VenueRegistry.load(FIXTURE_PATH) + result = registry.match("Studio100") + assert result is not None + assert result.value == "200 Example St, New York, NY 10001" + + +def test_fuzzy_does_not_match_different_room_number() -> None: + """'Studio 200' must NOT match 'Studio 100' — different rooms (regression).""" + registry = VenueRegistry.load(FIXTURE_PATH) + assert registry.match("Studio 200") is None + assert registry.match("Studio200") is None + + +def test_fuzzy_does_not_match_anagram() -> None: + """A character anagram must not match (the old char-set Jaccard bug).""" + registry = VenueRegistry.load(FIXTURE_PATH) + # "loohcs elpmaxe" has the same characters as "example school" reversed. + assert registry.match("loohcs elpmaxe") is None + + def test_no_match_returns_none() -> None: """Unknown venue returns None.""" registry = VenueRegistry.load(FIXTURE_PATH) diff --git a/tests/test_weather.py b/tests/test_weather.py new file mode 100644 index 0000000..2ee111a --- /dev/null +++ b/tests/test_weather.py @@ -0,0 +1,79 @@ +"""Tests for the weather-aware buffer.""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from commutecompass.config import WeatherConfig +from commutecompass.timeutil import NYC_TZ +from commutecompass.weather import weather_buffer + + +AT = datetime(2026, 5, 8, 14, 30, tzinfo=NYC_TZ) # → forecast hour "...T14:00" + + +def _fetcher(hourly: dict[str, Any]) -> Any: + return lambda lat, lon, url: hourly + + +def _hourly(*, precip: float, prob: int, snow: float) -> dict[str, Any]: + return { + "time": ["2026-05-08T13:00", "2026-05-08T14:00"], + "precipitation": [0.0, precip], + "precipitation_probability": [5, prob], + "snowfall": [0.0, snow], + } + + +def _raises(*a: Any, **k: Any) -> Any: + raise AssertionError("fetcher should not be called") + + +def _boom(*a: Any, **k: Any) -> Any: + raise RuntimeError("network down") + + +def test_disabled_returns_zero() -> None: + cfg = WeatherConfig(enabled=False) + # Fetcher would raise if called — disabled must short-circuit. + assert weather_buffer(40.7, -74.0, AT, cfg, fetcher=_raises) == (0, None) + + +def test_clear_forecast_returns_zero() -> None: + cfg = WeatherConfig(enabled=True) + hourly = _hourly(precip=0.0, prob=5, snow=0.0) + assert weather_buffer(40.7, -74.0, AT, cfg, fetcher=_fetcher(hourly)) == (0, None) + + +def test_rain_by_probability() -> None: + cfg = WeatherConfig(enabled=True, rain_buffer_minutes=12, precip_probability_threshold=50) + hourly = _hourly(precip=0.0, prob=80, snow=0.0) + buf = weather_buffer(40.7, -74.0, AT, cfg, fetcher=_fetcher(hourly)) + assert buf.minutes == 12 + assert buf.reason == "rain" + + +def test_rain_below_threshold_is_clear() -> None: + cfg = WeatherConfig(enabled=True, precip_probability_threshold=50) + hourly = _hourly(precip=0.0, prob=30, snow=0.0) + assert weather_buffer(40.7, -74.0, AT, cfg, fetcher=_fetcher(hourly)).minutes == 0 + + +def test_snow_takes_priority_and_uses_snow_buffer() -> None: + cfg = WeatherConfig(enabled=True, rain_buffer_minutes=10, snow_buffer_minutes=25) + hourly = _hourly(precip=2.0, prob=90, snow=1.5) + buf = weather_buffer(40.7, -74.0, AT, cfg, fetcher=_fetcher(hourly)) + assert buf.minutes == 25 + assert buf.reason == "snow" + + +def test_fetch_error_swallowed() -> None: + cfg = WeatherConfig(enabled=True) + assert weather_buffer(40.7, -74.0, AT, cfg, fetcher=_boom) == (0, None) + + +def test_missing_hour_returns_zero() -> None: + cfg = WeatherConfig(enabled=True) + hourly = {"time": ["2026-05-08T09:00"], "precipitation": [9.0], "snowfall": [0.0]} + assert weather_buffer(40.7, -74.0, AT, cfg, fetcher=_fetcher(hourly)).minutes == 0