Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.2.0] - 2025-09-11

### Added

- Advertisement / scan response filtering enabled by default requiring the Nordic UART Service UUID; new `--adv-filter / --no-adv-filter` CLI flag and `LoggerSettings.require_adv_nus` to control it.
- Early scan termination when a preferred address substring is observed (used internally to speed up reconnect loops).
- Support scanning without a `--name` filter (wildcard mode) – name now defaults to empty string instead of requiring a parameter.
- Wizard flow enhancements: ability to disable advertisement filter mid-flow; clearer display of unnamed devices.
- New `NUSClient.connect_discovered` method allowing a two-step scan-then-connect pattern for custom selection logic.
- Added test ensuring `--filter-addr` can be used without specifying a name.

### Changed

- Connection logic refactored: initial scan and selection separated from connection to improve UX and provide warnings when multiple candidates match.
- `NUSClient.scan` now returns devices even if they have an empty name when wildcard scanning; includes new parameters `early_addr_substring` and `require_adv_nus`.
- Improved reconnection loop: re-scan leverages early address substring matching for faster recovery.
- README updated with new CLI flags, troubleshooting hint, and dedicated section explaining advertisement filtering.

### Fixed

- Better handling of devices omitting advertised names (no longer silently excluded when scanning without a name filter).

### Internal

- Additional logging around early scan termination and selection.

## [0.1.3] - 2025-09-09

- Previous release (see git history for details).

[0.2.0]: https://pypi.org/project/nus-logger/0.2.0/
[0.1.3]: https://pypi.org/project/nus-logger/0.1.3/
54 changes: 34 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,20 @@ Press Ctrl-C to stop. By default the tool will auto‑reconnect after an unexpec

Environment variables override flags when corresponding flags are omitted.

| Flag | Description |
| ----------------------------- | -------------------------------------------------------------------- |
| `-h, --help` | Show CLI help |
| `--wizard` | Interactive scan & option wizard (default when no args) |
| `--list` | List visible devices then exit |
| `--name SUBSTR` | Match advertising name |
| `--filter-addr SUBSTR` | Prefer address containing substring |
| `--ts` / `--ts-local` | Add UTC or local timestamps (mutually exclusive) |
| `--raw` | Show hex bytes |
| `--logfile PATH` | Append decoded lines to file (relative or absolute path) |
| `--timeout SECS` | Scan / connect timeout |
| `--verbose` | Dump discovered GATT structure once |
| `--reconnect, --no-reconnect` | Automatically rescan & reconnect after disconnect (default: enabled) |
| Flag | Description |
| -------------------------------- | ------------------------------------------------------------------------------------------------------- |
| `-h, --help` | Show CLI help |
| `--wizard` | Interactive scan & option wizard (default when no args) |
| `--list` | List visible devices then exit |
| `--name SUBSTR` | Match advertising name |
| `--filter-addr SUBSTR` | Prefer address containing substring |
| `--adv-filter / --no-adv-filter` | Require (default) or disable requiring that the NUS 128-bit UUID appears in advertisement/scan response |
| `--ts` / `--ts-local` | Add UTC or local timestamps (mutually exclusive) |
| `--raw` | Show hex bytes |
| `--logfile PATH` | Append decoded lines to file (relative or absolute path) |
| `--timeout SECS` | Scan / connect timeout |
| `--verbose` | Dump discovered GATT structure once |
| `--reconnect, --no-reconnect` | Automatically rescan & reconnect after disconnect (default: enabled) |

</details>

Expand All @@ -95,13 +96,26 @@ To stream the Zephyr logging subsystem over BLE for `nus-logger` to consume you

## Troubleshooting

| Situation | Hint |
| -------------------------------- | --------------------------------------------------------------------------- |
| No devices on Windows | Toggle Bluetooth off/on or airplane mode, verify advertising. |
| Linux permission errors | Ensure user in `bluetooth` group or grant `CAP_NET_RAW` to Python binary. |
| macOS permission prompt | Allow Bluetooth access in System Settings > Privacy & Security > Bluetooth. |
| Disconnects | Reduce distance / interference. |
| Mixed devices with similar names | Use `--filter-addr` to prefer a known address substring. |
| Situation | Hint |
| ----------------------------------- | ---------------------------------------------------------------------------------------- |
| No devices on Windows | Toggle Bluetooth off/on or airplane mode, verify advertising. |
| Linux permission errors | Ensure user in `bluetooth` group or grant `CAP_NET_RAW` to Python binary. |
| macOS permission prompt | Allow Bluetooth access in System Settings > Privacy & Security > Bluetooth. |
| Disconnects | Reduce distance / interference. |
| Mixed devices with similar names | Use `--filter-addr` to prefer a known address substring. |
| Device not found but is advertising | Your firmware may omit the NUS UUID from advertising data. Retry with `--no-adv-filter`. |

### Advertisement / Scan Response Filtering

By default `nus-logger` filters discovered devices to only those whose advertising data (including scan responses) lists the Nordic UART Service UUID (`6E400001-B5A3-F393-E0A9-E50E24DCCA9E`). This reduces false positives when multiple similarly named devices are present.

Some firmware builds intentionally omit 128‑bit service UUIDs to save advertising space. If your device is not being found, disable this filter:

```bash
nus-logger --name my-device --no-adv-filter
```

Platform note: Bleak typically performs active scanning (requesting scan responses). On platforms/backends where only passive advertising data is available, the UUID may also be missing—disabling the filter provides a fallback.

## Development

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "nus-logger"
version = "0.1.3"
version = "0.2.0"
description = "Nordic UART Service (NUS) BLE logger."
readme = "README.md"
requires-python = ">=3.9"
Expand Down
115 changes: 84 additions & 31 deletions src/nus_logger/ble_nus.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,45 +63,87 @@ def on_bytes(self, callback: Callable[[bytes], None]) -> None:
self._notify_cb = callback

# ------------------------------------------------------------------
async def scan(self, name: str, timeout: float, adapter: Optional[str] = None) -> List[DiscoveredDevice]:
async def scan(
self,
name: str,
timeout: float,
adapter: Optional[str] = None,
early_addr_substring: Optional[str] = None,
require_adv_nus: bool = True,
) -> List[DiscoveredDevice]:
"""Scan for devices whose name equals or contains `name`.

Uses a detection callback to access `AdvertisementData.rssi` (avoiding deprecated
BLEDevice.rssi) and returns candidates sorted by strongest RSSI.
If `name` is empty, all devices with a non-empty name are returned.
Behaviour:
* Collect all advertising devices during the scan window (up to `timeout`).
* If `early_addr_substring` is provided, the scan will terminate early as soon as a
device matching BOTH the name filter (or wildcard) and the address substring
is observed. This accelerates reconnection loops where the target device is already
back in range and there's no need to wait the full timeout.
* Returns candidates sorted by strongest RSSI.
* If `name` is empty, all devices are considered (including those without a name).
"""
seen: dict[str, tuple[BLEDevice, AdvertisementData]] = {}

def _detection(device: BLEDevice, adv: AdvertisementData): # pragma: no cover - BLE runtime
if device and device.address:
seen[device.address] = (device, adv)
early_event = asyncio.Event()
lname = name.lower()
early_sub = early_addr_substring.lower() if early_addr_substring else None

def _detection(device: BLEDevice, adv: AdvertisementData): # pragma: no cover - BLE runtime path
if not device or not device.address:
return
seen[device.address] = (device, adv)
if early_sub:
dname = (device.name or "").strip()
# Name must match (unless no name filter supplied) AND address substring matches
if ((not name) or (dname and lname in dname.lower())) and early_sub in device.address.lower():
# Signal early exit; the loop below will stop scanner promptly.
if not early_event.is_set():
early_event.set()
Comment thread
smnmsr marked this conversation as resolved.

scanner = BleakScanner(detection_callback=_detection, adapter=adapter)
await scanner.start()
try:
await asyncio.sleep(timeout)
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
# Poll until timeout or early match
while loop.time() < deadline:
if early_event.is_set():
self._log.debug(
"Early scan stop triggered by preferred address match '%s'", early_addr_substring)
break
await asyncio.sleep(0.1)
finally:
await scanner.stop()

matches: List[DiscoveredDevice] = []
lname = name.lower()
for _, (dev, adv) in seen.items():
dname = (dev.name or "").strip()
if not dname:
continue
if not name or lname in dname.lower():
rssi_val = adv.rssi if adv and adv.rssi is not None else -200
# Minimal metadata (avoid deprecated BLEDevice.metadata)
meta = {"manufacturer_data": dict(
adv.manufacturer_data) if adv.manufacturer_data else {}}
matches.append(
DiscoveredDevice(
address=dev.address,
name=dname,
rssi=rssi_val,
metadata=meta,
)
if name: # name filter active
if not dname:
continue # cannot match
if lname not in dname.lower():
continue
Comment thread
smnmsr marked this conversation as resolved.
# Optional filter: require that the advertisement (including scan response)
# lists the Nordic UART Service UUID. Some firmwares omit 128-bit UUIDs
# to save space; users can disable this via CLI / settings if needed.
if require_adv_nus:
try:
svc_uuids = [u.lower() for u in (adv.service_uuids or [])]
except AttributeError: # pragma: no cover - defensive for older bleak
svc_uuids = []
if NUS_SERVICE_UUID.lower() not in svc_uuids:
continue
rssi_val = adv.rssi if adv and adv.rssi is not None else -200
meta = {
"manufacturer_data": dict(adv.manufacturer_data) if adv.manufacturer_data else {},
}
matches.append(
DiscoveredDevice(
address=dev.address,
name=dname, # may be empty
rssi=rssi_val,
metadata=meta,
)
)
matches.sort(key=lambda x: x.rssi, reverse=True)
return matches

Expand All @@ -112,6 +154,7 @@ async def scan_and_connect(
timeout: float,
adapter: Optional[str] = None,
preferred_addr_substring: Optional[str] = None,
require_adv_nus: bool = True,
) -> DiscoveredDevice:
"""Scan and connect to the best matching device.

Expand All @@ -120,7 +163,13 @@ async def scan_and_connect(
* If multiple and `preferred_addr_substring` matches, prefer those.
* Then pick highest RSSI.
"""
candidates = await self.scan(name=name, timeout=timeout, adapter=adapter)
candidates = await self.scan(
name=name,
timeout=timeout,
adapter=adapter,
early_addr_substring=preferred_addr_substring,
require_adv_nus=require_adv_nus,
)
if not candidates:
raise BleakError(
f"No device found matching name substring '{name}'.")
Expand All @@ -135,25 +184,31 @@ async def scan_and_connect(
self._log.debug(
"Selected device %s (%s) RSSI=%s dBm", target.name, target.address, target.rssi
)
await self.connect_discovered(target)
return target

# ------------------------------------------------------------------
async def connect_discovered(self, device: DiscoveredDevice) -> None:
"""Connect to a previously discovered `DiscoveredDevice`.

This is factored out so callers can perform a scan separately (e.g. to
implement custom selection warnings) before connecting.
"""
def _handle_disconnect(_: BleakClient): # pragma: no cover - runtime path
# Bleak expects a sync callback; keep minimal work here.
self._log.debug("Device disconnected callback fired")
self._connected_event.clear()

# Pass disconnect callback directly (deprecated set_disconnected_callback removed in future bleak)
client = BleakClient(
target.address, disconnected_callback=_handle_disconnect)
device.address, disconnected_callback=_handle_disconnect)

try:
await client.connect()
except BleakError:
raise

# Discover NUS service (prefer property, fallback if not yet populated)
svcs = getattr(client, "services", None)
if not svcs: # pragma: no cover - depends on bleak version
# Older bleak still exposes get_services during transition
try: # type: ignore[attr-defined]
# type: ignore[attr-defined]
svcs = await client.get_services()
Expand All @@ -173,12 +228,10 @@ def _handle_disconnect(_: BleakClient): # pragma: no cover - runtime path
self._tx_char = tx.uuid
self._rx_char = rx.uuid

# Subscribe to notifications
assert self._tx_char is not None
# type: ignore[arg-type]
await client.start_notify(self._tx_char, self._notification_handler)
self._connected_event.set()
return target

# ------------------------------------------------------------------
# type: ignore[override]
Expand Down
11 changes: 10 additions & 1 deletion src/nus_logger/logger_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class LoggerSettings:
raw: bool = False # include hex
logfile: Optional[str] = None
adapter: Optional[str] = None # platform specific (Linux hciX)
require_adv_nus: bool = True # filter by advertised NUS UUID


@dataclass
Expand Down Expand Up @@ -78,7 +79,14 @@ async def update_settings(self, **kwargs) -> LoggerSettings:

async def scan(self, name: str = "", timeout: Optional[float] = None) -> List[DiscoveredDevice]:
timeout = timeout if timeout is not None else self._settings.timeout
return await self._client.scan(name=name, timeout=timeout, adapter=self._settings.adapter)
# No early stop outside reconnect context here
return await self._client.scan(
name=name,
timeout=timeout,
adapter=self._settings.adapter,
early_addr_substring=None,
require_adv_nus=self._settings.require_adv_nus,
)

async def connect(self, name: Optional[str] = None, filter_addr: Optional[str] = None) -> None:
if name is not None:
Expand Down Expand Up @@ -198,6 +206,7 @@ async def _run_loop(self) -> None:
timeout=self._settings.timeout,
adapter=self._settings.adapter,
preferred_addr_substring=self._settings.filter_addr,
require_adv_nus=self._settings.require_adv_nus,
)
self._connecting = False
await self._client.run_until_disconnect()
Expand Down
Loading
Loading