From e6a03facf9229be7497d6f93c5f855b1f3e47911 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 18:23:02 +0100 Subject: [PATCH 1/4] fix(meshcore): path twin via observation observer and content key After cross-feeder dedup, match channel_text twins using the ingesting feeder MeshCorePacketObservation observer, not packet.observer. Prefer sender_timestamp+channel+text correlation; widen default twin window to 120s. --- Meshflow/Meshflow/settings/base.py | 2 +- .../meshcore_packets/services/path_twin.py | 141 ++++++++++++++++-- .../tests/test_cross_feeder_dedup.py | 68 +++++++++ .../meshcore_packets/tests/test_path_twin.py | 51 +++++++ 4 files changed, 245 insertions(+), 17 deletions(-) diff --git a/Meshflow/Meshflow/settings/base.py b/Meshflow/Meshflow/settings/base.py index 41d2fed..ef13503 100644 --- a/Meshflow/Meshflow/settings/base.py +++ b/Meshflow/Meshflow/settings/base.py @@ -316,7 +316,7 @@ def _rf_env_bool(name: str, default: bool) -> bool: # Packet deduplication: time window (minutes) within which same sender+packet_id is treated as duplicate PACKET_DEDUP_WINDOW_MINUTES = int(os.environ.get("PACKET_DEDUP_WINDOW_MINUTES", "10")) MESHCORE_PACKET_DEDUP_WINDOW_MINUTES = int(os.environ.get("MESHCORE_PACKET_DEDUP_WINDOW_MINUTES", "10")) -MESHCORE_DECODED_TWIN_WINDOW_SECONDS = int(os.environ.get("MESHCORE_DECODED_TWIN_WINDOW_SECONDS", "30")) +MESHCORE_DECODED_TWIN_WINDOW_SECONDS = int(os.environ.get("MESHCORE_DECODED_TWIN_WINDOW_SECONDS", "120")) REST_FRAMEWORK = { "DEFAULT_PERMISSION_CLASSES": [ diff --git a/Meshflow/meshcore_packets/services/path_twin.py b/Meshflow/meshcore_packets/services/path_twin.py index 3eb106b..cdf68d4 100644 --- a/Meshflow/meshcore_packets/services/path_twin.py +++ b/Meshflow/meshcore_packets/services/path_twin.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +from typing import Any from meshcore_packets.models import ( MeshCorePacketObservation, @@ -12,6 +13,10 @@ ) from 
meshcore_packets.services.channel import resolve_mc_channel from meshcore_packets.services.dedup import decoded_twin_window +from meshcore_packets.services.dedup_key import ( + channel_text_dedup_key, + extract_sender_timestamp, +) logger = logging.getLogger(__name__) @@ -42,6 +47,48 @@ def channel_idx_from_packet_raw_json(packet: MeshCoreRawPacket) -> int | None: return None +def _envelope_payload(raw_json: dict[str, Any] | None) -> dict[str, Any]: + if not isinstance(raw_json, dict): + return {} + envelope = raw_json + nested = raw_json.get("raw") + if isinstance(nested, dict): + envelope = nested + payload = envelope.get("payload") + return payload if isinstance(payload, dict) else {} + + +def _content_key_for_channel_text(packet: MeshCoreTextPacket, observer) -> int | None: + if packet.channel_id is None: + return None + validated = packet.raw_json if isinstance(packet.raw_json, dict) else {} + return channel_text_dedup_key( + constellation_id=observer.constellation_id, + message_channel_id=packet.channel_id, + text=packet.text or validated.get("text") or "", + sender_timestamp=extract_sender_timestamp(validated), + ) + + +def _content_key_for_raw(packet: MeshCoreRawPacket, observer, *, channel_idx: int | None) -> int | None: + if channel_idx is None: + return None + channel = resolve_mc_channel(observer, channel_idx) + if channel is None: + return None + raw_json = packet.raw_json if isinstance(packet.raw_json, dict) else {} + payload = _envelope_payload(raw_json) + text = raw_json.get("text") or payload.get("text") or "" + if not str(text).strip(): + return None + return channel_text_dedup_key( + constellation_id=observer.constellation_id, + message_channel_id=channel.id, + text=str(text), + sender_timestamp=extract_sender_timestamp(raw_json) or payload.get("sender_timestamp"), + ) + + def _path_fields_from_observation(observation: MeshCorePacketObservation) -> dict | None: if not observation.path_hashes: return None @@ -94,26 +141,65 @@ def 
apply_path_to_text_observation( return True -def _pick_channel_text_twin(*, observer, anchor_time, channel_idx: int | None): +def _channel_text_candidates_for_observer(*, observer, anchor_time): + """Channel text packets this feeder observed in the decoded-twin window.""" window = decoded_twin_window() - candidates = MeshCoreTextPacket.objects.filter( - observer=observer, - payload_type=MeshCorePayloadType.CHANNEL_TEXT, - rx_time__gte=anchor_time - window, - rx_time__lte=anchor_time + window, - ).order_by("-rx_time") - count = candidates.count() - if count == 0: + return ( + MeshCoreTextPacket.objects.filter( + observations__observer=observer, + payload_type=MeshCorePayloadType.CHANNEL_TEXT, + rx_time__gte=anchor_time - window, + rx_time__lte=anchor_time + window, + ) + .distinct() + .order_by("-rx_time") + ) + + +def _pick_by_content_key(*, candidates, content_key: int | None, observer): + if content_key is None: return None - if count == 1: - return candidates.first() + matches = [] + for packet in candidates: + if _content_key_for_channel_text(packet, observer) == content_key: + matches.append(packet) + if len(matches) == 1: + return matches[0] + if len(matches) > 1: + logger.debug("path_twin: %s content-key matches; skip merge", len(matches)) + return None + + +def _pick_channel_text_twin( + *, + observer, + anchor_time, + channel_idx: int | None, + raw_packet: MeshCoreRawPacket | None = None, +): + candidates = list(_channel_text_candidates_for_observer(observer=observer, anchor_time=anchor_time)) + if not candidates: + return None + + content_key = None + if raw_packet is not None: + content_key = _content_key_for_raw(raw_packet, observer, channel_idx=channel_idx) + if content_key is not None: + twin = _pick_by_content_key(candidates=candidates, content_key=content_key, observer=observer) + if twin: + return twin + + if len(candidates) == 1: + return candidates[0] + if channel_idx is not None: channel = resolve_mc_channel(observer, channel_idx) if channel: - 
narrowed = candidates.filter(channel=channel) - if narrowed.count() == 1: - return narrowed.first() - logger.debug("path_twin: %s channel_text candidates in window; skip merge", count) + narrowed = [p for p in candidates if p.channel_id == channel.id] + if len(narrowed) == 1: + return narrowed[0] + + logger.debug("path_twin: %s channel_text candidates in window; skip merge", len(candidates)) return None @@ -130,10 +216,12 @@ def sync_path_to_channel_text_twin( fields = _path_fields_from_observation(observation) if not fields: return False + channel_idx = channel_idx_from_packet_raw_json(packet) twin = _pick_channel_text_twin( observer=observer, anchor_time=packet.rx_time, - channel_idx=channel_idx_from_packet_raw_json(packet), + channel_idx=channel_idx, + raw_packet=packet, ) if not twin: return False @@ -144,6 +232,7 @@ def sync_path_from_rx_log_twin(*, packet: MeshCoreTextPacket, observer) -> bool: """After ingesting channel_text, copy path from a nearby rx_log TEXT_MSG/PATH observation.""" if packet.payload_type != MeshCorePayloadType.CHANNEL_TEXT: return False + content_key = _content_key_for_channel_text(packet, observer) window = decoded_twin_window() raw_packets = MeshCoreRawPacket.objects.filter( observer=observer, @@ -152,6 +241,26 @@ def sync_path_from_rx_log_twin(*, packet: MeshCoreTextPacket, observer) -> bool: rx_time__gte=packet.rx_time - window, rx_time__lte=packet.rx_time + window, ).order_by("-rx_time") + + if content_key is not None: + for raw in raw_packets: + if rx_log_payload_typename(raw) not in PATH_RX_TYPENAMES: + continue + raw_key = _content_key_for_raw( + raw, + observer, + channel_idx=channel_idx_from_packet_raw_json(raw), + ) + if raw_key != content_key: + continue + obs = MeshCorePacketObservation.objects.filter(packet=raw, observer=observer).first() + if not obs: + continue + fields = _path_fields_from_observation(obs) + if not fields: + continue + return apply_path_to_text_observation(text_packet=packet, observer=observer, **fields) 
+ for raw in raw_packets: if rx_log_payload_typename(raw) not in PATH_RX_TYPENAMES: continue diff --git a/Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py b/Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py index 135a488..97db0b2 100644 --- a/Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py +++ b/Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py @@ -1,5 +1,8 @@ """Cross-feeder channel_text dedup (#387).""" +import json +from pathlib import Path + from django.urls import reverse from django.utils import timezone @@ -121,3 +124,68 @@ def post(feeder_info, rssi): assert response.status_code == 200 row = next(item for item in response.data["results"] if item["id"] == str(tm.id)) assert len(row["heard"]) == 2 + + +DOCS = Path(__file__).resolve().parents[3] / "docs" / "packets" / "meshcore" +PATH_DUMP = json.loads((DOCS / "rx_log_data_path.json").read_text()) + + +@pytest.mark.django_db +def test_cross_feeder_path_twin_on_second_feeder_observation(meshcore_feeder, second_mc_feeder): + """PATH on feeder B attaches to deduped packet via B's observation, not packet.observer only.""" + _setup_hashtag_channels(meshcore_feeder, second_mc_feeder) + now = timezone.now() + text = "cross feeder path twin test" + ts = 1780416000 + + def channel_payload(channel_idx, feeder_prefix, feeder_info): + client = APIClient() + client.credentials(HTTP_X_API_KEY=feeder_info["api_key"].key) + return client.post( + feeder_url("meshcore-feeder-packet-ingest", feeder_prefix), + { + "event_type": "channel_message", + "payload_type": "channel_text", + "channel_idx": channel_idx, + "rx_time": now.timestamp(), + "text": text, + "raw": { + "protocol": "meshcore", + "event_type": "channel_message", + "payload": { + "channel_idx": channel_idx, + "sender_timestamp": ts, + "text": text, + }, + }, + }, + format="json", + ) + + r_a = channel_payload(1, FEEDER_MC_PUBKEY_PREFIX, meshcore_feeder) + r_b = channel_payload(2, FEEDER_B_MC_PUBKEY_PREFIX, second_mc_feeder) + 
assert r_a.status_code == 201 + assert r_b.status_code == 201 + packet_id = r_a.data["packet_id"] + assert packet_id == r_b.data["packet_id"] + + client_b = APIClient() + client_b.credentials(HTTP_X_API_KEY=second_mc_feeder["api_key"].key) + r_path = client_b.post( + feeder_url("meshcore-feeder-packet-ingest", FEEDER_B_MC_PUBKEY_PREFIX), + { + "event_type": "rx_log_data", + "payload_type": "raw", + "pkt_hash": PATH_DUMP["payload"]["pkt_hash"], + "rx_time": now.timestamp(), + "path_hashes": ["6edc9b", "4cd741", "f3bcf1"], + "path_hash_size": 3, + "raw": PATH_DUMP, + }, + format="json", + ) + assert r_path.status_code == 201 + + packet = MeshCoreRawPacket.objects.get(id=packet_id) + obs_b = MeshCorePacketObservation.objects.get(packet=packet, observer=second_mc_feeder["node"]) + assert obs_b.path_hashes == ["6edc9b", "4cd741", "f3bcf1"] diff --git a/Meshflow/meshcore_packets/tests/test_path_twin.py b/Meshflow/meshcore_packets/tests/test_path_twin.py index 0086edd..f42c0ba 100644 --- a/Meshflow/meshcore_packets/tests/test_path_twin.py +++ b/Meshflow/meshcore_packets/tests/test_path_twin.py @@ -1,6 +1,7 @@ """Tests for rx_log → channel_text path twin merge.""" import json +from datetime import timedelta from pathlib import Path from django.utils import timezone @@ -151,3 +152,53 @@ def test_raw_path_without_channel_text_twin_leaves_no_text_path(meshcore_feeder, ) assert response.status_code == 201 assert MeshCoreTextPacket.objects.filter(payload_type=MeshCorePayloadType.CHANNEL_TEXT).count() == 0 + + +@pytest.mark.django_db +def test_raw_path_before_channel_text_outside_30s_within_120s_window(meshcore_feeder, ingest_client): + """PATH up to 90s before channel_message still twins when default window is 120s.""" + reconcile_mc_channels( + meshcore_feeder["node"], + [{"mc_channel_idx": 0, "name": "Public", "mc_channel_type": "PUBLIC"}], + ) + text_time = timezone.now() + path_time = text_time - timedelta(seconds=90) + url = feeder_url("meshcore-feeder-packet-ingest", 
FEEDER_MC_PUBKEY_PREFIX) + path_dump = json.loads((DOCS / "rx_log_data_path.json").read_text()) + channel_dump = _load(CHANNEL_MSG) + text = "delayed twin window test" + channel_dump = dict(channel_dump) + channel_dump["payload"] = dict(channel_dump["payload"]) + channel_dump["payload"]["text"] = text + channel_dump["payload"]["sender_timestamp"] = 1780416123 + + ingest_client.post( + url, + { + "event_type": "rx_log_data", + "payload_type": "raw", + "pkt_hash": path_dump["payload"]["pkt_hash"], + "rx_time": path_time.timestamp(), + "path_hashes": ["aa", "bb"], + "path_hash_size": 2, + "raw": path_dump, + }, + format="json", + ) + + ingest_client.post( + url, + { + "event_type": "channel_message", + "payload_type": "channel_text", + "channel_idx": 0, + "rx_time": text_time.timestamp(), + "text": text, + "raw": channel_dump, + }, + format="json", + ) + + text_packet = MeshCoreTextPacket.objects.get(text=text) + text_obs = MeshCorePacketObservation.objects.get(packet=text_packet) + assert text_obs.path_hashes == ["aa", "bb"] From c0758467752c266e71ea386eb20966ee2b827751 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 18:23:10 +0100 Subject: [PATCH 2/4] feat(meshcore): resolve heard path hops from segment table Load MeshCorePathSegmentResolution for heard resolved_path and positions. path_known requires all hops resolved with coordinates. Update OpenAPI ResolvedHop.position. 
--- .../services/path_resolution.py | 93 +++++++++++++--- .../tests/test_path_resolution.py | 36 +++++- Meshflow/text_messages/serializers.py | 2 +- .../text_messages/tests/test_heard_api.py | 103 ++++++++++++++++++ openapi.yaml | 8 +- 5 files changed, 224 insertions(+), 18 deletions(-) diff --git a/Meshflow/meshcore_packets/services/path_resolution.py b/Meshflow/meshcore_packets/services/path_resolution.py index 6928d37..0eff291 100644 --- a/Meshflow/meshcore_packets/services/path_resolution.py +++ b/Meshflow/meshcore_packets/services/path_resolution.py @@ -1,9 +1,12 @@ -"""MeshCore path segment display (v1: no hash→ObservedNode linking).""" +"""MeshCore path segment display and resolution via meshcore_packet_path.""" from __future__ import annotations from typing import Any, Iterable +from meshcore_packet_path.models import MeshCorePathSegmentResolution, SegmentStatus +from text_messages.map_helpers import observed_node_map_position + HOP_STATUS_UNKNOWN = "unknown" HOP_STATUS_AMBIGUOUS = "ambiguous" HOP_STATUS_RESOLVED = "resolved" @@ -13,9 +16,42 @@ def _normalize_segment(segment: str) -> str: return str(segment).strip().lower().replace("0x", "") -def format_path_hop(segment: str) -> dict[str, Any]: - """One display hop for a wire path segment (v1: always unknown).""" +def _hop_from_resolution(segment: str, resolution: MeshCorePathSegmentResolution | None) -> dict[str, Any]: normalized = _normalize_segment(segment) + if resolution is None: + return { + "hash": normalized, + "status": HOP_STATUS_UNKNOWN, + "node_id_str": None, + "internal_id": None, + "long_name": None, + "ambiguous": False, + "position": None, + } + + if resolution.status == SegmentStatus.RESOLVED and resolution.observed_node_id: + node = resolution.observed_node + return { + "hash": normalized, + "status": HOP_STATUS_RESOLVED, + "node_id_str": node.node_id_str if node else None, + "internal_id": str(node.internal_id) if node else None, + "long_name": node.long_name if node else None, + 
"ambiguous": False, + "position": observed_node_map_position(node), + } + + if resolution.status == SegmentStatus.AMBIGUOUS: + return { + "hash": normalized, + "status": HOP_STATUS_AMBIGUOUS, + "node_id_str": None, + "internal_id": None, + "long_name": None, + "ambiguous": True, + "position": None, + } + return { "hash": normalized, "status": HOP_STATUS_UNKNOWN, @@ -23,26 +59,57 @@ def format_path_hop(segment: str) -> dict[str, Any]: "internal_id": None, "long_name": None, "ambiguous": False, + "position": None, } -def format_path_hops(segments: list[str] | None) -> list[dict[str, Any]]: +def format_path_hop(segment: str, *, resolution_cache: dict[str, dict[str, Any]] | None = None) -> dict[str, Any]: + normalized = _normalize_segment(segment) + if resolution_cache is not None and normalized in resolution_cache: + return resolution_cache[normalized] + return _hop_from_resolution(segment, None) + + +def format_path_hops( + segments: list[str] | None, + *, + resolution_cache: dict[str, dict[str, Any]] | None = None, +) -> list[dict[str, Any]]: if not segments: return [] - return [format_path_hop(segment) for segment in segments] + return [format_path_hop(segment, resolution_cache=resolution_cache) for segment in segments] def bulk_format_path_hops(segments: Iterable[str]) -> dict[str, dict[str, Any]]: - """Dedupe segments for a single message list response.""" - cache: dict[str, dict[str, Any]] = {} + """Dedupe segments and load MeshCorePathSegmentResolution rows when present.""" + normalized_list: list[str] = [] for segment in segments: normalized = _normalize_segment(segment) - if normalized and normalized not in cache: - cache[normalized] = format_path_hop(normalized) + if normalized and normalized not in normalized_list: + normalized_list.append(normalized) + + if not normalized_list: + return {} + + resolutions = { + _normalize_segment(row.segment_hash): row + for row in MeshCorePathSegmentResolution.objects.filter( + segment_hash__in=normalized_list, + 
).select_related("observed_node", "observed_node__latest_status") + } + + cache: dict[str, dict[str, Any]] = {} + for normalized in normalized_list: + cache[normalized] = _hop_from_resolution(normalized, resolutions.get(normalized)) return cache -def path_known_for_segments(segments: list[str] | None) -> bool: - """True only when all hops are resolved (v1 always false).""" - hops = format_path_hops(segments) - return bool(hops) and all(hop.get("status") == HOP_STATUS_RESOLVED for hop in hops) +def path_known_for_segments( + segments: list[str] | None, + *, + resolution_cache: dict[str, dict[str, Any]] | None = None, +) -> bool: + hops = format_path_hops(segments, resolution_cache=resolution_cache) + if not hops: + return False + return all(hop.get("status") == HOP_STATUS_RESOLVED and hop.get("position") is not None for hop in hops) diff --git a/Meshflow/meshcore_packets/tests/test_path_resolution.py b/Meshflow/meshcore_packets/tests/test_path_resolution.py index b540764..939c760 100644 --- a/Meshflow/meshcore_packets/tests/test_path_resolution.py +++ b/Meshflow/meshcore_packets/tests/test_path_resolution.py @@ -1,10 +1,15 @@ -"""Tests for MeshCore path segment display (v1).""" +"""Tests for MeshCore path segment display and segment resolution lookup.""" +import pytest + +from common.protocol import Protocol +from meshcore_packet_path.models import MeshCorePathSegmentResolution, SegmentStatus from meshcore_packets.services.path_resolution import ( bulk_format_path_hops, format_path_hops, path_known_for_segments, ) +from nodes.models import ObservedNode def test_format_path_hops_unknown_status(): @@ -17,6 +22,7 @@ def test_format_path_hops_unknown_status(): "internal_id": None, "long_name": None, "ambiguous": False, + "position": None, } assert hops[1]["hash"] == "f1" @@ -26,12 +32,36 @@ def test_format_path_hops_normalizes_hex(): assert hops[0]["hash"] == "f3bc" +@pytest.mark.django_db def test_bulk_format_path_hops_dedupes(): cache = bulk_format_path_hops(["aa", 
"aa", "bb"]) assert set(cache.keys()) == {"aa", "bb"} assert cache["aa"]["status"] == "unknown" -def test_path_known_false_in_v1(): - assert path_known_for_segments(["aa", "bb"]) is False +@pytest.mark.django_db +def test_path_known_false_when_unresolved(): + cache = bulk_format_path_hops(["aa", "bb"]) + assert path_known_for_segments(["aa", "bb"], resolution_cache=cache) is False assert path_known_for_segments(None) is False + + +@pytest.mark.django_db +def test_bulk_format_path_hops_uses_segment_resolution_table(): + node = ObservedNode.objects.create( + protocol=Protocol.MESHCORE, + mc_pubkey="a" * 64, + mc_pubkey_prefix="a" * 12, + long_name="Resolved Hop", + ) + MeshCorePathSegmentResolution.objects.create( + segment_hash="f3bc", + hash_size=2, + status=SegmentStatus.RESOLVED, + observed_node=node, + ) + cache = bulk_format_path_hops(["f3bc"]) + hop = cache["f3bc"] + assert hop["status"] == "resolved" + assert hop["node_id_str"] == node.node_id_str + assert hop["long_name"] == "Resolved Hop" diff --git a/Meshflow/text_messages/serializers.py b/Meshflow/text_messages/serializers.py index 55fda01..71ca5c1 100644 --- a/Meshflow/text_messages/serializers.py +++ b/Meshflow/text_messages/serializers.py @@ -141,7 +141,7 @@ def get_heard(self, obj): "rx_snr": obs.rx_snr, "path_hashes": segments, "resolved_path": _resolved_path_from_cache(segments, path_hop_cache), - "path_known": path_known_for_segments(segments), + "path_known": path_known_for_segments(segments, resolution_cache=path_hop_cache), } ) return heard diff --git a/Meshflow/text_messages/tests/test_heard_api.py b/Meshflow/text_messages/tests/test_heard_api.py index 4f69135..81b1a85 100644 --- a/Meshflow/text_messages/tests/test_heard_api.py +++ b/Meshflow/text_messages/tests/test_heard_api.py @@ -8,8 +8,11 @@ import pytest from rest_framework.test import APIClient +from common.protocol import Protocol +from meshcore_packet_path.models import MeshCorePathSegmentResolution, SegmentStatus from 
meshcore_packets.services.channel_sync import reconcile_mc_channels from meshcore_packets.tests.conftest import FEEDER_MC_PUBKEY_PREFIX, feeder_url +from nodes.models import NodeLatestStatus, ObservedNode from text_messages.models import TextMessage @@ -143,3 +146,53 @@ def test_mc_message_heard_path_via_rx_log_twin(meshcore_feeder, ingest_client): response = client.get(list_url, {"channel_id": tm.channel_id}) row = next(item for item in response.data["results"] if item["id"] == str(tm.id)) assert row["heard"][0]["path_hashes"] == ["ab", "cd"] + + +@pytest.mark.django_db +def test_mc_message_heard_resolved_path_from_segment_table(meshcore_feeder, ingest_client): + reconcile_mc_channels( + meshcore_feeder["node"], + [{"mc_channel_idx": 0, "name": "Public", "mc_channel_type": "PUBLIC"}], + ) + node = ObservedNode.objects.create( + protocol=Protocol.MESHCORE, + mc_pubkey="b" * 64, + mc_pubkey_prefix="b" * 12, + long_name="Hop Node", + ) + NodeLatestStatus.objects.create( + node=node, + latitude=55.95, + longitude=-4.25, + ) + MeshCorePathSegmentResolution.objects.create( + segment_hash="ab", + hash_size=2, + status=SegmentStatus.RESOLVED, + observed_node=node, + ) + now = timezone.now() + url = feeder_url("meshcore-feeder-packet-ingest", FEEDER_MC_PUBKEY_PREFIX) + ingest_client.post( + url, + { + "event_type": "channel_message", + "payload_type": "channel_text", + "channel_idx": 0, + "pkt_hash": 88011, + "rx_time": now.timestamp(), + "text": "resolved hop map test", + "path_hashes": ["ab"], + "raw": {}, + }, + format="json", + ) + tm = TextMessage.objects.get(message_text="resolved hop map test") + client = APIClient() + response = client.get(reverse("textmessage-list"), {"channel_id": tm.channel_id}) + row = next(item for item in response.data["results"] if item["id"] == str(tm.id)) + hop = row["heard"][0]["resolved_path"][0] + assert hop["status"] == "resolved" + assert hop["node_id_str"] == node.node_id_str + assert hop["position"]["latitude"] == 55.95 + assert 
row["heard"][0]["path_known"] is True diff --git a/openapi.yaml b/openapi.yaml index c1e7395..6cfb4c0 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -2777,6 +2777,10 @@ components: nullable: true ambiguous: type: boolean + position: + $ref: '#/components/schemas/MapPosition' + nullable: true + description: Present when status is resolved and the linked ObservedNode has a latest position HeardObserver: type: object @@ -2845,7 +2849,9 @@ components: $ref: '#/components/schemas/ResolvedHop' path_known: type: 
boolean - description: False in v1 (hop positions not linked to map coordinates) + description: > + True when every hop in resolved_path is resolved and has a position (from + MeshCorePathSegmentResolution / staff annotation). TextMessage: type: object From 9f998bc0c18d1520eded37edb4932b4f2bee039e Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 18:23:11 +0100 Subject: [PATCH 3/4] docs(meshcore): tier 1 twin matching and tier 2 heard resolution --- docs/ENV_VARS.md | 2 +- .../tier-1-message-path-twin.md | 18 +++++++++------- .../tier-2-heard-resolution.md | 21 +++++++++++++++++++ docs/features/meshcore/phase-3-outstanding.md | 20 ++++++++++-------- 4 files changed, 43 insertions(+), 18 deletions(-) create mode 100644 docs/features/meshcore/packet-path-tracing/tier-2-heard-resolution.md diff --git a/docs/ENV_VARS.md b/docs/ENV_VARS.md index 45f388a..e0c598f 100644 --- a/docs/ENV_VARS.md +++ b/docs/ENV_VARS.md @@ -85,7 +85,7 @@ Discord **login** uses the OAuth pair above. **DM notifications** (test message |-------------------------------|---------|-------------------------------------------------------------------|-------------------------| | `PACKET_DEDUP_WINDOW_MINUTES` | `10` | Time window (minutes) within which same sender+packet_id is treated as duplicate. | Integer (string) | | `MESHCORE_PACKET_DEDUP_WINDOW_MINUTES` | `10` | MeshCore dedup window (minutes) for `(pkt_hash, rx_time)` per ADR-0004. | Integer (string) | -| `MESHCORE_DECODED_TWIN_WINDOW_SECONDS` | `30` | Optional window to collapse decoded-twin MeshCore frames (ADR-0004). | Integer (string) | +| `MESHCORE_DECODED_TWIN_WINDOW_SECONDS` | `120` | Max `rx_time` skew for rx_log PATH/TEXT_MSG ↔ `channel_text` path twin merge (ADR-0004). 
| Integer (string) | --- diff --git a/docs/features/meshcore/packet-path-tracing/tier-1-message-path-twin.md b/docs/features/meshcore/packet-path-tracing/tier-1-message-path-twin.md index a0b2aaa..b324d23 100644 --- a/docs/features/meshcore/packet-path-tracing/tier-1-message-path-twin.md +++ b/docs/features/meshcore/packet-path-tracing/tier-1-message-path-twin.md @@ -10,33 +10,35 @@ Decoded `channel_message` ingest (`channel_text`) usually has `path_hash_mode` / **Thin bot:** upload TEXT_MSG and PATH `rx_log_data` as `payload_type: raw` (forward envelope + `path_hashes` when present). -**Fat API:** after ingest, **bidirectional twin merge** within `MESHCORE_DECODED_TWIN_WINDOW_SECONDS` (default 30s, see `meshcore_packets.services.dedup.decoded_twin_window`): +**Fat API:** after ingest, **bidirectional twin merge** within `MESHCORE_DECODED_TWIN_WINDOW_SECONDS` (default **120s**, see `meshcore_packets.services.dedup.decoded_twin_window`): | Order | Action | | --- | --- | | `channel_text` then `raw` TEXT_MSG/PATH | On `raw` ingest, copy path fields onto the matching `MeshCoreTextPacket` observation (same feeder) | | `raw` then `channel_text` | On `channel_text` ingest, copy path from recent matching `raw` observation | -Matching rules (MVP): +Matching rules: -- Same `observer` (feeder `ManagedNode`). -- `rx_time` within decoded-twin window. -- Target packet: `CHANNEL_TEXT` only (not DMs). -- If multiple candidates: narrow by `channel_idx` in `raw_json` vs text packet / observation `channel` when available; if still ambiguous, skip merge (debug log). +- Same **feeder** (`ManagedNode`) via `MeshCorePacketObservation` (required for cross-feeder dedup [#387](https://github.com/pskillen/meshflow-api/issues/387): twin targets packets this feeder observed, not only `packet.observer`). +- `rx_time` within decoded-twin window (outer bound). 
+- Prefer **content dedup key** when `sender_timestamp` + canonical `MessageChannel` + text are available (same key as [#387](https://github.com/pskillen/meshflow-api/issues/387) `channel_text` dedup). +- Fall back to time-only matching when there is no unique content-key match: a single candidate in the window is used as-is; multiple candidates are narrowed by `channel_idx` / canonical channel. +- If still ambiguous, skip merge (debug log). Raw rows are still stored (M1 rollups / debugging). Heard uses the **text** packet observation after merge. ## Failure modes - No twin in window → `path_hashes` stay empty on text observation; heard schematic empty. -- `channel_message` without companion `rx_log_data` on that feeder → no path (ops / library gap). +- `channel_message` without companion `rx_log_data` on that feeder → no path (restart bot on `main`; confirm RAW rows per feeder). +- `path_len: 0` on wire → empty hop list is valid (e.g. in-room direct). - Full `raw_packet_fk` linkage → deferred to [ADR-0004](../../packet-ingestion/adr/0004-mc-dedup-key.md) / [#276](https://github.com/pskillen/meshflow-api/issues/276). ## Implementation - `meshcore_packets.services.path_hashes` — server-side `path` hex split when bot omits `path_hashes`. - `meshcore_packets.services.path_twin` — `sync_path_to_channel_text_twin`, `sync_path_from_rx_log_twin`. -- Wired from `MeshCorePacketIngestSerializer.create` after `_ensure_observation`. +- Wired from `MeshCorePacketIngestSerializer.create` after `_ensure_observation` (including dedup hit). 
## Verification diff --git a/docs/features/meshcore/packet-path-tracing/tier-2-heard-resolution.md b/docs/features/meshcore/packet-path-tracing/tier-2-heard-resolution.md new file mode 100644 index 0000000..89e6af5 --- /dev/null +++ b/docs/features/meshcore/packet-path-tracing/tier-2-heard-resolution.md @@ -0,0 +1,21 @@ +# Tier 2 — Heard map resolution + +**Tracking:** [#373](https://github.com/pskillen/meshflow-api/issues/373), [#374](https://github.com/pskillen/meshflow-api/issues/374) + +## Behaviour + +`GET /api/messages/text/` `heard[].resolved_path` is built from: + +1. **`MeshCorePathSegmentResolution`** (M1) — staff `PATCH /api/meshcore/path-tracing/segments/` or rollup from passive traffic. +2. **No automatic suffix/recency heuristics** in v1 (await ADR #373 for proven matcher). + +When a segment row is `status=resolved` with `observed_node` set, each hop includes `node_id_str`, `long_name`, and `position` (from `NodeLatestStatus`). `path_known` is true only when **every** hop is resolved **and** has a position. + +## UI + +[meshflow-ui](https://github.com/pskillen/meshflow-ui) `HeardPathGeoMap` draws polylines when `path_known` and hop `position` values are present (same pattern as Meshtastic heard paths). + +## Code + +- `meshcore_packets.services.path_resolution.bulk_format_path_hops` — segment table lookup. +- `text_messages` list view — bulk cache per page of messages. 
diff --git a/docs/features/meshcore/phase-3-outstanding.md b/docs/features/meshcore/phase-3-outstanding.md index 2783446..771ce3b 100644 --- a/docs/features/meshcore/phase-3-outstanding.md +++ b/docs/features/meshcore/phase-3-outstanding.md @@ -13,11 +13,11 @@ Gap analysis (Jun 2026): channel **Heard** is the user-facing “map view” for | Tier | User outcome | Tracking | | --- | --- | --- | -| **Tier 1** (ship next) | Real `#channel` messages show **non-empty `path_hashes`** in Heard (unknown hop labels OK) | **[#385](https://github.com/pskillen/meshflow-api/issues/385)** | -| **Tier 2** | Hop **polylines on Leaflet** when hashes map to node positions | M2/M3 matcher ([#373](https://github.com/pskillen/meshflow-api/issues/373), [#374](https://github.com/pskillen/meshflow-api/issues/374)); wire `heard[]` to M1 `MeshCorePathSegmentResolution`; UI draw MC waypoints with `position` | +| **Tier 1** | Non-empty `path_hashes` per feeder in Heard (twin + bot RAW upload) | [#385](https://github.com/pskillen/meshflow-api/issues/385) — twin hardening on `main` (observation match, content key, 120s window) | +| **Tier 2** | Hop **polylines on Leaflet** when segment rows resolve to positioned nodes | [tier-2-heard-resolution.md](./packet-path-tracing/tier-2-heard-resolution.md); auto-matcher ADR [#373](https://github.com/pskillen/meshflow-api/issues/373) still open | | **Tier 3** | Full path tracing product (Neo4j, realtime WS, M7 topology) | [#372](https://github.com/pskillen/meshflow-api/issues/372) milestones M4–M7, [meshflow-ui#309](https://github.com/pskillen/meshflow-ui/issues/309) | -**Tier 1 blocker (pre-prod):** `channel_text` linked to `TextMessage` has empty `path_hashes`; PATH/TEXT_MSG `rx_log_data` with `path` is not uploaded. 
Precursor + UI ([#369](https://github.com/pskillen/meshflow-api/issues/369), [#304](https://github.com/pskillen/meshflow-ui/issues/304), [#311](https://github.com/pskillen/meshflow-ui/issues/311)) already work when data exists — **no new UI for Tier 1**. +**Tier 1 ops:** Both feeders must run **meshflow-bot `main`** (PATH/TEXT_MSG upload) and post to API on **`main`**. If `path_hashes` stay empty, also check per-feeder `RAW` row counts and the twin window — a missing API merge on `main` is not the only possible cause. **Recommendations** @@ -30,18 +30,20 @@ Gap analysis (Jun 2026): channel **Heard** is the user-facing “map view” for ## Passive path -- [ ] **Cross-feeder channel message dedup** — [#387](https://github.com/pskillen/meshflow-api/issues/387): one `TextMessage` + N observations when multiple feeders hear the same `channel_text` (PR in flight). Pre-prod duplicate rows are not backfilled. -- [ ] **Tier 1 — message path data chain** — [#385](https://github.com/pskillen/meshflow-api/issues/385): `path_hashes` on observation tied to `TextMessage.original_mc_packet` for channel traffic. Detail: [packet-path-tracing-outstanding.md § Message path data chain](./packet-path-tracing/packet-path-tracing-outstanding.md#message-path-data-chain-confirmed--pre-prod-jun-2026). -- [ ] **Passive packet path subsystem (M1+)** — rollups, resolution table, Neo4j export, realtime/history UI ([ADR-0001](./packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md)); merge/deploy PRs [#378](https://github.com/pskillen/meshflow-api/pull/378), [bot#122](https://github.com/pskillen/meshflow-bot/pull/122), [ui#310](https://github.com/pskillen/meshflow-ui/pull/310). -- [ ] **Tier 2 — `heard[]` → segment resolution table** — augment `bulk_format_path_hops` in `text_messages/views.py` with `MeshCorePathSegmentResolution` (manual + resolved rows). 
-- [ ] **Proven hash → node matcher** — per [traceroute ADR §A](../traceroute/adr/0001-mc-path-hash-resolution.md); no unsafe heuristics in v1 ([#373](https://github.com/pskillen/meshflow-api/issues/373)). +- [x] **Cross-feeder channel message dedup** — [#387](https://github.com/pskillen/meshflow-api/issues/387) on `main`. +- [ ] **Tier 1 — path twin hardening** — observation-based twin after dedup; content-key match; default 120s window ([tier-1 doc](./packet-path-tracing/tier-1-message-path-twin.md)). +- [ ] **Tier 1 — ops** — both feeders uploading `RAW` PATH/TEXT_MSG to pre-prod API. +- [ ] **Passive packet path subsystem (M1+)** — rollups, resolution table, Neo4j export, realtime/history UI ([ADR-0001](./packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md)); deploy [#378](https://github.com/pskillen/meshflow-api/pull/378) on pre-prod if not already. +- [x] **Tier 2 — `heard[]` → segment resolution table** — `bulk_format_path_hops` uses `MeshCorePathSegmentResolution` ([tier-2 doc](./packet-path-tracing/tier-2-heard-resolution.md)). +- [ ] **Proven hash → node matcher** — per [traceroute ADR §A](../traceroute/adr/0001-mc-path-hash-resolution.md); v1 uses manual/rollup rows only ([#373](https://github.com/pskillen/meshflow-api/issues/373)). +- [ ] **Tier 2 — UI MC polylines** — meshflow-ui `HeardPathGeoMap` when `resolved_path[].position` set. - [ ] **`GET /meshcore/packets/`** — optional `resolved_path` on list/detail (deferred). ### Bot ([meshflow-bot#119](https://github.com/pskillen/meshflow-bot/issues/119)) Prefer **thin bot / fat server** — tracked under [#385](https://github.com/pskillen/meshflow-api/issues/385). -- [ ] Upload `rx_log_data` TEXT_MSG / PATH (or raw pass-through) — **required for Tier 1**; no bot-side correlation. +- [x] Upload `rx_log_data` TEXT_MSG / PATH — [bot#124](https://github.com/pskillen/meshflow-bot/pull/124) on `main`; verify per-feeder in ops. 
- [ ] Unit tests for `_path_hashes()` (1/2/3-byte `path_hash_size`) when wire includes `path`. --- From 851b7809c746470db619d8be319d11acc957448e Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 21:03:14 +0100 Subject: [PATCH 4/4] test(messages): remove duplicate heard segment resolution test --- .../text_messages/tests/test_heard_api.py | 50 ------------------- 1 file changed, 50 deletions(-) diff --git a/Meshflow/text_messages/tests/test_heard_api.py b/Meshflow/text_messages/tests/test_heard_api.py index 81b1a85..083e6d0 100644 --- a/Meshflow/text_messages/tests/test_heard_api.py +++ b/Meshflow/text_messages/tests/test_heard_api.py @@ -196,53 +196,3 @@ def test_mc_message_heard_resolved_path_from_segment_table(meshcore_feeder, inge assert hop["node_id_str"] == node.node_id_str assert hop["position"]["latitude"] == 55.95 assert row["heard"][0]["path_known"] is True - - -@pytest.mark.django_db -def test_mc_message_heard_resolved_path_from_segment_table(meshcore_feeder, ingest_client): - reconcile_mc_channels( - meshcore_feeder["node"], - [{"mc_channel_idx": 0, "name": "Public", "mc_channel_type": "PUBLIC"}], - ) - node = ObservedNode.objects.create( - protocol=Protocol.MESHCORE, - mc_pubkey="b" * 64, - mc_pubkey_prefix="b" * 12, - long_name="Hop Node", - ) - NodeLatestStatus.objects.create( - node=node, - latitude=55.95, - longitude=-4.25, - ) - MeshCorePathSegmentResolution.objects.create( - segment_hash="ab", - hash_size=2, - status=SegmentStatus.RESOLVED, - observed_node=node, - ) - now = timezone.now() - url = feeder_url("meshcore-feeder-packet-ingest", FEEDER_MC_PUBKEY_PREFIX) - ingest_client.post( - url, - { - "event_type": "channel_message", - "payload_type": "channel_text", - "channel_idx": 0, - "pkt_hash": 88011, - "rx_time": now.timestamp(), - "text": "resolved hop map test", - "path_hashes": ["ab"], - "raw": {}, - }, - format="json", - ) - tm = TextMessage.objects.get(message_text="resolved hop map test") - client = APIClient() - 
response = client.get(reverse("textmessage-list"), {"channel_id": tm.channel_id}) - row = next(item for item in response.data["results"] if item["id"] == str(tm.id)) - hop = row["heard"][0]["resolved_path"][0] - assert hop["status"] == "resolved" - assert hop["node_id_str"] == node.node_id_str - assert hop["position"]["latitude"] == 55.95 - assert row["heard"][0]["path_known"] is True