Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions dotbot/calibration/lighthouse2.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import datetime
import math
import os
import re
import tomllib
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -189,6 +190,18 @@ def _build_calibration_payload(
return bytes(payload)


def _slug_tag(tag: str) -> str:
"""Filename-safe slug for a free-form calibration tag.

Keeps ASCII letters, digits, dot, dash and underscore; collapses any
other run of characters to a single dash and trims dashes and dots off
the ends (so a tag like "../x" can't smuggle in a leading ".."). Returns
"" when nothing usable remains, so callers can treat the tag as absent.
The slug is safe to drop into both a filename and a TOML string.
"""
return re.sub(r"[^A-Za-z0-9._-]+", "-", tag).strip("-.")


def _parse_calibration_payload(payload: bytes) -> list[bytes]:
"""Inverse of `_build_calibration_payload`: yields the per-LH 36-byte
matrix chunks. Used when loading from either TOML or legacy .out."""
Expand Down Expand Up @@ -364,14 +377,19 @@ def load_calibration(self) -> list[bytes]:
return _parse_calibration_payload(self.calibration_output_path.read_bytes())
return []

def save_calibration(self) -> Path:
def save_calibration(self, tag: Optional[str] = None) -> Path:
"""Save the calibration as a timestamped TOML file (+ legacy .out).

The TOML file is the new primary record: versioned, metadata-
bearing, human-inspectable. The legacy `.out` file is also
written so external consumers (swarmit OTA, dotbot-provision)
keep working until they learn to read TOML.

`tag`, when given, is a free-form arena/setup label (e.g.
"office-2x2m"); a filename-safe slug of it is inserted into the
filename and recorded under `[metadata]` so the calibration stays
self-describing even after a rename.

Returns the path of the TOML file just written, and also stores
it on `self.last_saved_toml_path` so a caller that lost the
return value (e.g. the TUI handler) can still surface it after
Expand All @@ -383,7 +401,14 @@ def save_calibration(self) -> Path:
# Filename-safe variant of ISO 8601: `:` is rejected on Windows
# and a footgun on some Unix tools.
ts_for_filename = now.strftime("%Y-%m-%dT%H-%M-%SZ")
toml_path = CALIBRATION_DIR / f"calibration-{ts_for_filename}.toml"
slug = _slug_tag(tag) if tag else ""
stem = (
f"calibration-{slug}-{ts_for_filename}"
if slug
else f"calibration-{ts_for_filename}"
)
toml_path = CALIBRATION_DIR / f"{stem}.toml"
tag_line = f'tag = "{slug}"\n' if slug else ""
# Explicit UTF-8 — TOML is spec'd as UTF-8, and Path.write_text
# defaults to the platform encoding (cp1252 on Windows), which
# mangles any non-ASCII byte and breaks the tomllib reader.
Expand All @@ -394,6 +419,7 @@ def save_calibration(self) -> Path:
f'created_at = "{now.strftime("%Y-%m-%dT%H:%M:%SZ")}"\n'
f"calibration_distance_mm = {int(self.calibration_distance)}\n"
f"num_lh_stations = {1 + self.extra_lh_num}\n"
f"{tag_line}"
"\n"
"[calibration]\n"
"# 1-byte homography count + N x 36-byte int32 LE matrices,\n"
Expand Down
134 changes: 134 additions & 0 deletions dotbot/calibration/ota.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# SPDX-FileCopyrightText: 2026-present Inria
# SPDX-License-Identifier: BSD-3-Clause

"""Over-the-air LH2 calibration collection (swarmit transport).

Variant A: a single DotBot, no serial cable. The bot's secure bootloader
samples its own raw LH2 counts on request (READY mode only) and ships them
back inside a SWARMIT_EVENT_LOG. This module triggers one capture per arena
corner and decodes the samples; the homography solve and save live in
`lighthouse2.LighthouseManager`, exactly as in the serial flow.
"""

from __future__ import annotations

import queue
import threading
import time
from collections.abc import Callable

from dotbot.calibration.lighthouse2 import LH2CalibrationSample

# The four reference corners, in the order LighthouseManager expects them:
# it zips the collected counts against REFERENCE_POINTS_DEFAULT positionally
# (top-left, top-right, bottom-left, bottom-right), so the collection order
# is load-bearing, not cosmetic.
CORNERS = ("top-left", "top-right", "bottom-left", "bottom-right")

CAPTURE_TIMEOUT_DEFAULT = 5.0
CAPTURE_RETRIES_DEFAULT = 3

# Each raw sample inside the LOG payload is [lh_index:1][count1:4 LE][count2:4 LE].
_SAMPLE_SIZE = 9


def parse_capture_payload(data: bytes, tag: int) -> list[LH2CalibrationSample]:
"""Decode a SWARMIT_EVENT_LOG payload of raw LH2 samples.

Layout (mirrors the swarmit bootloader): a 1-byte `tag`, then N
fixed-size records. Returns [] for any payload that is not a capture
(regular text log lines do not carry `tag` as their first byte).
"""
if len(data) < 1 or data[0] != tag:
return []
body = data[1:]
samples: list[LH2CalibrationSample] = []
for off in range(0, len(body) - _SAMPLE_SIZE + 1, _SAMPLE_SIZE):
lh_index = body[off]
count1 = int.from_bytes(body[off + 1 : off + 5], "little")
count2 = int.from_bytes(body[off + 5 : off + 9], "little")
samples.append(LH2CalibrationSample(lh_index, count1, count2))
return samples


class CaptureSession:
"""One shared log-event stream for a whole collect session.

The bot only emits raw counts in reply to a trigger, so nothing arrives
unsolicited - a single `watch_log_events()` stream serves every corner.
A background reader thread decodes samples addressed to `device` into a
queue; `capture()` triggers and waits, re-triggering on timeout because
the trigger send is best-effort (no transport-level ack).
"""

def __init__(self, client, device: str, tag: int):
self._client = client
self._device = device.upper()
self._tag = tag
self._queue: queue.Queue = queue.Queue()
self._stop = threading.Event()
self._thread = threading.Thread(target=self._reader, daemon=True)

def __enter__(self) -> CaptureSession:
self._thread.start()
return self

def __exit__(self, *exc) -> None:
self._stop.set()

def _reader(self) -> None:
try:
for event in self._client.watch_log_events():
if self._stop.is_set():
break
if str(event.get("addr", "")).upper() != self._device:
continue
data = bytes.fromhex(event.get("data_hex", ""))
for sample in parse_capture_payload(data, self._tag):
self._queue.put(sample)
except Exception as exc: # surfaced on the next capture() get()
self._queue.put(exc)

def capture(
self,
lh_index: int,
timeout: float,
retries: int,
on_attempt: Callable[[int, int], None] | None = None,
) -> LH2CalibrationSample:
"""Trigger a capture and return the first sample for `lh_index`.

Retries the trigger up to `retries` times; raises TimeoutError if
no matching sample arrives. `on_attempt(n, total)` runs just before
each trigger so callers can show progress during the otherwise silent
wait.
"""
# Discard anything left over from the previous corner.
while not self._queue.empty():
self._queue.get_nowait()

attempts = retries + 1
for attempt in range(attempts):
if on_attempt is not None:
on_attempt(attempt + 1, attempts)
self._client.request_lh2_capture(self._device)
deadline = time.monotonic() + timeout
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
try:
item = self._queue.get(timeout=remaining)
except queue.Empty:
break
if isinstance(item, Exception):
raise item
if item.lh_index == lh_index:
return item
# A sample for a different lighthouse: ignore, keep waiting.

raise TimeoutError(
f"no LH{lh_index} sample from {self._device} after "
f"{retries + 1} attempt(s); is the bot in READY (app stopped) "
f"and in view of the lighthouse?"
)
17 changes: 10 additions & 7 deletions dotbot/cli/calibrate.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# SPDX-FileCopyrightText: 2026-present Inria
# SPDX-License-Identifier: BSD-3-Clause

"""`dotbot run lh2-calibration` LH2 calibration (serial side).
"""`dotbot run lh2-calibration` - LH2 calibration.

Native subgroup mounting the vendored `dotbot.calibration` package.
Serial-attached, single-device operations. OTA / swarm-wide
counterparts will live under `dotbot swarm calibrate-lh2`.
Native subgroup mounting the vendored `dotbot.calibration` package, for
single-device calibration over either transport.

Subcommands:

Expand All @@ -15,6 +14,10 @@
<path>. Today the only consumer is the swarmit secure
bootloader (it #includes the file at compile time).

Cable-free, over-the-air calibration of a DotBot in the arena lives under
`dotbot swarm lh2-calibration` (it drives the fleet transport, not a serial
DK).

Calibration runtime deps (`opencv-python`, `textual`) live behind the
`[calibrate]` extra; ImportError at subcommand invocation prints an
install hint instead of a traceback.
Expand Down Expand Up @@ -47,7 +50,7 @@ def _run_tui(ctx: click.Context) -> None:

@click.group(
name="lh2-calibration",
help="LH2 calibration: capture, apply, export (serial-side / single device).",
help="LH2 calibration for one serial-attached device: capture, apply.",
invoke_without_command=True,
)
@click.pass_context
Expand Down Expand Up @@ -80,8 +83,8 @@ def _collect(ctx: click.Context) -> None:
help=(
"Write the saved calibration as a C header to PATH. Today the "
"consumer is the swarmit secure bootloader (#includes the file "
"at compile time). OTA / runtime equivalents will live under "
"`dotbot swarm calibrate-lh2 apply`."
"at compile time). The over-the-air / runtime equivalent is "
"`dotbot swarm lh2-calibration push`."
),
)
@click.argument(
Expand Down
13 changes: 13 additions & 0 deletions dotbot/cli/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ def _with_config_injection(swarmit_group):
@click.pass_context
def cmd(ctx, args):
args = list(args)
# `lh2-calibration` is PyDotBot-native (the homography solve lives
# here, not in swarmit), so intercept it before the passthrough and
# hand off to our own group, carrying the resolved config along.
if args and args[0] == "lh2-calibration":
from dotbot.cli.swarm_lh2 import cmd as lh2_group

lh2_group.main(
args=args[1:],
prog_name="dotbot swarm lh2-calibration",
standalone_mode=True,
obj=ctx.obj,
)
return
final = inject_config(args, ctx.obj) if args else args
_run_swarmit(swarmit_group, final)

Expand Down
Loading
Loading