[Python] Add async components for WebBridgeController#13
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces an experimental asyncio-based bridge for the Constellation Python core, providing async equivalents of existing threaded ZMQ subscriber pooling and heartbeat monitoring, plus a prototype “combined bridge” that runs heartbeat/CMDP processing in an event loop while keeping CHIRP discovery in a thread.
Changes:
- Adds
AsyncSubscriberPool(async ZMQ SUB socket pool usingzmq.asyncio). - Adds
AsyncHeartbeatReceiver(async heartbeat polling with lives/stale-connection tracking). - Adds
CombinedBridgeprototype wiring CHIRP discovery (thread) + heartbeat + CMDP receive + CSCP command sending.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
python/constellation/core/async_experimental/bridge.py |
Prototype combined bridge coordinating CHIRP discovery, async heartbeat/CMDP processing, and CSCP command dispatch. |
python/constellation/core/async_experimental/async_pools.py |
Async subscriber pool abstraction for CMDP-style PUB/SUB sockets. |
python/constellation/core/async_experimental/async_heartbeat.py |
Async heartbeat receiver that tracks satellite state and liveness via CHP messages. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| import asyncio | ||
| import threading | ||
| import time | ||
| from uuid import UUID |
There was a problem hiding this comment.
Repository modules under python/constellation/core/* consistently start with an SPDX/license docstring. This new module starts directly with imports, which likely breaks the project’s licensing/header convention—please add the standard SPDX header docstring at the top of the file.
| def shutdown(self) -> None: | ||
| """Stop everything.""" | ||
| self._stop.set() | ||
| self._stop_thread.set() | ||
| if hasattr(self, '_chirp_thread'): | ||
| self._chirp_thread.join(timeout=2) | ||
| self._heartbeat.close() | ||
| self._cmdp_pool.close() | ||
| # Close CSCP transmitters | ||
| with self._transmitter_lock: | ||
| for ct in self._transmitters.values(): | ||
| ct.close() | ||
| self._ctx.term() | ||
| self._sync_ctx.term() | ||
|
|
There was a problem hiding this comment.
shutdown() closes sockets/contexts immediately after setting the stop events, but run() may still be polling/receiving when shutdown() is called (especially in the example main, where shutdown happens before awaiting task completion). Closing ZMQ resources while tasks are still using them can raise exceptions or leave tasks stuck. Consider making shutdown async (set stop, await run() completion, then close/term contexts), or ensure callers await the run task finishing before closing resources.
| @dataclass | ||
| class HeartbeatState: | ||
| """Tracked state for a single satellite.""" | ||
| host: UUID | ||
| name: str | ||
| state: SatelliteState = SatelliteState.DEAD | ||
| last_refresh: float = field(default_factory=lambda: asyncio.get_event_loop().time()) | ||
| last_statechange: datetime = field(default_factory=datetime.now) | ||
| interval_ms: int = 2000 | ||
| lives: int = 3 | ||
| role: CHPRole = CHPRole.DYNAMIC | ||
|
|
||
| def refresh(self) -> None: | ||
| self.last_refresh = asyncio.get_event_loop().time() | ||
|
|
||
| def seconds_since_refresh(self) -> float: | ||
| return asyncio.get_event_loop().time() - self.last_refresh | ||
|
|
There was a problem hiding this comment.
HeartbeatState uses asyncio.get_event_loop().time() for monotonic timing. get_event_loop() is deprecated in newer Python versions and can behave unexpectedly when no loop is set; also, the timing here doesn’t need to be tied to an event loop. Prefer time.monotonic() (as used elsewhere in the codebase) or capture loop = asyncio.get_running_loop() once in AsyncHeartbeatReceiver and use loop.time() consistently.
| def _process_heartbeat(self, uuid: UUID, msg: list[bytes]) -> None: | ||
| """Process a received heartbeat message.""" | ||
| try: | ||
| name, timestamp, state_val, flags, interval, status = chp_decode_message(msg) | ||
| state = SatelliteState(state_val) | ||
|
|
||
| hb = self._states.get(uuid) | ||
| if hb is None: | ||
| return | ||
|
|
||
| # Update name if it was unknown | ||
| if hb.name != name: | ||
| hb.name = name | ||
|
|
||
| # Detect state change | ||
| if state != hb.state: | ||
| old_state = hb.state | ||
| hb.state = state | ||
| hb.last_statechange = datetime.now() | ||
| if self._on_state_change: | ||
| self._on_state_change(name, old_state, state) | ||
|
|
||
| # Update tracking | ||
| hb.refresh() | ||
| hb.interval_ms = interval | ||
| hb.role = CHPRole.from_flags(flags) | ||
|
|
||
| # Refresh lives | ||
| if hb.lives != self.INIT_LIVES: | ||
| hb.lives = self.INIT_LIVES | ||
|
|
||
| except Exception as e: | ||
| # Malformed heartbeat (ignore) | ||
| pass | ||
|
|
There was a problem hiding this comment.
The broad except Exception: pass here will hide malformed heartbeat messages and also unexpected bugs in decoding/state handling. Consider at least logging at debug/trace level (and/or counting drops) so real issues aren’t silently ignored during testing and future extension.
|
@stephanlachnit Ready for your feedback. Copilot flagged some cleanup items (SPDX headers, unused imports, DEPART handling) I'll address those based on your direction on the overall approach. |
|
Woooow, sorry for the Copilot noise, this is incredibly annoying. I (think I) have switched it off now for the repo. |
|
I think this is quite promising - it looks like the code changes required to switch to async are not too big. If I get it right, we use |
|
@stephanlachnit Yes, that's exactly right. CHIRP stays as the one thread since it uses raw UDP multicast rather than ZeroMQ so One thing I wanted to check is whether you'd prefer we integrate with the existing |
|
@stephanlachnit |
| def subscribe(self, topic: str, uuid: UUID | None = None) -> None: | ||
| """Subscribe to a topic on one or all sockets. | ||
|
|
||
| When uuid is None, topic is added to _topics so future sockets added | ||
| via add_socket() are automatically subscribed. When uuid is given, | ||
| only that socket is subscribed and _topics is not modified. | ||
| """ | ||
| if uuid is None: | ||
| for sock in self._sockets.values(): | ||
| sock.setsockopt_string(zmq.SUBSCRIBE, topic) | ||
| if topic not in self._topics: | ||
| self._topics.append(topic) | ||
| else: | ||
| sock = self._sockets.get(uuid) | ||
| if sock is not None: | ||
| sock.setsockopt_string(zmq.SUBSCRIBE, topic) | ||
|
|
||
| def unsubscribe(self, topic: str, uuid: UUID | None = None) -> None: | ||
| """Unsubscribe from a topic on one or all sockets. | ||
|
|
||
| When uuid is None, topic is removed from _topics. When uuid is given, | ||
| only that socket is unsubscribed and _topics is not modified. | ||
| """ | ||
| if uuid is None: | ||
| for sock in self._sockets.values(): | ||
| sock.setsockopt_string(zmq.UNSUBSCRIBE, topic) | ||
| if topic in self._topics: | ||
| self._topics.remove(topic) | ||
| else: | ||
| sock = self._sockets.get(uuid) | ||
| if sock is not None: | ||
| sock.setsockopt_string(zmq.UNSUBSCRIBE, topic) |
There was a problem hiding this comment.
While the idea is nice, these methods require a bit more work and should be moved to child class which handles topics more carefully - you can easily get out of sync with topics for single senders, and unfortunately ZeroMQ only tracks total topic subscriptions but not per-sender topic subscriptions, so one has to ensure that topics are not subscribed twice by the same sender.
In C++ we have the CMDPListener for this: https://gitlab.desy.de/constellation/constellation/-/blob/main/cxx/constellation/listener/CMDPListener.hpp
But I think this is not super important for now, so let's postpone this
Nice, this is how I imagined it! |
…ibe semantics and CSCP thread safety
f72cead to
7c5d0a9
Compare
|
@stephanlachnit Implemented async CHIRP using |
Async equivalents of the threading components, explored as part of Issue #335.
AsyncSubscriberPool is an async equivalent of SubscriberPool using zmq.asyncio. AsyncHeartbeatReceiver is an async equivalent of HeartbeatChecker with full lives and stale connection logic. CombinedBridge is a prototype combining CHIRP discovery in a thread with heartbeat tracking, CMDP receiving, and CSCP commands all running in the asyncio event loop.
Tested against PyRandomTransmitter through a full FSM cycle. The bridge detected 4 state transitions, received 7 log messages, and sent 3 commands successfully. The architecture reduces threading from 5 threads plus task queue to 1 thread for CHIRP plus asyncio.
To run, start PyRandomTransmitter in one terminal with
python -m constellation.satellites.PyRandomTransmitter -g test -n Sat1and run the bridge in another withpython -m python.constellation.core.async_experimental.bridge.Related: Issue #335