Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion gently/mesh/capability_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def _get_system_info() -> dict[str, Any]:
if platform.system() == "Windows":
import ctypes

kernel32 = ctypes.windll.kernel32
# ctypes.windll exists only on Windows; this branch is platform-guarded.
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
mem_status = ctypes.c_ulonglong()
kernel32.GetPhysicallyInstalledSystemMemory(ctypes.byref(mem_status))
ram_gb = round(mem_status.value / (1024 * 1024), 1)
Expand Down
5 changes: 4 additions & 1 deletion gently/mesh/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ def __init__(
self._known_ids: set = set()
self.transport: asyncio.DatagramTransport | None = None

def connection_made(self, transport: asyncio.DatagramTransport):
def connection_made(self, transport: asyncio.BaseTransport) -> None:
# DatagramProtocol always receives a DatagramTransport here; narrow for
# the typed attribute (and matches the asyncio.BaseProtocol signature).
assert isinstance(transport, asyncio.DatagramTransport)
self.transport = transport

def datagram_received(self, data: bytes, addr: tuple):
Expand Down
53 changes: 27 additions & 26 deletions gently/mesh/peer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, pairing_manager=None, audit_log=None):
self._audit_log = audit_log
self._pinning_verified: set = set() # track first-success per peer

async def _ensure_session(self):
async def _ensure_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
timeout = aiohttp.ClientTimeout(total=settings.mesh.fetch_timeout_s)
# Use permissive SSL context — we verify by cert fingerprint, not CA chain
Expand All @@ -39,6 +39,7 @@ async def _ensure_session(self):
connector = aiohttp.TCPConnector(ssl=ssl_ctx)
self._session = aiohttp.ClientSession(timeout=timeout, connector=connector)
self._pinning_verified.clear()
return self._session

def _auth_headers(self, peer: PeerInfo) -> dict[str, str]:
"""Build auth headers for a trusted peer."""
Expand Down Expand Up @@ -103,12 +104,12 @@ async def fetch_peer_info(self, peer: PeerInfo) -> dict[str, Any] | None:

Returns the parsed JSON dict on success, or None on failure.
"""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/mesh/status"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.get(url, headers=headers, ssl=ssl_fp) as resp:
async with session.get(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
return await resp.json()
Expand All @@ -126,12 +127,12 @@ async def fetch_peer_info(self, peer: PeerInfo) -> dict[str, Any] | None:

async def fetch_peer_campaigns(self, peer: PeerInfo) -> list | None:
"""GET /api/campaigns from a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/campaigns"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.get(url, headers=headers, ssl=ssl_fp) as resp:
async with session.get(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
data = await resp.json()
Expand All @@ -144,12 +145,12 @@ async def fetch_peer_campaigns(self, peer: PeerInfo) -> list | None:

async def fetch_campaign_export(self, peer: PeerInfo, campaign_id: str) -> dict | None:
"""GET /api/campaigns/{id}/export from a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/campaigns/{campaign_id}/export"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.get(url, headers=headers, ssl=ssl_fp) as resp:
async with session.get(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
return await resp.json()
Expand All @@ -167,12 +168,12 @@ async def join_campaign(
hostname: str,
) -> bool:
"""POST /api/campaigns/{id}/join on a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/campaigns/{campaign_id}/join"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.post(
async with session.post(
url,
json={
"instance_id": instance_id,
Expand All @@ -199,12 +200,12 @@ async def claim_item(
hostname: str,
) -> bool:
"""POST /api/campaigns/{id}/items/{item_id}/claim on a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/campaigns/{campaign_id}/items/{item_id}/claim"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.post(
async with session.post(
url,
json={
"instance_id": instance_id,
Expand All @@ -229,12 +230,12 @@ async def unclaim_item(
item_id: str,
) -> bool:
"""POST /api/campaigns/{id}/items/{item_id}/unclaim on a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/campaigns/{campaign_id}/items/{item_id}/unclaim"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.post(url, headers=headers, ssl=ssl_fp) as resp:
async with session.post(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
return resp.status == 200
Expand All @@ -253,15 +254,15 @@ async def update_item_status(
outcome: str | None = None,
) -> bool:
"""POST /api/campaigns/{id}/items/{item_id}/status on a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/campaigns/{campaign_id}/items/{item_id}/status"
body: dict[str, Any] = {"status": status}
if outcome is not None:
body["outcome"] = outcome
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.post(url, json=body, headers=headers, ssl=ssl_fp) as resp:
async with session.post(url, json=body, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
return resp.status == 200
Expand Down Expand Up @@ -305,15 +306,15 @@ async def _pairing_request(
self, peer: PeerInfo, method: str, path: str, json_body: dict | None = None
) -> dict | None:
"""Make an HTTP request trying HTTPS first, then HTTP (for pre-pairing)."""
await self._ensure_session()
session = await self._ensure_session()
base = f"{peer.ip_address}:{peer.viz_port}"
for scheme in ("https", "http"):
url = f"{scheme}://{base}{path}"
try:
if method == "GET":
req = self._session.get(url)
req = session.get(url)
else:
req = self._session.post(url, json=json_body or {})
req = session.post(url, json=json_body or {})
async with req as resp:
if resp.status == 200:
return await resp.json()
Expand Down Expand Up @@ -350,12 +351,12 @@ async def confirm_pair_remote(

async def fetch_peer_sessions(self, peer: PeerInfo) -> list | None:
"""GET /api/data/sessions from a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/data/sessions"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.get(url, headers=headers, ssl=ssl_fp) as resp:
async with session.get(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
data = await resp.json()
Expand All @@ -368,12 +369,12 @@ async def fetch_peer_sessions(self, peer: PeerInfo) -> list | None:

async def fetch_peer_session_detail(self, peer: PeerInfo, session_id: str) -> dict | None:
"""GET /api/data/sessions/{id} from a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/data/sessions/{session_id}"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.get(url, headers=headers, ssl=ssl_fp) as resp:
async with session.get(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
return await resp.json()
Expand All @@ -385,12 +386,12 @@ async def fetch_peer_session_detail(self, peer: PeerInfo, session_id: str) -> di

async def fetch_peer_coverage(self, peer: PeerInfo) -> dict | None:
"""GET /api/data/coverage from a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/data/coverage"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.get(url, headers=headers, ssl=ssl_fp) as resp:
async with session.get(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
return await resp.json()
Expand All @@ -402,12 +403,12 @@ async def fetch_peer_coverage(self, peer: PeerInfo) -> dict | None:

async def fetch_peer_stage_distribution(self, peer: PeerInfo) -> dict | None:
"""GET /api/data/stages from a peer."""
await self._ensure_session()
session = await self._ensure_session()
url = f"{peer.base_url}/api/data/stages"
headers = self._auth_headers(peer)
ssl_fp = self._ssl_for_peer(peer)
try:
async with self._session.get(url, headers=headers, ssl=ssl_fp) as resp:
async with session.get(url, headers=headers, ssl=ssl_fp) as resp:
if resp.status == 200:
self._log_pinning_success(peer, ssl_fp)
return await resp.json()
Expand Down
3 changes: 0 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,6 @@ module = [
"gently.harness.session.timeline",
"gently.harness.tools.registry",
"gently.log_config",
"gently.mesh.capability_provider",
"gently.mesh.discovery",
"gently.mesh.peer_client",
"gently.ml.data_loader",
"gently.ml.federated",
"gently.organisms.celegans.developmental_tracker",
Expand Down
Loading