Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Meshflow/meshcore_packets/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from meshcore_packets.services.channel import resolve_mc_channel
from meshcore_packets.services.dedup import find_existing_packet
from meshcore_packets.services.dedup_key import resolve_ingest_dedup_key
from meshcore_packets.services.path_hashes import enrich_validated_data_paths
from meshcore_packets.services.path_twin import sync_path_from_rx_log_twin, sync_path_to_channel_text_twin

Expand Down Expand Up @@ -90,13 +91,14 @@ def create(self, validated_data):
elif from_prefix:
from_prefix = normalize_mc_pubkey_prefix(from_prefix)

pkt_hash = validated_data.get("pkt_hash")
existing = find_existing_packet(
pkt_hash=pkt_hash,
rx_time=rx_time,
event_type=validated_data.get("event_type"),
raw_payload=str(validated_data.get("raw", "")),
channel_idx = validated_data.get("channel_idx")
channel = resolve_mc_channel(observer, channel_idx) if channel_idx is not None else None
dedup_key = resolve_ingest_dedup_key(
validated_data=validated_data,
observer=observer,
channel=channel,
)
existing = find_existing_packet(dedup_key=dedup_key, rx_time=rx_time)
if existing:
self.instance = existing
self._ensure_observation(existing, observer, validated_data, rx_time)
Expand All @@ -110,16 +112,14 @@ def create(self, validated_data):
"raw": MeshCorePayloadType.RAW,
}
ptype = payload_type_map[validated_data["payload_type"]]
channel_idx = validated_data.get("channel_idx")
channel = resolve_mc_channel(observer, channel_idx) if channel_idx is not None else None

base_fields = {
"observer": observer,
"payload_type": ptype,
"event_type": validated_data["event_type"],
"from_pubkey": from_pubkey,
"from_pubkey_prefix": from_prefix,
"pkt_hash": pkt_hash,
"pkt_hash": dedup_key,
"rx_time": rx_time,
"rx_rssi": validated_data.get("rx_rssi"),
"rx_snr": validated_data.get("rx_snr"),
Expand Down
39 changes: 32 additions & 7 deletions Meshflow/meshcore_packets/services/dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,23 @@

from meshcore_packets.models import MeshCoreRawPacket

# PostgreSQL BIGINT is a signed 64-bit column and Django's BigIntegerField maps
# to the same range, so stored keys must stay within these bounds.
SIGNED_BIGINT_MAX = 2**63 - 1
SIGNED_BIGINT_MIN = -(2**63)


def digest_to_signed_bigint(digest_hex: str) -> int:
    """Reduce a hex digest to a non-negative integer that fits a signed BIGINT.

    Only the first 16 hex characters (64 bits) of ``digest_hex`` are read; the
    top bit is then masked off so the result lies in ``0..SIGNED_BIGINT_MAX``.
    """
    leading_64_bits = int(digest_hex[:16], 16)
    return leading_64_bits & SIGNED_BIGINT_MAX


def normalize_stored_pkt_hash(value: int) -> int:
    """Coerce a wire-supplied or computed key into the signed BIGINT range.

    The value is passed through ``int()``. If it already lies between
    ``SIGNED_BIGINT_MIN`` and ``SIGNED_BIGINT_MAX`` it is returned as is;
    otherwise it is masked with ``SIGNED_BIGINT_MAX`` (keeping the low 63 bits).
    """
    candidate = int(value)
    out_of_range = candidate < SIGNED_BIGINT_MIN or candidate > SIGNED_BIGINT_MAX
    return candidate & SIGNED_BIGINT_MAX if out_of_range else candidate


def dedup_window() -> timedelta:
minutes = getattr(settings, "MESHCORE_PACKET_DEDUP_WINDOW_MINUTES", 10)
Expand All @@ -22,26 +39,34 @@ def decoded_twin_window() -> timedelta:

def surrogate_pkt_hash(*, event_type: str, raw_payload: str) -> int:
    """Derive a fallback dedup key from the event type and raw payload text.

    The string ``"{event_type}|{raw_payload}"`` is hashed with SHA-256 and the
    hex digest is reduced by ``digest_to_signed_bigint``.

    Returns:
        A non-negative integer no larger than ``SIGNED_BIGINT_MAX``.
    """
    digest = hashlib.sha256(f"{event_type}|{raw_payload}".encode()).hexdigest()
    # Returning the bare 64-bit prefix could exceed the signed BIGINT range the
    # key is stored in; always go through the masking helper instead.
    return digest_to_signed_bigint(digest)


def find_existing_packet(
*,
pkt_hash: int | None,
rx_time,
dedup_key: int | None = None,
pkt_hash: int | None = None,
rx_time=None,
event_type: str | None = None,
raw_payload: str | None = None,
) -> MeshCoreRawPacket | None:
"""Find duplicate within the dedup window."""
if pkt_hash is None:
"""
Find duplicate within the dedup window.

Callers should pass ``dedup_key`` from ``resolve_ingest_dedup_key`` (stored on
``MeshCoreRawPacket.pkt_hash``). Legacy ``pkt_hash`` / surrogate args remain for
tests that have not migrated yet.
"""
key = dedup_key if dedup_key is not None else pkt_hash
if key is None:
if not event_type or raw_payload is None:
return None
pkt_hash = surrogate_pkt_hash(event_type=event_type, raw_payload=raw_payload)
key = surrogate_pkt_hash(event_type=event_type, raw_payload=raw_payload)

window = dedup_window()
return (
MeshCoreRawPacket.objects.filter(
pkt_hash=pkt_hash,
pkt_hash=key,
rx_time__gte=rx_time - window,
rx_time__lte=rx_time + window,
)
Expand Down
81 changes: 81 additions & 0 deletions Meshflow/meshcore_packets/services/dedup_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Resolve MeshCore ingest deduplication keys (ADR-0004 partial, #387)."""

from __future__ import annotations

import hashlib
from typing import Any

from constellations.models import MessageChannel
from meshcore_packets.services.dedup import digest_to_signed_bigint, normalize_stored_pkt_hash, surrogate_pkt_hash
from nodes.models import ManagedNode


def _hash_payload(payload: str) -> int:
    """Hash ``payload`` with SHA-256 and reduce it via ``digest_to_signed_bigint``."""
    encoded = payload.encode()
    return digest_to_signed_bigint(hashlib.sha256(encoded).hexdigest())


def extract_sender_timestamp(validated_data: dict[str, Any]) -> int | None:
"""Read sender_timestamp from nested capture envelope when present."""
raw = validated_data.get("raw")
if not isinstance(raw, dict):
return None
envelope = raw
nested = raw.get("raw")
if isinstance(nested, dict):
envelope = nested
payload = envelope.get("payload")
if not isinstance(payload, dict):
return None
value = payload.get("sender_timestamp")
if value is None:
return None
try:
return int(value)
except TypeError, ValueError:
return None


def channel_text_dedup_key(
    *,
    constellation_id,
    message_channel_id,
    text: str,
    sender_timestamp: int | None,
) -> int:
    """Build the canonical key for one on-air channel broadcast across feeders.

    The key hashes the constellation id, the message channel id, the sender
    timestamp (``0`` when it is ``None``) and the whitespace-stripped text
    (an empty string when ``text`` is falsy).
    """
    normalized_text = (text or "").strip()
    ts_part = 0 if sender_timestamp is None else sender_timestamp
    return _hash_payload(
        f"channel_text|{constellation_id}|{message_channel_id}|{ts_part}|{normalized_text}"
    )


def resolve_ingest_dedup_key(
    *,
    validated_data: dict[str, Any],
    observer: ManagedNode,
    channel: MessageChannel | None,
) -> int:
    """
    Pick the dedup key stored on MeshCoreRawPacket.pkt_hash and used for lookup.

    Precedence:
    1. A wire ``pkt_hash`` in ``validated_data``, normalized to the stored range.
    2. For ``channel_text`` payloads with a resolved channel: the canonical key
       built from constellation, channel, sender_timestamp and text.
    3. Otherwise the surrogate hash of ``event_type`` and the stringified ``raw``.
    """
    wire_hash = validated_data.get("pkt_hash")
    if wire_hash is not None:
        return normalize_stored_pkt_hash(wire_hash)

    is_channel_text = validated_data.get("payload_type") == "channel_text"
    if not is_channel_text or channel is None:
        event_type = str(validated_data.get("event_type", ""))
        raw_payload = str(validated_data.get("raw", ""))
        return surrogate_pkt_hash(event_type=event_type, raw_payload=raw_payload)

    return channel_text_dedup_key(
        constellation_id=observer.constellation_id,
        message_channel_id=channel.id,
        text=validated_data.get("text") or "",
        sender_timestamp=extract_sender_timestamp(validated_data),
    )
44 changes: 21 additions & 23 deletions Meshflow/meshcore_packets/tests/test_canonical_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

from common.protocol import Protocol
from constellations.models import MessageChannel
from meshcore_packets.models import MeshCorePayloadType, MeshCoreTextPacket
from meshcore_packets.models import MeshCorePacketObservation, MeshCorePayloadType, MeshCoreTextPacket
from meshcore_packets.services.channel import resolve_mc_channel
from meshcore_packets.services.channel_sync import reconcile_mc_channels
from meshcore_packets.services.dedup_key import channel_text_dedup_key
from meshcore_packets.services.text_message import MeshCoreTextMessageService
from nodes.models import NodeAuth
from text_messages.models import TextMessage


@pytest.mark.django_db
Expand Down Expand Up @@ -71,44 +73,40 @@ def test_two_feeders_ingest_same_text_same_channel_id(meshcore_feeder, create_ma

now = timezone.now()
channel_a = resolve_mc_channel(meshcore_feeder["node"], 1)
packet_a = MeshCoreTextPacket.objects.create(
channel_b = resolve_mc_channel(feeder_b, 2)
assert channel_a.id == channel_b.id

dedup_key = channel_text_dedup_key(
constellation_id=constellation.id,
message_channel_id=channel_a.id,
text="hello #test",
sender_timestamp=1780409317,
)
packet = MeshCoreTextPacket.objects.create(
observer=meshcore_feeder["node"],
payload_type=MeshCorePayloadType.CHANNEL_TEXT,
event_type="channel_message",
rx_time=now,
raw_json={},
text="hello #test",
channel=channel_a,
pkt_hash=111,
pkt_hash=dedup_key,
)
from meshcore_packets.models import MeshCorePacketObservation

obs_a = MeshCorePacketObservation.objects.create(
packet=packet_a,
packet=packet,
observer=meshcore_feeder["node"],
channel=channel_a,
rx_time=now,
)
msg_a = MeshCoreTextMessageService().process_packet(packet_a, meshcore_feeder["node"], obs_a)
assert msg_a is not None
msg = MeshCoreTextMessageService().process_packet(packet, meshcore_feeder["node"], obs_a)
assert msg is not None

channel_b = resolve_mc_channel(feeder_b, 2)
packet_b = MeshCoreTextPacket.objects.create(
observer=feeder_b,
payload_type=MeshCorePayloadType.CHANNEL_TEXT,
event_type="channel_message",
rx_time=now,
raw_json={},
text="hello #test",
channel=channel_b,
pkt_hash=222,
)
obs_b = MeshCorePacketObservation.objects.create(
packet=packet_b,
packet=packet,
observer=feeder_b,
channel=channel_b,
rx_time=now,
)
msg_b = MeshCoreTextMessageService().process_packet(packet_b, feeder_b, obs_b)
assert msg_b is not None
assert msg_a.channel_id == msg_b.channel_id
assert MeshCoreTextMessageService().process_packet(packet, feeder_b, obs_b) is None
assert TextMessage.objects.filter(message_text="hello #test").count() == 1
assert msg.channel_id == channel_a.id
123 changes: 123 additions & 0 deletions Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Cross-feeder channel_text dedup (#387)."""

from django.urls import reverse
from django.utils import timezone

import pytest
from rest_framework.test import APIClient

from common.protocol import Protocol
from meshcore_packets.models import (
MeshCorePacketObservation,
MeshCorePayloadType,
MeshCoreRawPacket,
)
from meshcore_packets.services.channel_sync import reconcile_mc_channels
from meshcore_packets.tests.conftest import (
FEEDER_B_MC_PUBKEY_PREFIX,
FEEDER_MC_PUBKEY_PREFIX,
feeder_url,
)
from nodes.models import NodeAuth
from text_messages.models import TextMessage

SENDER_TIMESTAMP = 1780409317
MESSAGE_TEXT = "PDY4 Paddy Mobile 4: Ping"


def _channel_text_payload(*, channel_idx: int, rx_time, rx_rssi: float):
return {
"event_type": "channel_message",
"payload_type": "channel_text",
"channel_idx": channel_idx,
"rx_time": rx_time.timestamp(),
"rx_rssi": rx_rssi,
"text": MESSAGE_TEXT,
"raw": {
"protocol": "meshcore",
"event_type": "channel_message",
"payload": {
"type": "CHAN",
"channel_idx": channel_idx,
"sender_timestamp": SENDER_TIMESTAMP,
"text": MESSAGE_TEXT,
},
},
}


@pytest.fixture
def second_mc_feeder(meshcore_feeder, create_managed_node, create_node_api_key):
    """Create a second MeshCore feeder node in the first feeder's constellation.

    Returns a dict with the new ``node`` and the ``api_key`` linked to it
    through a ``NodeAuth`` row.
    """
    from meshcore_packets.tests.conftest import FEEDER_B_MC_PUBKEY

    shared_constellation = meshcore_feeder["node"].constellation
    feeder_node = create_managed_node(
        meshtastic_node_id=None,
        protocol=Protocol.MESHCORE,
        name="MC Feeder B",
        mc_pubkey=FEEDER_B_MC_PUBKEY,
        constellation=shared_constellation,
    )
    feeder_key = create_node_api_key(constellation=shared_constellation)
    NodeAuth.objects.create(api_key=feeder_key, node=feeder_node)
    return {"node": feeder_node, "api_key": feeder_key}


def _setup_hashtag_channels(meshcore_feeder, second_mc_feeder):
    """Reconcile the ``test`` hashtag channel on both feeders.

    The first feeder reports it at channel index 1 and the second at index 2.
    """
    for feeder, slot in ((meshcore_feeder, 1), (second_mc_feeder, 2)):
        reconcile_mc_channels(
            feeder["node"],
            [{"mc_channel_idx": slot, "name": "test", "mc_channel_type": "HASHTAG", "mc_hashtag": "test"}],
        )


@pytest.mark.django_db
@pytest.mark.parametrize("first_feeder", ("a", "b"))
def test_cross_feeder_channel_text_one_packet_two_observations(
    meshcore_feeder,
    second_mc_feeder,
    first_feeder,
):
    """Same on-air channel post from two feeders → one packet, one TextMessage, heard×2.

    Parametrized over which feeder posts first so the outcome is checked for
    both ingest orders.
    """
    _setup_hashtag_channels(meshcore_feeder, second_mc_feeder)
    received_at = timezone.now()
    feeder_a = (meshcore_feeder, 1, FEEDER_MC_PUBKEY_PREFIX)
    feeder_b = (second_mc_feeder, 2, FEEDER_B_MC_PUBKEY_PREFIX)
    ingest_order = (feeder_a, feeder_b) if first_feeder == "a" else (feeder_b, feeder_a)

    def ingest(feeder_info, rssi):
        # Each request uses its own client carrying that feeder's API key.
        feeder, channel_idx, prefix = feeder_info
        api = APIClient()
        api.credentials(HTTP_X_API_KEY=feeder["api_key"].key)
        url = feeder_url("meshcore-feeder-packet-ingest", prefix)
        body = _channel_text_payload(channel_idx=channel_idx, rx_time=received_at, rx_rssi=rssi)
        return api.post(url, body, format="json")

    first_response = ingest(ingest_order[0], -40.0)
    second_response = ingest(ingest_order[1], -90.0)
    assert first_response.status_code == 201
    assert second_response.status_code == 201
    first_packet_id = first_response.data["packet_id"]
    second_packet_id = second_response.data["packet_id"]
    assert first_packet_id == second_packet_id

    channel_text_packets = MeshCoreRawPacket.objects.filter(payload_type=MeshCorePayloadType.CHANNEL_TEXT)
    assert channel_text_packets.count() == 1
    stored_packet = MeshCoreRawPacket.objects.get(id=first_packet_id)
    assert stored_packet.pkt_hash is not None
    assert MeshCorePacketObservation.objects.filter(packet=stored_packet).count() == 2
    assert TextMessage.objects.filter(message_text=MESSAGE_TEXT).count() == 1

    message = TextMessage.objects.get(message_text=MESSAGE_TEXT)
    listing = APIClient().get(reverse("textmessage-list"), {"channel_id": message.channel_id})
    assert listing.status_code == 200
    wanted_id = str(message.id)
    matching_row = next(entry for entry in listing.data["results"] if entry["id"] == wanted_id)
    assert len(matching_row["heard"]) == 2
Loading
Loading