Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Meshflow/Meshflow/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def _rf_env_bool(name: str, default: bool) -> bool:
# Packet deduplication: time window (minutes) within which same sender+packet_id is treated as duplicate
PACKET_DEDUP_WINDOW_MINUTES = int(os.environ.get("PACKET_DEDUP_WINDOW_MINUTES", "10"))
MESHCORE_PACKET_DEDUP_WINDOW_MINUTES = int(os.environ.get("MESHCORE_PACKET_DEDUP_WINDOW_MINUTES", "10"))
MESHCORE_DECODED_TWIN_WINDOW_SECONDS = int(os.environ.get("MESHCORE_DECODED_TWIN_WINDOW_SECONDS", "30"))
MESHCORE_DECODED_TWIN_WINDOW_SECONDS = int(os.environ.get("MESHCORE_DECODED_TWIN_WINDOW_SECONDS", "120"))

REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": [
Expand Down
93 changes: 80 additions & 13 deletions Meshflow/meshcore_packets/services/path_resolution.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""MeshCore path segment display (v1: no hash→ObservedNode linking)."""
"""MeshCore path segment display and resolution via meshcore_packet_path."""

from __future__ import annotations

from typing import Any, Iterable

from meshcore_packet_path.models import MeshCorePathSegmentResolution, SegmentStatus
from text_messages.map_helpers import observed_node_map_position

HOP_STATUS_UNKNOWN = "unknown"
HOP_STATUS_AMBIGUOUS = "ambiguous"
HOP_STATUS_RESOLVED = "resolved"
Expand All @@ -13,36 +16,100 @@ def _normalize_segment(segment: str) -> str:
return str(segment).strip().lower().replace("0x", "")


def format_path_hop(segment: str) -> dict[str, Any]:
"""One display hop for a wire path segment (v1: always unknown)."""
def _hop_from_resolution(segment: str, resolution: MeshCorePathSegmentResolution | None) -> dict[str, Any]:
normalized = _normalize_segment(segment)
if resolution is None:
return {
"hash": normalized,
"status": HOP_STATUS_UNKNOWN,
"node_id_str": None,
"internal_id": None,
"long_name": None,
"ambiguous": False,
"position": None,
}

if resolution.status == SegmentStatus.RESOLVED and resolution.observed_node_id:
node = resolution.observed_node
return {
"hash": normalized,
"status": HOP_STATUS_RESOLVED,
"node_id_str": node.node_id_str if node else None,
"internal_id": str(node.internal_id) if node else None,
"long_name": node.long_name if node else None,
"ambiguous": False,
"position": observed_node_map_position(node),
}

if resolution.status == SegmentStatus.AMBIGUOUS:
return {
"hash": normalized,
"status": HOP_STATUS_AMBIGUOUS,
"node_id_str": None,
"internal_id": None,
"long_name": None,
"ambiguous": True,
"position": None,
}

return {
"hash": normalized,
"status": HOP_STATUS_UNKNOWN,
"node_id_str": None,
"internal_id": None,
"long_name": None,
"ambiguous": False,
"position": None,
}


def format_path_hops(segments: list[str] | None) -> list[dict[str, Any]]:
def format_path_hop(segment: str, *, resolution_cache: dict[str, dict[str, Any]] | None = None) -> dict[str, Any]:
normalized = _normalize_segment(segment)
if resolution_cache is not None and normalized in resolution_cache:
return resolution_cache[normalized]
return _hop_from_resolution(segment, None)


def format_path_hops(
segments: list[str] | None,
*,
resolution_cache: dict[str, dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
if not segments:
return []
return [format_path_hop(segment) for segment in segments]
return [format_path_hop(segment, resolution_cache=resolution_cache) for segment in segments]


def bulk_format_path_hops(segments: Iterable[str]) -> dict[str, dict[str, Any]]:
"""Dedupe segments for a single message list response."""
cache: dict[str, dict[str, Any]] = {}
"""Dedupe segments and load MeshCorePathSegmentResolution rows when present."""
normalized_list: list[str] = []
for segment in segments:
normalized = _normalize_segment(segment)
if normalized and normalized not in cache:
cache[normalized] = format_path_hop(normalized)
if normalized and normalized not in normalized_list:
normalized_list.append(normalized)

if not normalized_list:
return {}

resolutions = {
_normalize_segment(row.segment_hash): row
for row in MeshCorePathSegmentResolution.objects.filter(
segment_hash__in=normalized_list,
).select_related("observed_node", "observed_node__latest_status")
}

cache: dict[str, dict[str, Any]] = {}
for normalized in normalized_list:
cache[normalized] = _hop_from_resolution(normalized, resolutions.get(normalized))
return cache


def path_known_for_segments(segments: list[str] | None) -> bool:
"""True only when all hops are resolved (v1 always false)."""
hops = format_path_hops(segments)
return bool(hops) and all(hop.get("status") == HOP_STATUS_RESOLVED for hop in hops)
def path_known_for_segments(
segments: list[str] | None,
*,
resolution_cache: dict[str, dict[str, Any]] | None = None,
) -> bool:
hops = format_path_hops(segments, resolution_cache=resolution_cache)
if not hops:
return False
return all(hop.get("status") == HOP_STATUS_RESOLVED and hop.get("position") is not None for hop in hops)
141 changes: 125 additions & 16 deletions Meshflow/meshcore_packets/services/path_twin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import logging
from typing import Any

from meshcore_packets.models import (
MeshCorePacketObservation,
Expand All @@ -12,6 +13,10 @@
)
from meshcore_packets.services.channel import resolve_mc_channel
from meshcore_packets.services.dedup import decoded_twin_window
from meshcore_packets.services.dedup_key import (
channel_text_dedup_key,
extract_sender_timestamp,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,6 +47,48 @@ def channel_idx_from_packet_raw_json(packet: MeshCoreRawPacket) -> int | None:
return None


def _envelope_payload(raw_json: dict[str, Any] | None) -> dict[str, Any]:
if not isinstance(raw_json, dict):
return {}
envelope = raw_json
nested = raw_json.get("raw")
if isinstance(nested, dict):
envelope = nested
payload = envelope.get("payload")
return payload if isinstance(payload, dict) else {}


def _content_key_for_channel_text(packet: MeshCoreTextPacket, observer) -> int | None:
if packet.channel_id is None:
return None
validated = packet.raw_json if isinstance(packet.raw_json, dict) else {}
return channel_text_dedup_key(
constellation_id=observer.constellation_id,
message_channel_id=packet.channel_id,
text=packet.text or validated.get("text") or "",
sender_timestamp=extract_sender_timestamp(validated),
)


def _content_key_for_raw(packet: MeshCoreRawPacket, observer, *, channel_idx: int | None) -> int | None:
if channel_idx is None:
return None
channel = resolve_mc_channel(observer, channel_idx)
if channel is None:
return None
raw_json = packet.raw_json if isinstance(packet.raw_json, dict) else {}
payload = _envelope_payload(raw_json)
text = raw_json.get("text") or payload.get("text") or ""
if not str(text).strip():
return None
return channel_text_dedup_key(
constellation_id=observer.constellation_id,
message_channel_id=channel.id,
text=str(text),
sender_timestamp=extract_sender_timestamp(raw_json) or payload.get("sender_timestamp"),
)


def _path_fields_from_observation(observation: MeshCorePacketObservation) -> dict | None:
if not observation.path_hashes:
return None
Expand Down Expand Up @@ -94,26 +141,65 @@ def apply_path_to_text_observation(
return True


def _pick_channel_text_twin(*, observer, anchor_time, channel_idx: int | None):
def _channel_text_candidates_for_observer(*, observer, anchor_time):
"""Channel text packets this feeder observed in the decoded-twin window."""
window = decoded_twin_window()
candidates = MeshCoreTextPacket.objects.filter(
observer=observer,
payload_type=MeshCorePayloadType.CHANNEL_TEXT,
rx_time__gte=anchor_time - window,
rx_time__lte=anchor_time + window,
).order_by("-rx_time")
count = candidates.count()
if count == 0:
return (
MeshCoreTextPacket.objects.filter(
observations__observer=observer,
payload_type=MeshCorePayloadType.CHANNEL_TEXT,
rx_time__gte=anchor_time - window,
rx_time__lte=anchor_time + window,
)
.distinct()
.order_by("-rx_time")
)


def _pick_by_content_key(*, candidates, content_key: int | None, observer):
if content_key is None:
return None
if count == 1:
return candidates.first()
matches = []
for packet in candidates:
if _content_key_for_channel_text(packet, observer) == content_key:
matches.append(packet)
if len(matches) == 1:
return matches[0]
if len(matches) > 1:
logger.debug("path_twin: %s content-key matches; skip merge", len(matches))
return None


def _pick_channel_text_twin(
*,
observer,
anchor_time,
channel_idx: int | None,
raw_packet: MeshCoreRawPacket | None = None,
):
candidates = list(_channel_text_candidates_for_observer(observer=observer, anchor_time=anchor_time))
if not candidates:
return None

content_key = None
if raw_packet is not None:
content_key = _content_key_for_raw(raw_packet, observer, channel_idx=channel_idx)
if content_key is not None:
twin = _pick_by_content_key(candidates=candidates, content_key=content_key, observer=observer)
if twin:
return twin

if len(candidates) == 1:
return candidates[0]

if channel_idx is not None:
channel = resolve_mc_channel(observer, channel_idx)
if channel:
narrowed = candidates.filter(channel=channel)
if narrowed.count() == 1:
return narrowed.first()
logger.debug("path_twin: %s channel_text candidates in window; skip merge", count)
narrowed = [p for p in candidates if p.channel_id == channel.id]
if len(narrowed) == 1:
return narrowed[0]

logger.debug("path_twin: %s channel_text candidates in window; skip merge", len(candidates))
return None


Expand All @@ -130,10 +216,12 @@ def sync_path_to_channel_text_twin(
fields = _path_fields_from_observation(observation)
if not fields:
return False
channel_idx = channel_idx_from_packet_raw_json(packet)
twin = _pick_channel_text_twin(
observer=observer,
anchor_time=packet.rx_time,
channel_idx=channel_idx_from_packet_raw_json(packet),
channel_idx=channel_idx,
raw_packet=packet,
)
if not twin:
return False
Expand All @@ -144,6 +232,7 @@ def sync_path_from_rx_log_twin(*, packet: MeshCoreTextPacket, observer) -> bool:
"""After ingesting channel_text, copy path from a nearby rx_log TEXT_MSG/PATH observation."""
if packet.payload_type != MeshCorePayloadType.CHANNEL_TEXT:
return False
content_key = _content_key_for_channel_text(packet, observer)
window = decoded_twin_window()
raw_packets = MeshCoreRawPacket.objects.filter(
observer=observer,
Expand All @@ -152,6 +241,26 @@ def sync_path_from_rx_log_twin(*, packet: MeshCoreTextPacket, observer) -> bool:
rx_time__gte=packet.rx_time - window,
rx_time__lte=packet.rx_time + window,
).order_by("-rx_time")

if content_key is not None:
for raw in raw_packets:
if rx_log_payload_typename(raw) not in PATH_RX_TYPENAMES:
continue
raw_key = _content_key_for_raw(
raw,
observer,
channel_idx=channel_idx_from_packet_raw_json(raw),
)
if raw_key != content_key:
continue
obs = MeshCorePacketObservation.objects.filter(packet=raw, observer=observer).first()
if not obs:
continue
fields = _path_fields_from_observation(obs)
if not fields:
continue
return apply_path_to_text_observation(text_packet=packet, observer=observer, **fields)

for raw in raw_packets:
if rx_log_payload_typename(raw) not in PATH_RX_TYPENAMES:
continue
Expand Down
Loading