diff --git a/dotbot/calibration/lighthouse2.py b/dotbot/calibration/lighthouse2.py index 23803736..f181c5d2 100644 --- a/dotbot/calibration/lighthouse2.py +++ b/dotbot/calibration/lighthouse2.py @@ -12,6 +12,7 @@ import datetime import math import os +import re import tomllib from dataclasses import dataclass from pathlib import Path @@ -189,6 +190,18 @@ def _build_calibration_payload( return bytes(payload) +def _slug_tag(tag: str) -> str: + """Filename-safe slug for a free-form calibration tag. + + Keeps ASCII letters, digits, dot, dash and underscore; collapses any + other run of characters to a single dash and trims dashes and dots off + the ends (so a tag like "../x" can't smuggle in a leading ".."). Returns + "" when nothing usable remains, so callers can treat the tag as absent. + The slug is safe to drop into both a filename and a TOML string. + """ + return re.sub(r"[^A-Za-z0-9._-]+", "-", tag).strip("-.") + + def _parse_calibration_payload(payload: bytes) -> list[bytes]: """Inverse of `_build_calibration_payload`: yields the per-LH 36-byte matrix chunks. Used when loading from either TOML or legacy .out.""" @@ -364,7 +377,7 @@ def load_calibration(self) -> list[bytes]: return _parse_calibration_payload(self.calibration_output_path.read_bytes()) return [] - def save_calibration(self) -> Path: + def save_calibration(self, tag: Optional[str] = None) -> Path: """Save the calibration as a timestamped TOML file (+ legacy .out). The TOML file is the new primary record: versioned, metadata- @@ -372,6 +385,11 @@ def save_calibration(self) -> Path: written so external consumers (swarmit OTA, dotbot-provision) keep working until they learn to read TOML. + `tag`, when given, is a free-form arena/setup label (e.g. + "office-2x2m"); a filename-safe slug of it is inserted into the + filename and recorded under `[metadata]` so the calibration stays + self-describing even after a rename. + Returns the path of the TOML file just written, and also stores it on `self.last_saved_toml_path` so a caller that lost the return value (e.g. the TUI handler) can still surface it after @@ -383,7 +401,14 @@ def save_calibration(self) -> Path: # Filename-safe variant of ISO 8601: `:` is rejected on Windows # and a footgun on some Unix tools. ts_for_filename = now.strftime("%Y-%m-%dT%H-%M-%SZ") - toml_path = CALIBRATION_DIR / f"calibration-{ts_for_filename}.toml" + slug = _slug_tag(tag) if tag else "" + stem = ( + f"calibration-{slug}-{ts_for_filename}" + if slug + else f"calibration-{ts_for_filename}" + ) + toml_path = CALIBRATION_DIR / f"{stem}.toml" + tag_line = f'tag = "{slug}"\n' if slug else "" # Explicit UTF-8 — TOML is spec'd as UTF-8, and Path.write_text # defaults to the platform encoding (cp1252 on Windows), which # mangles any non-ASCII byte and breaks the tomllib reader. @@ -394,6 +419,7 @@ def save_calibration(self) -> Path: f'created_at = "{now.strftime("%Y-%m-%dT%H:%M:%SZ")}"\n' f"calibration_distance_mm = {int(self.calibration_distance)}\n" f"num_lh_stations = {1 + self.extra_lh_num}\n" + f"{tag_line}" "\n" "[calibration]\n" "# 1-byte homography count + N x 36-byte int32 LE matrices,\n" diff --git a/dotbot/calibration/ota.py b/dotbot/calibration/ota.py new file mode 100644 index 00000000..d11e1686 --- /dev/null +++ b/dotbot/calibration/ota.py @@ -0,0 +1,134 @@ +# SPDX-FileCopyrightText: 2026-present Inria +# SPDX-License-Identifier: BSD-3-Clause + +"""Over-the-air LH2 calibration collection (swarmit transport). + +Variant A: a single DotBot, no serial cable. The bot's secure bootloader +samples its own raw LH2 counts on request (READY mode only) and ships them +back inside a SWARMIT_EVENT_LOG. This module triggers one capture per arena +corner and decodes the samples; the homography solve and save live in +`lighthouse2.LighthouseManager`, exactly as in the serial flow. +""" + +from __future__ import annotations + +import queue +import threading +import time +from collections.abc import Callable + +from dotbot.calibration.lighthouse2 import LH2CalibrationSample + +# The four reference corners, in the order LighthouseManager expects them: +# it zips the collected counts against REFERENCE_POINTS_DEFAULT positionally +# (top-left, top-right, bottom-left, bottom-right), so the collection order +# is load-bearing, not cosmetic. +CORNERS = ("top-left", "top-right", "bottom-left", "bottom-right") + +CAPTURE_TIMEOUT_DEFAULT = 5.0 +CAPTURE_RETRIES_DEFAULT = 3 + +# Each raw sample inside the LOG payload is [lh_index:1][count1:4 LE][count2:4 LE]. +_SAMPLE_SIZE = 9 + + +def parse_capture_payload(data: bytes, tag: int) -> list[LH2CalibrationSample]: + """Decode a SWARMIT_EVENT_LOG payload of raw LH2 samples. + + Layout (mirrors the swarmit bootloader): a 1-byte `tag`, then N + fixed-size records. Returns [] for any payload that is not a capture + (regular text log lines do not carry `tag` as their first byte). + """ + if len(data) < 1 or data[0] != tag: + return [] + body = data[1:] + samples: list[LH2CalibrationSample] = [] + for off in range(0, len(body) - _SAMPLE_SIZE + 1, _SAMPLE_SIZE): + lh_index = body[off] + count1 = int.from_bytes(body[off + 1 : off + 5], "little") + count2 = int.from_bytes(body[off + 5 : off + 9], "little") + samples.append(LH2CalibrationSample(lh_index, count1, count2)) + return samples + + +class CaptureSession: + """One shared log-event stream for a whole collect session. + + The bot only emits raw counts in reply to a trigger, so nothing arrives + unsolicited - a single `watch_log_events()` stream serves every corner. + A background reader thread decodes samples addressed to `device` into a + queue; `capture()` triggers and waits, re-triggering on timeout because + the trigger send is best-effort (no transport-level ack). + """ + + def __init__(self, client, device: str, tag: int): + self._client = client + self._device = device.upper() + self._tag = tag + self._queue: queue.Queue = queue.Queue() + self._stop = threading.Event() + self._thread = threading.Thread(target=self._reader, daemon=True) + + def __enter__(self) -> CaptureSession: + self._thread.start() + return self + + def __exit__(self, *exc) -> None: + self._stop.set() + + def _reader(self) -> None: + try: + for event in self._client.watch_log_events(): + if self._stop.is_set(): + break + if str(event.get("addr", "")).upper() != self._device: + continue + data = bytes.fromhex(event.get("data_hex", "")) + for sample in parse_capture_payload(data, self._tag): + self._queue.put(sample) + except Exception as exc: # surfaced on the next capture() get() + self._queue.put(exc) + + def capture( + self, + lh_index: int, + timeout: float, + retries: int, + on_attempt: Callable[[int, int], None] | None = None, + ) -> LH2CalibrationSample: + """Trigger a capture and return the first sample for `lh_index`. + + Retries the trigger up to `retries` times; raises TimeoutError if + no matching sample arrives. `on_attempt(n, total)` runs just before + each trigger so callers can show progress during the otherwise silent + wait. + """ + # Discard anything left over from the previous corner. + while not self._queue.empty(): + self._queue.get_nowait() + + attempts = retries + 1 + for attempt in range(attempts): + if on_attempt is not None: + on_attempt(attempt + 1, attempts) + self._client.request_lh2_capture(self._device) + deadline = time.monotonic() + timeout + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + break + try: + item = self._queue.get(timeout=remaining) + except queue.Empty: + break + if isinstance(item, Exception): + raise item + if item.lh_index == lh_index: + return item + # A sample for a different lighthouse: ignore, keep waiting. + + raise TimeoutError( + f"no LH{lh_index} sample from {self._device} after " + f"{retries + 1} attempt(s); is the bot in READY (app stopped) " + f"and in view of the lighthouse?" + ) diff --git a/dotbot/cli/calibrate.py b/dotbot/cli/calibrate.py index 5161d687..4d4dfa8c 100644 --- a/dotbot/cli/calibrate.py +++ b/dotbot/cli/calibrate.py @@ -1,11 +1,10 @@ # SPDX-FileCopyrightText: 2026-present Inria # SPDX-License-Identifier: BSD-3-Clause -"""`dotbot run lh2-calibration` — LH2 calibration (serial side). +"""`dotbot run lh2-calibration` - LH2 calibration. -Native subgroup mounting the vendored `dotbot.calibration` package. -Serial-attached, single-device operations. OTA / swarm-wide -counterparts will live under `dotbot swarm calibrate-lh2`. +Native subgroup mounting the vendored `dotbot.calibration` package, for +single-device calibration over either transport. Subcommands: @@ -15,6 +14,10 @@ . Today the only consumer is the swarmit secure bootloader (it #includes the file at compile time). +Cable-free, over-the-air calibration of a DotBot in the arena lives under +`dotbot swarm lh2-calibration` (it drives the fleet transport, not a serial +DK). + Calibration runtime deps (`opencv-python`, `textual`) live behind the `[calibrate]` extra; ImportError at subcommand invocation prints an install hint instead of a traceback. @@ -47,7 +50,7 @@ def _run_tui(ctx: click.Context) -> None: @click.group( name="lh2-calibration", - help="LH2 calibration: capture, apply, export (serial-side / single device).", + help="LH2 calibration for one serial-attached device: capture, apply.", invoke_without_command=True, ) @click.pass_context @@ -80,8 +83,8 @@ def _collect(ctx: click.Context) -> None: help=( "Write the saved calibration as a C header to PATH. Today the " "consumer is the swarmit secure bootloader (#includes the file " - "at compile time). OTA / runtime equivalents will live under " - "`dotbot swarm calibrate-lh2 apply`." + "at compile time). The over-the-air / runtime equivalent is " + "`dotbot swarm lh2-calibration push`." ), ) @click.argument( diff --git a/dotbot/cli/swarm.py b/dotbot/cli/swarm.py index ffce0c6c..dead6f17 100644 --- a/dotbot/cli/swarm.py +++ b/dotbot/cli/swarm.py @@ -60,6 +60,19 @@ def _with_config_injection(swarmit_group): @click.pass_context def cmd(ctx, args): args = list(args) + # `lh2-calibration` is PyDotBot-native (the homography solve lives + # here, not in swarmit), so intercept it before the passthrough and + # hand off to our own group, carrying the resolved config along. + if args and args[0] == "lh2-calibration": + from dotbot.cli.swarm_lh2 import cmd as lh2_group + + lh2_group.main( + args=args[1:], + prog_name="dotbot swarm lh2-calibration", + standalone_mode=True, + obj=ctx.obj, + ) + return final = inject_config(args, ctx.obj) if args else args _run_swarmit(swarmit_group, final) diff --git a/dotbot/cli/swarm_lh2.py b/dotbot/cli/swarm_lh2.py new file mode 100644 index 00000000..544020b8 --- /dev/null +++ b/dotbot/cli/swarm_lh2.py @@ -0,0 +1,266 @@ +# SPDX-FileCopyrightText: 2026-present Inria +# SPDX-License-Identifier: BSD-3-Clause + +"""`dotbot swarm lh2-calibration` - over-the-air LH2 calibration. + +The fleet-side home for LH2 calibration: capture and send a calibration +without a serial cable, driving a single DotBot through the swarmit +transport. Two subcommands: + +- `collect` - walk one DotBot through the 4 arena corners, trigger a + raw-count capture per corner over the air, solve the + homography, and save the calibration under ~/.dotbot/. +- `push ` - send a saved calibration to the bot over the air. A thin + forward to swarmit's `calibrate-lh2`, which picks the payload + format (legacy `.out` or `calibration-*.toml`) by extension. + +The homography solve lives in PyDotBot (`dotbot.calibration.lighthouse2`); the +transport lives in swarmit. `collect` therefore runs natively here, while +`push` is pure transport and reuses swarmit's own command. + +Serial-cable (single DK) calibration and the C-header `apply` export stay +under `dotbot run lh2-calibration`. + +Calibration runtime deps (`opencv-python`) live behind the `[calibrate]` +extra; ImportError at invocation prints an install hint instead of a +traceback. +""" + +import sys +import time + +import click + + +def _build_swarmit_client(ctx, conn, swarm_id, device): + """Build a swarmit client targeting a single `device`. + + Reuses swarmit's own conn-string translation so the two CLIs can't + drift, and falls back to the unified dotbot config's `conn` / `swarm_id` + (like `dotbot swarm`) when the flags are omitted. Imported lazily: the + swarmit protocol registry must not load during PyDotBot test collection. + + Transport selection is swarmit's call: `build_client` probes for a running + swarmit server and falls back to an in-process controller on its own, so + there is no flag to choose here. + """ + from swarmit.cli.main import DEFAULTS, _conn_to_config + from swarmit.client import build_client + from swarmit.testbed.controller import ControllerSettings + + if conn is None or swarm_id is None: + from dotbot.config import resolve + + obj = ctx.obj or {} + config = obj.get("config") + deployment = obj.get("deployment") + if conn is None: + conn = resolve("conn", config=config, deployment=deployment) + if swarm_id is None: + swarm_id = resolve("swarm_id", config=config, deployment=deployment) + + final = {**DEFAULTS, **_conn_to_config(conn, swarm_id)} + settings = ControllerSettings( + serial_port=final["serial_port"], + serial_baudrate=final["baudrate"], + mqtt_host=final["mqtt_host"], + mqtt_port=final["mqtt_port"], + mqtt_use_tls=final["mqtt_use_tls"], + mqtt_username=final.get("mqtt_username"), + mqtt_password=final.get("mqtt_password"), + network_id=int(final["swarmit_network_id"], 16), + adapter=final["adapter"], + devices=[device.upper()], + verbose=False, + ) + return build_client(settings) + + +@click.group( + name="lh2-calibration", + help="Over-the-air LH2 calibration for one DotBot: collect, push.", +) +def cmd() -> None: + pass + + +@cmd.command( + name="collect", + help=( + "Collect LH2 calibration from one DotBot over the air (no serial " + "cable). Walks you through the 4 arena corners, triggers a capture " + "per corner via swarmit, solves the homography, and saves the " + "calibration." + ), +) +@click.option( + "--device", + required=True, + help="DotBot link-layer address in hex (e.g. BC3D3C8A2A6F8E68).", +) +@click.option( + "-n", + "--conn", + "--connection", + "conn", + default=None, + help=( + "Swarm connection string: an MQTT broker `mqtts://host:port` or a " + "serial gateway `/dev/ttyACM0`. Falls back to the dotbot config." + ), +) +@click.option( + "-s", + "--swarm-id", + "swarm_id", + default=None, + help="Swarm id in hex (required for an MQTT broker connection).", +) +@click.option( + "-d", + "--distance", + default=None, + type=int, + help=( + "Distance between reference corners in millimeters " + "(default: the calibration package default)." + ), +) +@click.option( + "--timeout", + default=None, + type=float, + help="Seconds to wait for each capture before re-triggering.", +) +@click.option( + "--retries", + default=None, + type=int, + help="Re-trigger this many times per corner before giving up.", +) +@click.option( + "--tag", + default=None, + help=( + 'Optional arena/setup label (e.g. "office-2x2m") added to the saved ' + "filename and metadata, so calibrations stay self-describing." + ), +) +@click.option( + "--push", + is_flag=True, + help="Send the computed calibration back to the bot over the air.", +) +@click.pass_context +def _collect(ctx, device, conn, swarm_id, distance, timeout, retries, tag, push): + try: + from swarmit.testbed.protocol import LH2_CALIB_TAG + + from dotbot.calibration.lighthouse2 import ( + CALIBRATION_DISTANCE_DEFAULT, + LighthouseManager, + ) + from dotbot.calibration.ota import ( + CAPTURE_RETRIES_DEFAULT, + CAPTURE_TIMEOUT_DEFAULT, + CORNERS, + CaptureSession, + ) + except ImportError as exc: + click.echo( + "`dotbot swarm lh2-calibration collect` needs the calibration " + "runtime deps (opencv-python).\n" + "Install with: pip install dotbot[calibrate]", + err=True, + ) + click.echo(f"(import error was: {exc})", err=True) + sys.exit(1) + + distance = distance if distance is not None else CALIBRATION_DISTANCE_DEFAULT + timeout = timeout if timeout is not None else CAPTURE_TIMEOUT_DEFAULT + retries = retries if retries is not None else CAPTURE_RETRIES_DEFAULT + + try: + client = _build_swarmit_client(ctx, conn, swarm_id, device) + except click.ClickException: + raise + except Exception as exc: + click.echo(f"Could not reach the swarm: {exc}", err=True) + sys.exit(1) + + samples = [] + with client: + with CaptureSession(client, device, LH2_CALIB_TAG) as session: + # Give the transport's own connect/subscribe log lines a beat to + # print before our prompts, so the two don't interleave on screen. + time.sleep(0.2) + click.echo( + f"\nCollecting LH2 calibration from {device.upper()}.\n" + "Stop the bot's app first (capture only runs in READY).\n" + ) + for corner in CORNERS: + click.prompt( + f"Place the DotBot at the {corner} corner, then press Enter", + default="", + show_default=False, + prompt_suffix="", + ) + try: + sample = session.capture( + lh_index=0, + timeout=timeout, + retries=retries, + on_attempt=lambda n, total: click.echo( + f" triggering capture (attempt {n}/{total}), " + f"waiting up to {timeout:g}s..." + ), + ) + except TimeoutError as exc: + click.echo(f" ! {exc}", err=True) + raise click.Abort() + samples.append(sample) + click.echo( + f" captured {corner}: " + f"count1={sample.count1} count2={sample.count2}" + ) + + manager = LighthouseManager(calibration_distance=distance, extra_lh_num=0) + try: + manager.compute_calibration(samples) + except Exception as exc: + click.echo(f"Failed to compute calibration: {exc}", err=True) + sys.exit(1) + path = manager.save_calibration(tag=tag) + click.echo(f"\nCalibration saved to {path}") + + if push: + payload = manager.calibration_output_path.read_bytes() + client.send_lh2_calibration(payload) + click.echo("Sent the calibration to the bot over the air.") + else: + click.echo( + "To send it to the bot over the air:\n" + f" dotbot swarm lh2-calibration push {path}" + ) + + +@cmd.command( + name="push", + help=( + "Send a saved LH2 calibration to the bot over the air. Forwards to " + "swarmit's `calibrate-lh2`, which picks the payload format (legacy " + "`.out` or `calibration-*.toml`) by file extension." + ), +) +@click.argument( + "path", + type=click.Path(exists=True, dir_okay=False), +) +@click.pass_context +def _push(ctx, path): + from dotbot.cli._swarm_inject import inject_config + from dotbot.cli.swarm import _load_swarmit_group, _run_swarmit + + swarmit_group = _load_swarmit_group() + final = inject_config(["calibrate-lh2", path], ctx.obj) + _run_swarmit(swarmit_group, final) diff --git a/dotbot/tests/test_calibration_lighthouse2.py b/dotbot/tests/test_calibration_lighthouse2.py index c693717d..d069ffbd 100644 --- a/dotbot/tests/test_calibration_lighthouse2.py +++ b/dotbot/tests/test_calibration_lighthouse2.py @@ -61,6 +61,48 @@ def test_save_calibration_writes_toml_and_legacy_out(monkeypatch, tmp_path): assert payload == (tmp_path / "calibration.out").read_bytes() +def test_save_calibration_tag_in_filename_and_metadata(monkeypatch, tmp_path): + monkeypatch.setattr(lighthouse2, "CALIBRATION_DIR", tmp_path) + mgr = LighthouseManager(extra_lh_num=0) + mgr.calibration_output_path = tmp_path / lighthouse2.CALIBRATION_LEGACY_OUT + mgr.homographies = [_seed_homography(1.0)] + + path = mgr.save_calibration(tag="office-2x2m") + + assert path.name.startswith("calibration-office-2x2m-") + with open(path, "rb") as f: + parsed = tomllib.load(f) + assert parsed["metadata"]["tag"] == "office-2x2m" + + +def test_save_calibration_sanitizes_and_omits_empty_tag(monkeypatch, tmp_path): + monkeypatch.setattr(lighthouse2, "CALIBRATION_DIR", tmp_path) + mgr = LighthouseManager(extra_lh_num=0) + mgr.calibration_output_path = tmp_path / lighthouse2.CALIBRATION_LEGACY_OUT + mgr.homographies = [_seed_homography(1.0)] + + # Unsafe characters collapse to dashes and the leading ".." is trimmed; + # the slug stays a single filename component inside ~/.dotbot. + path = mgr.save_calibration(tag="../lab room/A") + assert path.parent == tmp_path + assert path.name.startswith("calibration-lab-room-A-") + + # A tag that reduces to nothing is treated as absent (no stray dashes). + path = mgr.save_calibration(tag="///") + assert path.name.startswith("calibration-2") # the timestamp year + with open(path, "rb") as f: + assert "tag" not in tomllib.load(f)["metadata"] + + +def test_slug_tag_rules(): + assert lighthouse2._slug_tag("office-2x2m") == "office-2x2m" + assert lighthouse2._slug_tag(" a b ") == "a-b" + assert lighthouse2._slug_tag("a/b\\c:d") == "a-b-c-d" + assert lighthouse2._slug_tag("--keep_me.v2--") == "keep_me.v2" + assert lighthouse2._slug_tag("..") == "" + assert lighthouse2._slug_tag("***") == "" + + def test_load_calibration_prefers_newest_toml(monkeypatch, tmp_path): monkeypatch.setattr(lighthouse2, "CALIBRATION_DIR", tmp_path) mgr = LighthouseManager(extra_lh_num=0) diff --git a/dotbot/tests/test_calibration_ota.py b/dotbot/tests/test_calibration_ota.py new file mode 100644 index 00000000..856ff958 --- /dev/null +++ b/dotbot/tests/test_calibration_ota.py @@ -0,0 +1,109 @@ +# SPDX-FileCopyrightText: 2026-present Inria +# SPDX-License-Identifier: BSD-3-Clause + +"""Tests for the over-the-air LH2 capture decoding + collection logic. + +These exercise our host-side orchestration (payload decode, trigger/wait/ +retry) with a fake client. They are NOT a substitute for hardware-in-the- +loop validation of the actual swarmit transport - the fake stands in only +for the SwarmitClient surface, never for Mari/MQTT/serial behavior. +""" + +import threading + +from dotbot.calibration.ota import ( + CaptureSession, + parse_capture_payload, +) + +_TAG = 0xCA + + +def _record(lh_index: int, count1: int, count2: int) -> bytes: + return ( + bytes([lh_index]) + count1.to_bytes(4, "little") + count2.to_bytes(4, "little") + ) + + +def _payload(*records: bytes) -> bytes: + return bytes([_TAG]) + b"".join(records) + + +def test_parse_empty_or_untagged_returns_nothing(): + assert parse_capture_payload(b"", _TAG) == [] + # A regular text log line: first byte is not the tag. + assert parse_capture_payload(b"hello world", _TAG) == [] + + +def test_parse_single_sample(): + samples = parse_capture_payload(_payload(_record(0, 49341, 85887)), _TAG) + assert len(samples) == 1 + assert samples[0].lh_index == 0 + assert samples[0].count1 == 49341 + assert samples[0].count2 == 85887 + + +def test_parse_multiple_samples(): + samples = parse_capture_payload( + _payload(_record(0, 1, 2), _record(1, 3, 4), _record(2, 5, 6)), + _TAG, + ) + assert [(s.lh_index, s.count1, s.count2) for s in samples] == [ + (0, 1, 2), + (1, 3, 4), + (2, 5, 6), + ] + + +def test_parse_ignores_trailing_partial_record(): + # Tag + one full 9-byte record + 3 stray bytes that can't form a record. + data = _payload(_record(0, 7, 8)) + b"\x01\x02\x03" + samples = parse_capture_payload(data, _TAG) + assert len(samples) == 1 + assert (samples[0].count1, samples[0].count2) == (7, 8) + + +class _FakeClient: + """Minimal SwarmitClient stand-in: emits one tagged event per trigger. + + Mirrors the real firmware contract (samples only arrive in reply to a + capture request), so CaptureSession's drain-then-trigger ordering is + exercised the same way it is against a bot. + """ + + def __init__(self, device: str, records: bytes): + self._device = device.upper() + self._records = records + self._triggered = threading.Event() + + def request_lh2_capture(self, device: str) -> None: + self._triggered.set() + + def watch_log_events(self): + while True: + if self._triggered.wait(timeout=0.05): + self._triggered.clear() + yield { + "addr": self._device, + "data_hex": _payload(self._records).hex(), + } + + +def test_capture_session_returns_triggered_sample(): + client = _FakeClient("ABCD", _record(0, 111, 222)) + with CaptureSession(client, "abcd", _TAG) as session: + sample = session.capture(lh_index=0, timeout=2.0, retries=2) + assert sample.lh_index == 0 + assert (sample.count1, sample.count2) == (111, 222) + + +def test_capture_session_ignores_other_devices(): + # Event addressed to a different bot must not satisfy the capture. + client = _FakeClient("FFFF", _record(0, 1, 2)) + with CaptureSession(client, "ABCD", _TAG) as session: + try: + session.capture(lh_index=0, timeout=0.3, retries=0) + except TimeoutError: + pass + else: + raise AssertionError("expected TimeoutError for mismatched addr") diff --git a/pyproject.toml b/pyproject.toml index 51864be1..329202dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dependencies = [ "intelhex >= 2.3.0", "marilib-pkg >= 0.9.0rc3", "pydotbot-utils >= 0.3.0", - "swarmit >= 0.8.0rc2", + "swarmit >= 0.8.0rc3", "toml >= 0.10.2", "tomlkit >= 0.13.0", ]