diff --git a/CHANGELOG.md b/CHANGELOG.md index 70afc23..6358d4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,21 @@ and this project adheres to semantic versioning once public releases begin. ### Added +- Machine-readable run progress for long commands (Ticket 106). `sample`, + `propagate`, and `batch` can now stream structured JSONL progress so a + non-interactive worker can show live progress instead of a flat "running" + until exit. Two opt-in flags select it: `--progress-format jsonl` writes one + compact JSON record per interval to stderr, and `--progress-file PATH` writes + the same stream to a sidecar file. Each record is + `{"event":"progress","command":...,"completed":...,"total":...,"elapsed_s":...}` + with monotonically increasing `completed` and a guaranteed final record where + `completed == total` (sample count for `sample`/`propagate`, run count for + `batch`); `elapsed_s` is monotonic wall-clock. Progress is a stderr/sidecar + side-channel only — it never appears in the `--output` stream, adds no schema + or envelope version, and leaves the result envelope, deterministic results, + and exit codes unchanged. Off by default: a run with no progress flag behaves + byte-for-byte as before. A progress callback is threaded through + `run_monte_carlo`, `run_stochastic_propagation`, and `run_batch_manifest`. - Contract-version discovery command (Ticket 105). 
A new read-only `bvlos-sim schema-versions` command (alias `contracts`) prints the resolved `tool_version` plus every supported output-envelope and input-schema version as canonical JSON diff --git a/adapters/batch_support.py b/adapters/batch_support.py index 9132024..5acfc7c 100644 --- a/adapters/batch_support.py +++ b/adapters/batch_support.py @@ -1,5 +1,6 @@ """Batch estimate execution support for CLI and tests.""" +from collections.abc import Callable from dataclasses import dataclass from io import StringIO @@ -117,10 +118,15 @@ def _run_estimate(run: BatchRun) -> BatchRunResult: ) -def run_batch_manifest(manifest: BatchManifest) -> list[BatchRunResult]: +def run_batch_manifest( + manifest: BatchManifest, + *, + progress: Callable[[int, int], None] | None = None, +) -> list[BatchRunResult]: """Run all estimates in a validated batch manifest.""" results: list[BatchRunResult] = [] - for run in manifest.runs: + total = len(manifest.runs) + for index, run in enumerate(manifest.runs): try: results.append(_run_estimate(run)) except _BATCH_RUN_INPUT_ERRORS as exc: @@ -134,6 +140,8 @@ def run_batch_manifest(manifest: BatchManifest) -> list[BatchRunResult]: error_message=str(exc), ) ) + if progress is not None: + progress(index + 1, total) return results @@ -170,7 +178,11 @@ def render_batch_csv(results: list[BatchRunResult]) -> str: """Render the batch result table as CSV (suitable for import into spreadsheets).""" rows = ["id,status,reserve_margin_percent,flight_time_s,warning_count"] for r in results: - reserve = "" if r.reserve_margin_percent is None else f"{r.reserve_margin_percent:.2f}" + reserve = ( + "" + if r.reserve_margin_percent is None + else f"{r.reserve_margin_percent:.2f}" + ) flight_time = "" if r.flight_time_s is None else f"{r.flight_time_s:.1f}" rows.append(f"{r.id},{r.status},{reserve},{flight_time},{r.warning_count}") return "\n".join(rows) + "\n" diff --git a/adapters/cli.py b/adapters/cli.py index 53cda55..a466464 100644 --- a/adapters/cli.py 
+++ b/adapters/cli.py @@ -56,6 +56,11 @@ class SoraOutputFormat(StrEnum): MARKDOWN = "markdown" +class ProgressFormat(StrEnum): + NONE = "none" + JSONL = "jsonl" + + _DOCUMENT_OUTPUT_FORMATS: dict[DocumentOutputFormat, OutputFormat] = { DocumentOutputFormat.JSON: OutputFormat.JSON, DocumentOutputFormat.MARKDOWN: OutputFormat.MARKDOWN, @@ -74,6 +79,7 @@ class SoraOutputFormat(StrEnum): "BatchOutputFormat", "CliExitCode", "DocumentOutputFormat", + "ProgressFormat", "ScenarioExitCode", "SoraOutputFormat", "SummaryOutputFormat", diff --git a/adapters/commands/batch.py b/adapters/commands/batch.py index 29724ad..dfea189 100644 --- a/adapters/commands/batch.py +++ b/adapters/commands/batch.py @@ -13,10 +13,15 @@ render_batch_table, run_batch_manifest, ) -from adapters.cli_batch_support import BatchOutputFormat, _batch_exit_code, write_batch_outputs +from adapters.cli_batch_support import ( + BatchOutputFormat, + _batch_exit_code, + write_batch_outputs, +) from adapters.cli_support import OutputWriteError, _write_output from adapters.envelope import OutputFormat from adapters.io import InputLoadError, load_mission, load_vehicle +from adapters.progress import progress_reporter BatchStdoutRenderer = Callable[[list[BatchRunResult]], str] @@ -25,7 +30,9 @@ BatchOutputFormat.CSV: render_batch_csv, } _BATCH_FILE_OUTPUT_FORMATS = frozenset( - output_format for output_format in BatchOutputFormat if output_format != BatchOutputFormat.CSV + output_format + for output_format in BatchOutputFormat + if output_format != BatchOutputFormat.CSV ) @@ -71,12 +78,26 @@ def _validate_batch_manifest(manifest: Path) -> None: def batch( manifest: Path = typer.Argument(..., exists=True, readable=True, resolve_path=True), - output_dir: Path | None = typer.Option(None, "--output-dir", help="Directory for per-run output files. Required when --format is not csv or summary."), + output_dir: Path | None = typer.Option( + None, + "--output-dir", + help="Directory for per-run output files. 
Required when --format is not csv or summary.", + ), format: BatchOutputFormat = typer.Option( BatchOutputFormat.SUMMARY, "--format", help="Stdout format. Use csv for spreadsheet import. Use --output-dir with json/markdown/geojson/kml/checklist/profile to write per-run files.", ), + progress_format: cli.ProgressFormat = typer.Option( + cli.ProgressFormat.NONE, + "--progress-format", + help="Emit machine-readable progress. Use jsonl for one JSON record per run on stderr.", + ), + progress_file: Path | None = typer.Option( + None, + "--progress-file", + help="Write JSONL progress to this file instead of stderr (implies --progress-format jsonl).", + ), validate_only: bool = typer.Option( False, "--validate-only", @@ -100,7 +121,12 @@ def batch( err=True, ) batch_manifest = load_batch_manifest(manifest) - results = run_batch_manifest(batch_manifest) + with progress_reporter( + "batch", + enabled=progress_format is cli.ProgressFormat.JSONL, + progress_file=progress_file, + ) as reporter: + results = run_batch_manifest(batch_manifest, progress=reporter) _emit_batch_warnings(results) _write_batch_file_outputs( output_dir=output_dir, diff --git a/adapters/commands/propagate.py b/adapters/commands/propagate.py index 80787b7..f77fcfc 100644 --- a/adapters/commands/propagate.py +++ b/adapters/commands/propagate.py @@ -13,13 +13,17 @@ _write_output, ) from adapters.io import InputLoadError, load_mission, load_vehicle +from adapters.progress import progress_reporter from adapters.stochastic_envelope import build_stochastic_envelope from adapters.stochastic_io import load_stochastic_plan, resolve_stochastic_asset_path def propagate( stochastic_file: Path = typer.Argument( - ..., exists=True, readable=True, resolve_path=True, + ..., + exists=True, + readable=True, + resolve_path=True, help="Path to stochastic.v1 YAML file.", ), format: cli.SummaryOutputFormat = typer.Option( @@ -27,7 +31,19 @@ def propagate( "--format", help="Output format. 
Use summary for a one-line feasibility and reserve result.", ), - output: Path | None = typer.Option(None, "--output", "-o", help="Write output to file instead of stdout."), + output: Path | None = typer.Option( + None, "--output", "-o", help="Write output to file instead of stdout." + ), + progress_format: cli.ProgressFormat = typer.Option( + cli.ProgressFormat.NONE, + "--progress-format", + help="Emit machine-readable progress. Use jsonl for one JSON record per interval on stderr.", + ), + progress_file: Path | None = typer.Option( + None, + "--progress-file", + help="Write JSONL progress to this file instead of stderr (implies --progress-format jsonl).", + ), validate_only: bool = typer.Option( False, "--validate-only", @@ -69,17 +85,23 @@ def propagate( mission_document=mission_document, ) - result = cli.run_stochastic_propagation( - plan, - mission_model, - vehicle_model, - wind_provider=mission_assets.wind_provider, - terrain_provider=mission_assets.terrain_provider, - population_provider=mission_assets.population_provider, - obstacle_provider=mission_assets.obstacle_provider, - geofences=mission_assets.geofences, - landing_zones=mission_assets.landing_zones, - ) + with progress_reporter( + "propagate", + enabled=progress_format is cli.ProgressFormat.JSONL, + progress_file=progress_file, + ) as reporter: + result = cli.run_stochastic_propagation( + plan, + mission_model, + vehicle_model, + wind_provider=mission_assets.wind_provider, + terrain_provider=mission_assets.terrain_provider, + population_provider=mission_assets.population_provider, + obstacle_provider=mission_assets.obstacle_provider, + geofences=mission_assets.geofences, + landing_zones=mission_assets.landing_zones, + progress=reporter, + ) envelope = build_stochastic_envelope( result=result, stochastic_document=stochastic_document, diff --git a/adapters/commands/sample.py b/adapters/commands/sample.py index d0ab1b0..d1c4d4b 100644 --- a/adapters/commands/sample.py +++ b/adapters/commands/sample.py 
@@ -13,13 +13,20 @@ _write_output, ) from adapters.io import InputLoadError, load_mission, load_vehicle +from adapters.progress import progress_reporter from adapters.uncertainty_envelope import build_uncertainty_envelope -from adapters.uncertainty_io import load_uncertainty_plan, resolve_uncertainty_asset_path +from adapters.uncertainty_io import ( + load_uncertainty_plan, + resolve_uncertainty_asset_path, +) def sample( uncertainty_file: Path = typer.Argument( - ..., exists=True, readable=True, resolve_path=True, + ..., + exists=True, + readable=True, + resolve_path=True, help="Path to uncertainty.v1 YAML file.", ), format: cli.SummaryOutputFormat = typer.Option( @@ -27,7 +34,19 @@ def sample( "--format", help="Output format. Use summary for a one-line feasibility and reserve result.", ), - output: Path | None = typer.Option(None, "--output", "-o", help="Write output to file instead of stdout."), + output: Path | None = typer.Option( + None, "--output", "-o", help="Write output to file instead of stdout." + ), + progress_format: cli.ProgressFormat = typer.Option( + cli.ProgressFormat.NONE, + "--progress-format", + help="Emit machine-readable progress. 
Use jsonl for one JSON record per interval on stderr.", + ), + progress_file: Path | None = typer.Option( + None, + "--progress-file", + help="Write JSONL progress to this file instead of stderr (implies --progress-format jsonl).", + ), validate_only: bool = typer.Option( False, "--validate-only", @@ -69,17 +88,23 @@ def sample( mission_document=mission_document, ) - result = cli.run_monte_carlo( - plan, - mission_model, - vehicle_model, - wind_provider=mission_assets.wind_provider, - terrain_provider=mission_assets.terrain_provider, - population_provider=mission_assets.population_provider, - obstacle_provider=mission_assets.obstacle_provider, - geofences=mission_assets.geofences, - landing_zones=mission_assets.landing_zones, - ) + with progress_reporter( + "sample", + enabled=progress_format is cli.ProgressFormat.JSONL, + progress_file=progress_file, + ) as reporter: + result = cli.run_monte_carlo( + plan, + mission_model, + vehicle_model, + wind_provider=mission_assets.wind_provider, + terrain_provider=mission_assets.terrain_provider, + population_provider=mission_assets.population_provider, + obstacle_provider=mission_assets.obstacle_provider, + geofences=mission_assets.geofences, + landing_zones=mission_assets.landing_zones, + progress=reporter, + ) envelope = build_uncertainty_envelope( result=result, uncertainty_document=uncertainty_document, diff --git a/adapters/progress.py b/adapters/progress.py new file mode 100644 index 0000000..a06352e --- /dev/null +++ b/adapters/progress.py @@ -0,0 +1,104 @@ +"""Machine-readable run progress for long commands (Ticket 106). + +A small, dependency-free progress side-channel for the long-running loops +(``propagate``, ``sample``, ``batch``). 
When enabled, one compact JSON object is +written per emit interval plus a guaranteed final record where +``completed == total``: + + {"event":"progress","command":"propagate","completed":250,"total":1000,"elapsed_s":75.3} + +Records go to stderr or a sidecar file, NEVER to the ``--output`` stream, and the +feature is opt-in. When disabled the command passes ``None`` as the callback, so +the hot loop pays nothing beyond a single ``is not None`` check. +""" + +from __future__ import annotations + +import json +import sys +import time +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path +from typing import TextIO + +# Target number of progress records over a run when no explicit interval is set. +# The reporter derives the per-tick interval from this and the run's total so a +# 1,000-sample run emits roughly one line per 5% rather than 1,000 lines. +_DEFAULT_RECORD_TARGET = 20 + + +class ProgressReporter: + """Emits JSONL progress records to a sink at a fixed completed-count interval. + + Call the instance as ``reporter(completed, total)`` once per finished + iteration. It emits when at least ``interval`` iterations have completed since + the last record, and always emits the final ``completed == total`` record. + ``elapsed_s`` is wall-clock seconds from construction, via a monotonic clock. 
+ """ + + def __init__( + self, + sink: TextIO, + command: str, + *, + interval: int | None = None, + ) -> None: + self._sink = sink + self._command = command + self._interval = interval + self._start = time.monotonic() + self._last_emitted = 0 + self._final_emitted = False + + def __call__(self, completed: int, total: int) -> None: + if self._final_emitted: + return + step = self._interval if self._interval is not None else _emit_interval(total) + is_final = completed >= total + if is_final or completed - self._last_emitted >= step: + self._emit(completed, total) + self._last_emitted = completed + self._final_emitted = is_final + + def _emit(self, completed: int, total: int) -> None: + record = { + "event": "progress", + "command": self._command, + "completed": completed, + "total": total, + "elapsed_s": round(time.monotonic() - self._start, 3), + } + self._sink.write(json.dumps(record, separators=(",", ":")) + "\n") + self._sink.flush() + + +def _emit_interval(total: int) -> int: + return max(1, total // _DEFAULT_RECORD_TARGET) + + +@contextmanager +def progress_reporter( + command: str, + *, + enabled: bool, + progress_file: Path | None, +) -> Iterator[ProgressReporter | None]: + """Yield a ProgressReporter when progress is enabled, else ``None``. + + Progress is enabled when ``enabled`` is set or a ``progress_file`` is given. + With a file the reporter streams JSONL there (opened for live tailing, not an + atomic replace); otherwise it writes to stderr. The file is always closed on + exit, even if the wrapped run raises. 
+ """ + if not enabled and progress_file is None: + yield None + return + if progress_file is not None: + with progress_file.open("w", encoding="utf-8") as handle: + yield ProgressReporter(handle, command) + return + yield ProgressReporter(sys.stderr, command) + + +__all__ = ["ProgressReporter", "progress_reporter"] diff --git a/docs/CLI_EXIT_CODES.md b/docs/CLI_EXIT_CODES.md index eb18e8c..0cd731f 100644 --- a/docs/CLI_EXIT_CODES.md +++ b/docs/CLI_EXIT_CODES.md @@ -92,3 +92,8 @@ misread a result: - **Treat any code outside `{0, 10, 11, 12, 13}` as a harness fault.** Shell status `1` (an uncaught traceback) and `2` (argument-parser error) are not part of this contract; if you see `1`, file it as a bug. +- **`--progress-format jsonl` / `--progress-file` add no exit codes.** On + `sample`, `propagate`, and `batch`, machine-readable progress is a + stderr/sidecar side-channel only (one JSON record per interval); it never + touches the `--output` stream and does not change the exit-code semantics + above. diff --git a/docs/USAGE.md b/docs/USAGE.md index 594f5e5..ef0ca86 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -254,6 +254,10 @@ Batch exits `0` only when all runs are feasible, `10` when any run is infeasible and no run had an input error, `11` when any run cannot load its inputs, and `13` for unexpected internal failures. +`batch` supports machine-readable progress for non-interactive workers — see +[Run Progress (JSONL)](#run-progress-jsonl) below. Records are emitted at the same interval as the other commands: one per +completed run for manifests with fewer than 40 runs, about one per 5% of runs beyond that, plus a guaranteed final record; `total` is the number of runs in the manifest. + ## Mission Estimation Run the example mission: @@ -1405,6 +1409,8 @@ assertions: The `sample` command runs a seeded uncertainty plan and emits `uncertainty-report.v1`. Use it when wind, speed, power, or other configured inputs need distribution bounds rather than a single deterministic estimate. +For long runs it can stream machine-readable progress — see +[Run Progress (JSONL)](#run-progress-jsonl). 
```bash uv run bvlos-sim sample \ @@ -1474,7 +1480,9 @@ cruise_speed_mps: The `propagate` command runs a time-stepped particle propagator over the full mission timeline. Each particle carries independently sampled wind, cruise speed, cruise power, and battery capacity. Per-step `p_reserve_violation` -tracks energy risk accumulation. Emits `stochastic-envelope.v1`. +tracks energy risk accumulation. Emits `stochastic-envelope.v1`. For long runs +it can stream machine-readable progress — see +[Run Progress (JSONL)](#run-progress-jsonl). ```bash uv run bvlos-sim propagate \ @@ -1575,6 +1583,52 @@ parameters: std: 25.0 ``` +## Run Progress (JSONL) + +The long-running commands `sample`, `propagate`, and `batch` can emit +structured, line-oriented progress so a non-interactive caller (a queue worker) +can show live progress instead of a flat "running" until the process exits. The +feature is **opt-in and off by default**; a run with no progress flag behaves +byte-for-byte as before. + +Two flags control it, consistent across all three commands: + +- `--progress-format jsonl` — emit JSONL progress to **stderr** (default is + `none`, which emits nothing). +- `--progress-file PATH` — write the JSONL stream to a file instead of stderr + (implies `jsonl`). The file is opened for live tailing, not an atomic replace, + so a worker can follow it as it grows. 
+ +```bash +# progress on stderr, result envelope on stdout +uv run bvlos-sim sample \ + examples/uncertainty/pipeline_demo_001_wind_uncertainty.yaml \ + --progress-format jsonl + +# progress to a sidecar file, result to --output +uv run bvlos-sim propagate \ + examples/stochastic/pipeline_demo_001_stochastic.yaml \ + --progress-file /tmp/propagate.progress.jsonl \ + --output /tmp/stochastic.json +``` + +Each line is one compact JSON object with stable keys: + +```json +{"event":"progress","command":"propagate","completed":250,"total":1000,"elapsed_s":75.3} +``` + +- `completed` increases monotonically and the final record always has + `completed == total`. For `sample`/`propagate`, `total` is the plan's sample + count; for `batch`, it is the number of runs in the manifest. +- `elapsed_s` is wall-clock seconds from the start of the run (monotonic clock). +- Records are emitted at an interval (about one record per 5% of the run) plus a + guaranteed final record. + +Progress is a **stderr/sidecar side-channel only**: it never appears in the +`--output` JSON stream, introduces no new schema or envelope version, and does +not change the result envelope, the deterministic results, or the exit code. + ## SITL Evidence Contract The `sitl` command reuses an existing `scenario.v1` file, runs the deterministic diff --git a/docs/tickets/106-machine-readable-run-progress.md b/docs/tickets/106-machine-readable-run-progress.md index aeabd8d..ad43eb9 100644 --- a/docs/tickets/106-machine-readable-run-progress.md +++ b/docs/tickets/106-machine-readable-run-progress.md @@ -2,7 +2,7 @@ ## Status -Planned. +Implemented. ## Goal @@ -51,3 +51,67 @@ unstructured stage text, not per-iteration machine-readable framing. service. - Third-party progress libraries (`tqdm` and similar) — keep the dependency footprint clean, consistent with Ticket 067. 
+ +## Implementation + +### New files + +| File | Purpose | +|------|---------| +| `adapters/progress.py` | `ProgressReporter` (JSONL emitter) and the `progress_reporter` context manager that binds it to stderr or a file, or yields `None` when disabled. | +| `tests/test_run_progress.py` | CLI progress-file/stderr tests for all three commands, off-by-default invariance, and a direct callback contract on `run_monte_carlo`. | + +### Progress helper + +`ProgressReporter` is constructed with a sink (an open text stream), the command +name, and an optional explicit interval. It is called once per finished +iteration as `reporter(completed, total)` and emits a record when at least +`interval` iterations have completed since the last one, always emitting the +final `completed == total` record. When no interval is given it derives one from +the total so a run emits roughly 20 records (about one per 5%) rather than one +per iteration. Each record is a single compact line: + +```json +{"event":"progress","command":"propagate","completed":250,"total":1000,"elapsed_s":75.3} +``` + +`json.dumps(..., separators=(",", ":"))` keeps it compact, the sink is flushed +after every line so a worker sees progress live, and `elapsed_s` is wall-clock +seconds from construction via `time.monotonic()` (kept out of anything golden). + +`progress_reporter(command, *, enabled, progress_file)` is a context manager: +it yields a reporter bound to the file (opened for live tailing, not an atomic +replace) when `progress_file` is set, else to stderr when `enabled`, else +`None`. The file is always closed on exit, even if the run raises. + +### Threaded callback + +A keyword-only `progress: Callable[[int, int], None] | None = None` parameter was +added to `run_monte_carlo` (`estimator/execution/monte_carlo.py`), +`run_stochastic_propagation` (`estimator/execution/propagator.py`, forwarded into +`ParticleSampler`), and `run_batch_manifest` (`adapters/batch_support.py`). 
Each +loop calls `progress(completed, total)` once per iteration with `total` equal to +the plan sample count or `len(manifest.runs)`. Default `None` means no callback +is invoked, so the hot loop pays only a single `is not None` check and behaves +exactly as before. The per-iteration `continue` paths in the Monte Carlo and +particle-sampler loops were refactored to `if/else` so the progress tick fires +on every iteration, including failed/spatial-infeasible samples; the computed +results are unchanged. + +### CLI flags + +`sample`, `propagate`, and `batch` each gained two flags: `--progress-format` +(a `ProgressFormat` enum, default `none`, set `jsonl` to enable on stderr) and +`--progress-file PATH` (writes the JSONL stream to a file, implying `jsonl`). +The command opens a `progress_reporter` around the run call and passes its +callback (or `None`) down. + +### Side-channel only / off by default + +Progress goes to stderr or the sidecar file and never to the `--output` stream; +the result envelope, the golden fixtures, and the deterministic results are +unchanged, and no new schema or envelope version is introduced. With no progress +flag a run is byte-for-byte identical to before (covered by a test that compares +the `--output` bytes of a plain run against a progress-enabled run and asserts an +empty stderr). This is the machine-readable, non-TTY half of progress; the human +TTY bar remains Ticket 067's scope and is not implemented here. diff --git a/docs/tickets/README.md b/docs/tickets/README.md index 8933940..d6ffd42 100644 --- a/docs/tickets/README.md +++ b/docs/tickets/README.md @@ -1,6 +1,6 @@ # Ticket Backlog -**61 implemented · 17 planned · 1245 tests passing** +**62 implemented · 16 planned · 1251 tests passing** This directory tracks every capability from idea to implementation. Completed tickets are kept as historical records. Open tickets describe what to build @@ -53,7 +53,6 @@ worker depends on. 
| # | Ticket | What it adds | |---|---|---| -| 106 | [Machine-readable run progress](./106-machine-readable-run-progress.md) | JSONL progress for `propagate`/`sample`/`batch` so a non-TTY worker can show live progress (extends 067) | | 107 | [Machine-readable preflight report](./107-machine-readable-preflight-report.md) | JSON `--validate-only` envelope plus GeoJSON asset preflight across run types (composes with 089) | ### Core simulation gaps @@ -203,3 +202,4 @@ New capabilities should work *with* existing pieces, not alongside them in isola 58. [083](./083-calibration-profile-data-and-fitting.md) Calibration profile data and fitting 59. [104](./104-atomic-output-writes-and-cancellation.md) Atomic output writes and clean cancellation 60. [105](./105-contract-version-discovery-command.md) Contract-version discovery command +61. [106](./106-machine-readable-run-progress.md) Machine-readable run progress diff --git a/estimator/execution/monte_carlo.py b/estimator/execution/monte_carlo.py index 23722df..4b13e1d 100644 --- a/estimator/execution/monte_carlo.py +++ b/estimator/execution/monte_carlo.py @@ -2,7 +2,7 @@ import random import statistics as stats_module -from collections.abc import Sequence +from collections.abc import Callable, Sequence from estimator.core.enums import EstimateStatus from estimator.core.geofence import GeofenceZone @@ -61,6 +61,7 @@ def run_monte_carlo( obstacle_provider: ObstacleProvider | None = None, geofences: Sequence[GeofenceZone] | None = None, landing_zones: Sequence[LandingZone] | None = None, + progress: Callable[[int, int], None] | None = None, ) -> MonteCarloResult: """Run a seeded Monte Carlo uncertainty analysis and return aggregated results. @@ -91,7 +92,9 @@ def run_monte_carlo( if failure is not None else "Baseline mission estimate failed before sampling could start." 
) - raise ValueError(f"Monte Carlo sampling requires a feasible baseline: {message}") + raise ValueError( + f"Monte Carlo sampling requires a feasible baseline: {message}" + ) rng = random.Random(plan.seed) @@ -102,7 +105,7 @@ def run_monte_carlo( energy_sample_count = 0 failed = 0 - for _ in range(plan.samples): + for sample_index in range(plan.samples): sampled_wind_east = ( _sample(rng, params.wind_east_mps) if params.wind_east_mps else None ) @@ -145,15 +148,17 @@ def run_monte_carlo( ) if result.status == EstimateStatus.ERROR: failed += 1 - continue - - times.append(result.total_time_s) - if result.energy is not None: - energy_sample_count += 1 - reserves_wh.append(result.energy.reserve_at_landing_wh) - reserves_pct.append(result.energy.reserve_at_landing_percent) - if result.energy.is_feasible: - feasible_count += 1 + else: + times.append(result.total_time_s) + if result.energy is not None: + energy_sample_count += 1 + reserves_wh.append(result.energy.reserve_at_landing_wh) + reserves_pct.append(result.energy.reserve_at_landing_percent) + if result.energy.is_feasible: + feasible_count += 1 + + if progress is not None: + progress(sample_index + 1, plan.samples) completed = plan.samples - failed feasibility_rate = ( diff --git a/estimator/execution/propagation/sampling.py b/estimator/execution/propagation/sampling.py index eed7488..849786f 100644 --- a/estimator/execution/propagation/sampling.py +++ b/estimator/execution/propagation/sampling.py @@ -1,7 +1,7 @@ """Sampled-parameter draws, estimator input wiring, and particle creation.""" import random -from collections.abc import Sequence +from collections.abc import Callable, Sequence from dataclasses import dataclass from estimator.core.geofence import GeofenceZone @@ -14,7 +14,10 @@ from estimator.execution.engine import try_estimate_mission_distance_time from estimator.execution.propagation.curves import best_position_legs from estimator.execution.propagation.particles import ParticlePopulation, 
ParticleTrack -from estimator.execution.propagation.stats import sample_optional, sample_positive_optional +from estimator.execution.propagation.stats import ( + sample_optional, + sample_positive_optional, +) from schemas.mission import MissionPlan from schemas.stochastic import StochasticPropagationPlan from schemas.vehicle import VehicleProfile @@ -43,7 +46,9 @@ def from_plan( wind_north_mps=sample_optional(rng, params.wind_north_mps), cruise_speed_mps=sample_positive_optional(rng, params.cruise_speed_mps), cruise_power_w=sample_positive_optional(rng, params.cruise_power_w), - battery_capacity_wh=sample_positive_optional(rng, params.battery_capacity_wh), + battery_capacity_wh=sample_positive_optional( + rng, params.battery_capacity_wh + ), ) @@ -99,32 +104,37 @@ class ParticleSampler: rng: random.Random sensors: SensorProfile | None controller: ControllerProfile | None + progress: Callable[[int, int], None] | None = None def run(self) -> ParticlePopulation: particles: list[ParticleTrack] = [] position_legs = self.baseline_legs spatial_infeasible_count = 0 - for _ in range(self.plan.samples): + for sample_index in range(self.plan.samples): sample = SampledParameters.from_plan(plan=self.plan, rng=self.rng) result = self.estimator_inputs.with_sample(sample).estimate() if is_spatial_infeasible(result): spatial_infeasible_count += 1 - continue - - particle = ParticleTrack.from_estimate( - estimate=result, - wind_east_mps=sample.wind_east_mps if sample.wind_east_mps is not None else 0.0, - wind_north_mps=sample.wind_north_mps if sample.wind_north_mps is not None else 0.0, - sensors=self.sensors, - controller=self.controller, - ) - if particle is None: - continue - - particles.append(particle) - position_legs = best_position_legs(position_legs, result) + else: + particle = ParticleTrack.from_estimate( + estimate=result, + wind_east_mps=sample.wind_east_mps + if sample.wind_east_mps is not None + else 0.0, + wind_north_mps=sample.wind_north_mps + if 
sample.wind_north_mps is not None + else 0.0, + sensors=self.sensors, + controller=self.controller, + ) + if particle is not None: + particles.append(particle) + position_legs = best_position_legs(position_legs, result) + + if self.progress is not None: + self.progress(sample_index + 1, self.plan.samples) return ParticlePopulation( particles=tuple(particles), diff --git a/estimator/execution/propagator.py b/estimator/execution/propagator.py index f7e0a52..9a20878 100644 --- a/estimator/execution/propagator.py +++ b/estimator/execution/propagator.py @@ -5,7 +5,7 @@ """ import random -from collections.abc import Sequence +from collections.abc import Callable, Sequence from estimator.core.enums import EstimateStatus from estimator.core.geofence import GeofenceZone @@ -17,7 +17,10 @@ from estimator.execution.propagation.curves import PositionInterpolator from estimator.execution.propagation.sampling import EstimatorInputs, ParticleSampler from estimator.execution.propagation.stats import compute_stats, feasibility_rate -from estimator.execution.propagation.timeline import TimelineBuilder, reserve_threshold_wh +from estimator.execution.propagation.timeline import ( + TimelineBuilder, + reserve_threshold_wh, +) from schemas.mission import MissionPlan from schemas.stochastic import StochasticPropagationPlan, StochasticPropagationResult from schemas.vehicle import VehicleProfile @@ -34,6 +37,7 @@ def run_stochastic_propagation( obstacle_provider: ObstacleProvider | None = None, geofences: Sequence[GeofenceZone] | None = None, landing_zones: Sequence[LandingZone] | None = None, + progress: Callable[[int, int], None] | None = None, ) -> StochasticPropagationResult: """Run seeded stochastic state propagation and return a timeline report.""" estimator_inputs = EstimatorInputs( @@ -54,7 +58,9 @@ def run_stochastic_propagation( if failure is not None else "Baseline mission estimate failed before propagation could start." 
) - raise ValueError(f"Stochastic propagation requires a feasible baseline: {message}") + raise ValueError( + f"Stochastic propagation requires a feasible baseline: {message}" + ) rng = random.Random(plan.seed) population = ParticleSampler( @@ -64,6 +70,7 @@ def run_stochastic_propagation( rng=rng, sensors=vehicle.sensors, controller=vehicle.controller, + progress=progress, ).run() threshold_wh = reserve_threshold_wh(baseline) diff --git a/tests/test_run_progress.py b/tests/test_run_progress.py new file mode 100644 index 0000000..20115b3 --- /dev/null +++ b/tests/test_run_progress.py @@ -0,0 +1,194 @@ +"""Tests for machine-readable run progress (Ticket 106). + +Covers the JSONL progress side-channel on ``sample``, ``propagate``, and +``batch``: record shape, monotonic ``completed`` with a final ``completed == +total`` record, that progress never leaks into the ``--output`` stream, that the +feature is off by default, and a direct callback contract on ``run_monte_carlo``. +""" + +import json +from pathlib import Path + +from typer.testing import CliRunner + +from adapters.cli import CliExitCode, app +from adapters.io import load_mission, load_vehicle +from adapters.uncertainty_io import ( + load_uncertainty_plan, + resolve_uncertainty_asset_path, +) +from estimator.execution.monte_carlo import run_monte_carlo + +REPO_ROOT = Path(__file__).resolve().parents[1] +EXAMPLE_UNCERTAINTY = ( + REPO_ROOT / "examples/uncertainty/pipeline_demo_001_wind_uncertainty.yaml" +) +EXAMPLE_STOCHASTIC = REPO_ROOT / "examples/stochastic/pipeline_demo_001_stochastic.yaml" +EXAMPLE_BATCH = REPO_ROOT / "examples/batch/demo_batch.yaml" + +runner = CliRunner() + + +def _read_progress_records(path: Path) -> list[dict]: + lines = [line for line in path.read_text(encoding="utf-8").splitlines() if line] + return [json.loads(line) for line in lines] + + +def _assert_valid_progress(records: list[dict], *, command: str, total: int) -> None: + assert records, "expected at least one progress record" 
+ completions = [] + for record in records: + assert record["event"] == "progress" + assert record["command"] == command + assert record["total"] == total + assert isinstance(record["elapsed_s"], (int, float)) + assert record["elapsed_s"] >= 0 + completions.append(record["completed"]) + # completed is strictly increasing and the final record reaches total. + assert all(b > a for a, b in zip(completions, completions[1:], strict=False)) + assert completions[-1] == total + + +# --------------------------------------------------------------------------- +# CLI progress-file tests +# --------------------------------------------------------------------------- + + +def test_sample_progress_file_records(tmp_path: Path) -> None: + progress_path = tmp_path / "progress.jsonl" + output_path = tmp_path / "out.json" + result = runner.invoke( + app, + [ + "sample", + str(EXAMPLE_UNCERTAINTY), + "--progress-file", + str(progress_path), + "--output", + str(output_path), + ], + ) + assert result.exit_code == int(CliExitCode.SUCCESS) + records = _read_progress_records(progress_path) + _assert_valid_progress(records, command="sample", total=200) + # The output envelope must not contain progress framing. 
+ assert '"event":"progress"' not in output_path.read_text(encoding="utf-8") + assert '"event": "progress"' not in output_path.read_text(encoding="utf-8") + + +def test_propagate_progress_file_records(tmp_path: Path) -> None: + progress_path = tmp_path / "progress.jsonl" + output_path = tmp_path / "out.json" + result = runner.invoke( + app, + [ + "propagate", + str(EXAMPLE_STOCHASTIC), + "--progress-file", + str(progress_path), + "--output", + str(output_path), + ], + ) + assert result.exit_code == int(CliExitCode.SUCCESS) + records = _read_progress_records(progress_path) + _assert_valid_progress(records, command="propagate", total=100) + assert "progress" not in output_path.read_text(encoding="utf-8") + + +def test_batch_progress_file_records(tmp_path: Path) -> None: + progress_path = tmp_path / "progress.jsonl" + result = runner.invoke( + app, + [ + "batch", + str(EXAMPLE_BATCH), + "--progress-file", + str(progress_path), + ], + ) + # The demo manifest includes an infeasible run, so the exit code is 10; the + # progress side-channel is independent of the feasibility verdict. + assert result.exit_code in { + int(CliExitCode.SUCCESS), + int(CliExitCode.INFEASIBLE), + } + records = _read_progress_records(progress_path) + _assert_valid_progress(records, command="batch", total=3) + # stdout carries the table, never progress framing. + assert '"event":"progress"' not in result.stdout + + +# --------------------------------------------------------------------------- +# stderr sink and off-by-default +# --------------------------------------------------------------------------- + + +def test_progress_to_stderr_not_stdout() -> None: + result = runner.invoke( + app, + ["sample", str(EXAMPLE_UNCERTAINTY), "--progress-format", "jsonl"], + ) + assert result.exit_code == int(CliExitCode.SUCCESS) + assert '"event":"progress"' in result.stderr + # The result envelope on stdout stays clean JSON with no progress records. 
+ payload = json.loads(result.stdout) + assert payload["schema_version"] + assert '"event":"progress"' not in result.stdout + + +def test_progress_off_by_default_is_unchanged(tmp_path: Path) -> None: + baseline_out = tmp_path / "baseline.json" + progress_out = tmp_path / "with_progress.json" + progress_file = tmp_path / "progress.jsonl" + + baseline = runner.invoke( + app, ["sample", str(EXAMPLE_UNCERTAINTY), "--output", str(baseline_out)] + ) + with_progress = runner.invoke( + app, + [ + "sample", + str(EXAMPLE_UNCERTAINTY), + "--output", + str(progress_out), + "--progress-file", + str(progress_file), + ], + ) + + assert baseline.exit_code == int(CliExitCode.SUCCESS) + assert with_progress.exit_code == int(CliExitCode.SUCCESS) + # Enabling progress does not change the result envelope at all. + assert baseline_out.read_bytes() == progress_out.read_bytes() + # A run with no progress flag emits no progress framing on stderr. + assert "progress" not in baseline.stderr + + +# --------------------------------------------------------------------------- +# Direct callback contract +# --------------------------------------------------------------------------- + + +def test_run_monte_carlo_invokes_callback_to_completion() -> None: + plan, _ = load_uncertainty_plan(EXAMPLE_UNCERTAINTY) + mission_path = resolve_uncertainty_asset_path( + plan.mission_file, uncertainty_path=EXAMPLE_UNCERTAINTY + ) + vehicle_path = resolve_uncertainty_asset_path( + plan.vehicle_file, uncertainty_path=EXAMPLE_UNCERTAINTY + ) + mission, _ = load_mission(mission_path) + vehicle, _ = load_vehicle(vehicle_path) + + calls: list[tuple[int, int]] = [] + + def record(completed: int, total: int) -> None: + calls.append((completed, total)) + + run_monte_carlo(plan, mission, vehicle, progress=record) + + assert calls, "callback should be invoked at least once" + completions = [completed for completed, _ in calls] + assert all(b >= a for a, b in zip(completions, completions[1:], strict=False)) + assert 
calls[-1] == (plan.samples, plan.samples)