diff --git a/Meshflow/meshcore_packets/serializers.py b/Meshflow/meshcore_packets/serializers.py index f5ad844..e9f62e6 100644 --- a/Meshflow/meshcore_packets/serializers.py +++ b/Meshflow/meshcore_packets/serializers.py @@ -13,6 +13,7 @@ ) from meshcore_packets.services.channel import resolve_mc_channel from meshcore_packets.services.dedup import find_existing_packet +from meshcore_packets.services.dedup_key import resolve_ingest_dedup_key from meshcore_packets.services.path_hashes import enrich_validated_data_paths from meshcore_packets.services.path_twin import sync_path_from_rx_log_twin, sync_path_to_channel_text_twin @@ -90,13 +91,14 @@ def create(self, validated_data): elif from_prefix: from_prefix = normalize_mc_pubkey_prefix(from_prefix) - pkt_hash = validated_data.get("pkt_hash") - existing = find_existing_packet( - pkt_hash=pkt_hash, - rx_time=rx_time, - event_type=validated_data.get("event_type"), - raw_payload=str(validated_data.get("raw", "")), + channel_idx = validated_data.get("channel_idx") + channel = resolve_mc_channel(observer, channel_idx) if channel_idx is not None else None + dedup_key = resolve_ingest_dedup_key( + validated_data=validated_data, + observer=observer, + channel=channel, ) + existing = find_existing_packet(dedup_key=dedup_key, rx_time=rx_time) if existing: self.instance = existing self._ensure_observation(existing, observer, validated_data, rx_time) @@ -110,8 +112,6 @@ def create(self, validated_data): "raw": MeshCorePayloadType.RAW, } ptype = payload_type_map[validated_data["payload_type"]] - channel_idx = validated_data.get("channel_idx") - channel = resolve_mc_channel(observer, channel_idx) if channel_idx is not None else None base_fields = { "observer": observer, @@ -119,7 +119,7 @@ def create(self, validated_data): "event_type": validated_data["event_type"], "from_pubkey": from_pubkey, "from_pubkey_prefix": from_prefix, - "pkt_hash": pkt_hash, + "pkt_hash": dedup_key, "rx_time": rx_time, "rx_rssi": 
# PostgreSQL BIGINT is signed; Django BigIntegerField maps to the same range.
SIGNED_BIGINT_MAX = (1 << 63) - 1
SIGNED_BIGINT_MIN = -(1 << 63)


def digest_to_signed_bigint(digest_hex: str) -> int:
    """Reduce a hex digest to a non-negative integer that fits a signed BIGINT.

    Only the first 16 hex characters (64 bits) of ``digest_hex`` are read; the
    top bit is then cleared by masking with ``SIGNED_BIGINT_MAX``, so the result
    lies in ``0 .. SIGNED_BIGINT_MAX``.
    """
    leading_bits = int(digest_hex[:16], 16)
    return leading_bits & SIGNED_BIGINT_MAX


def normalize_stored_pkt_hash(value: int) -> int:
    """Return ``value`` as an int that fits the signed BIGINT range.

    Values already inside ``SIGNED_BIGINT_MIN .. SIGNED_BIGINT_MAX`` are
    returned unchanged (after ``int()`` coercion). Anything outside that range
    is masked down to its low 63 bits, which always yields a non-negative
    result.
    """
    candidate = int(value)
    if candidate < SIGNED_BIGINT_MIN or candidate > SIGNED_BIGINT_MAX:
        return candidate & SIGNED_BIGINT_MAX
    return candidate
def extract_sender_timestamp(validated_data: dict[str, Any]) -> int | None:
    """Read ``sender_timestamp`` from the capture envelope when present.

    The envelope is ``validated_data["raw"]``. When that dict itself holds a
    dict under ``"raw"``, the nested dict is used as the envelope instead. The
    timestamp is then looked up at ``envelope["payload"]["sender_timestamp"]``.

    Args:
        validated_data: Ingest payload; only the ``"raw"`` key is inspected.

    Returns:
        The timestamp coerced with ``int()``, or ``None`` when any level of the
        structure is missing or not a dict, or when the value cannot be
        converted to an integer.
    """
    raw = validated_data.get("raw")
    if not isinstance(raw, dict):
        return None

    nested = raw.get("raw")
    envelope = nested if isinstance(nested, dict) else raw

    payload = envelope.get("payload")
    if not isinstance(payload, dict):
        return None

    value = payload.get("sender_timestamp")
    if value is None:
        return None

    try:
        return int(value)
    # Parenthesised tuple: the bare ``except A, B:`` spelling is only accepted
    # from Python 3.14 (PEP 758) and is a SyntaxError on earlier interpreters.
    # OverflowError covers non-finite floats (``int(float("inf"))``), which
    # should be treated as "no usable timestamp" rather than abort the ingest.
    except (TypeError, ValueError, OverflowError):
        return None
def resolve_ingest_dedup_key(
    *,
    validated_data: dict[str, Any],
    observer: ManagedNode,
    channel: MessageChannel | None,
) -> int:
    """
    Pick the dedup key that is stored on MeshCoreRawPacket.pkt_hash and used for lookup.

    Resolution order:

    1. A wire ``pkt_hash`` in ``validated_data`` is used (after
       ``normalize_stored_pkt_hash``) whenever it is not ``None``.
    2. For ``payload_type == "channel_text"`` with a resolved ``channel``, the
       content key from ``channel_text_dedup_key`` is used (constellation,
       channel id, text, sender timestamp).
    3. Everything else falls back to ``surrogate_pkt_hash`` over the stringified
       ``event_type`` and ``raw`` values.
    """
    wire_hash = validated_data.get("pkt_hash")
    if wire_hash is not None:
        return normalize_stored_pkt_hash(wire_hash)

    is_channel_text = validated_data.get("payload_type") == "channel_text"
    if not is_channel_text or channel is None:
        # No content key available: hash the envelope as received.
        event_type = str(validated_data.get("event_type", ""))
        raw_payload = str(validated_data.get("raw", ""))
        return surrogate_pkt_hash(event_type=event_type, raw_payload=raw_payload)

    return channel_text_dedup_key(
        constellation_id=observer.constellation_id,
        message_channel_id=channel.id,
        text=validated_data.get("text") or "",
        sender_timestamp=extract_sender_timestamp(validated_data),
    )
channel_text_dedup_key from meshcore_packets.services.text_message import MeshCoreTextMessageService from nodes.models import NodeAuth +from text_messages.models import TextMessage @pytest.mark.django_db @@ -71,7 +73,16 @@ def test_two_feeders_ingest_same_text_same_channel_id(meshcore_feeder, create_ma now = timezone.now() channel_a = resolve_mc_channel(meshcore_feeder["node"], 1) - packet_a = MeshCoreTextPacket.objects.create( + channel_b = resolve_mc_channel(feeder_b, 2) + assert channel_a.id == channel_b.id + + dedup_key = channel_text_dedup_key( + constellation_id=constellation.id, + message_channel_id=channel_a.id, + text="hello #test", + sender_timestamp=1780409317, + ) + packet = MeshCoreTextPacket.objects.create( observer=meshcore_feeder["node"], payload_type=MeshCorePayloadType.CHANNEL_TEXT, event_type="channel_message", @@ -79,36 +90,23 @@ def test_two_feeders_ingest_same_text_same_channel_id(meshcore_feeder, create_ma raw_json={}, text="hello #test", channel=channel_a, - pkt_hash=111, + pkt_hash=dedup_key, ) - from meshcore_packets.models import MeshCorePacketObservation - obs_a = MeshCorePacketObservation.objects.create( - packet=packet_a, + packet=packet, observer=meshcore_feeder["node"], channel=channel_a, rx_time=now, ) - msg_a = MeshCoreTextMessageService().process_packet(packet_a, meshcore_feeder["node"], obs_a) - assert msg_a is not None + msg = MeshCoreTextMessageService().process_packet(packet, meshcore_feeder["node"], obs_a) + assert msg is not None - channel_b = resolve_mc_channel(feeder_b, 2) - packet_b = MeshCoreTextPacket.objects.create( - observer=feeder_b, - payload_type=MeshCorePayloadType.CHANNEL_TEXT, - event_type="channel_message", - rx_time=now, - raw_json={}, - text="hello #test", - channel=channel_b, - pkt_hash=222, - ) obs_b = MeshCorePacketObservation.objects.create( - packet=packet_b, + packet=packet, observer=feeder_b, channel=channel_b, rx_time=now, ) - msg_b = MeshCoreTextMessageService().process_packet(packet_b, feeder_b, 
obs_b) - assert msg_b is not None - assert msg_a.channel_id == msg_b.channel_id + assert MeshCoreTextMessageService().process_packet(packet, feeder_b, obs_b) is None + assert TextMessage.objects.filter(message_text="hello #test").count() == 1 + assert msg.channel_id == channel_a.id diff --git a/Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py b/Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py new file mode 100644 index 0000000..135a488 --- /dev/null +++ b/Meshflow/meshcore_packets/tests/test_cross_feeder_dedup.py @@ -0,0 +1,123 @@ +"""Cross-feeder channel_text dedup (#387).""" + +from django.urls import reverse +from django.utils import timezone + +import pytest +from rest_framework.test import APIClient + +from common.protocol import Protocol +from meshcore_packets.models import ( + MeshCorePacketObservation, + MeshCorePayloadType, + MeshCoreRawPacket, +) +from meshcore_packets.services.channel_sync import reconcile_mc_channels +from meshcore_packets.tests.conftest import ( + FEEDER_B_MC_PUBKEY_PREFIX, + FEEDER_MC_PUBKEY_PREFIX, + feeder_url, +) +from nodes.models import NodeAuth +from text_messages.models import TextMessage + +SENDER_TIMESTAMP = 1780409317 +MESSAGE_TEXT = "PDY4 Paddy Mobile 4: Ping" + + +def _channel_text_payload(*, channel_idx: int, rx_time, rx_rssi: float): + return { + "event_type": "channel_message", + "payload_type": "channel_text", + "channel_idx": channel_idx, + "rx_time": rx_time.timestamp(), + "rx_rssi": rx_rssi, + "text": MESSAGE_TEXT, + "raw": { + "protocol": "meshcore", + "event_type": "channel_message", + "payload": { + "type": "CHAN", + "channel_idx": channel_idx, + "sender_timestamp": SENDER_TIMESTAMP, + "text": MESSAGE_TEXT, + }, + }, + } + + +@pytest.fixture +def second_mc_feeder(meshcore_feeder, create_managed_node, create_node_api_key): + from meshcore_packets.tests.conftest import FEEDER_B_MC_PUBKEY + + constellation = meshcore_feeder["node"].constellation + node = create_managed_node( + 
def _setup_hashtag_channels(meshcore_feeder, second_mc_feeder):
    """Reconcile a ``test`` HASHTAG channel on both feeders at different slots.

    The first feeder gets the channel at index 1 and the second at index 2; the
    name, type and hashtag passed to ``reconcile_mc_channels`` are otherwise
    identical.
    """
    slots = ((meshcore_feeder, 1), (second_mc_feeder, 2))
    for feeder, channel_idx in slots:
        reconcile_mc_channels(
            feeder["node"],
            [{"mc_channel_idx": channel_idx, "name": "test", "mc_channel_type": "HASHTAG", "mc_hashtag": "test"}],
        )
MeshCoreRawPacket.objects.get(id=packet_id_a) + assert packet.pkt_hash is not None + assert MeshCorePacketObservation.objects.filter(packet=packet).count() == 2 + assert TextMessage.objects.filter(message_text=MESSAGE_TEXT).count() == 1 + + tm = TextMessage.objects.get(message_text=MESSAGE_TEXT) + list_url = reverse("textmessage-list") + response = APIClient().get(list_url, {"channel_id": tm.channel_id}) + assert response.status_code == 200 + row = next(item for item in response.data["results"] if item["id"] == str(tm.id)) + assert len(row["heard"]) == 2 diff --git a/Meshflow/meshcore_packets/tests/test_dedup_key.py b/Meshflow/meshcore_packets/tests/test_dedup_key.py new file mode 100644 index 0000000..4336ec8 --- /dev/null +++ b/Meshflow/meshcore_packets/tests/test_dedup_key.py @@ -0,0 +1,104 @@ +"""Tests for meshcore_packets.services.dedup_key.""" + +import pytest + +from meshcore_packets.services.channel_sync import reconcile_mc_channels +from meshcore_packets.services.dedup import SIGNED_BIGINT_MAX, surrogate_pkt_hash +from meshcore_packets.services.dedup_key import ( + channel_text_dedup_key, + extract_sender_timestamp, + resolve_ingest_dedup_key, +) + + +def test_extract_sender_timestamp_from_nested_envelope(): + data = { + "raw": { + "protocol": "meshcore", + "event_type": "channel_message", + "payload": {"sender_timestamp": 1780409317, "text": "hello"}, + }, + } + assert extract_sender_timestamp(data) == 1780409317 + + +def test_surrogate_pkt_hash_fits_postgresql_bigint(): + key = surrogate_pkt_hash(event_type="channel_message", raw_payload="x" * 5000) + assert 0 <= key <= SIGNED_BIGINT_MAX + + +def test_channel_text_dedup_key_stable_across_feeder_envelopes(): + constellation_id = "11111111-1111-1111-1111-111111111111" + channel_id = "22222222-2222-2222-2222-222222222222" + text = "PDY4 Paddy Mobile 4: Ping" + ts = 1780409317 + key_a = channel_text_dedup_key( + constellation_id=constellation_id, + message_channel_id=channel_id, + text=text, + 
def test_resolve_ingest_dedup_key_prefers_wire_hash():
    """A wire ``pkt_hash`` in the payload is returned as the dedup key.

    The payload is ``channel_text`` but no channel is supplied, and the stub
    observer has no constellation.
    """

    class _Observer:
        constellation_id = None

    wire = 42424242
    payload = {
        "payload_type": "channel_text",
        "event_type": "channel_message",
        "pkt_hash": wire,
        "text": "ignored for wire",
        "raw": {},
    }

    resolved = resolve_ingest_dedup_key(
        validated_data=payload,
        observer=_Observer(),
        channel=None,
    )

    assert resolved == wire
MeshCoreRawPacket +from meshcore_packets.models import MeshCorePayloadType, MeshCoreRawPacket from meshcore_packets.services.channel_sync import reconcile_mc_channels from meshcore_packets.tests.conftest import FEEDER_MC_PUBKEY_PREFIX, feeder_url from nodes.models import MeshCoreLocationSource, NodeAuth, ObservedNode, Position @@ -224,6 +224,37 @@ def test_meshcore_contact_text_ingest(ingest_client): assert ObservedNode.objects.filter(protocol=Protocol.MESHCORE, mc_pubkey_prefix=PREFIX).exists() +@pytest.mark.django_db +def test_meshcore_channel_text_without_pkt_hash_dedup_same_feeder(ingest_client, meshcore_feeder): + reconcile_mc_channels( + meshcore_feeder["node"], + [{"mc_channel_idx": 0, "name": "Public", "mc_channel_type": "PUBLIC"}], + ) + now = timezone.now() + payload = { + "event_type": "channel_message", + "payload_type": "channel_text", + "channel_idx": 0, + "rx_time": now.timestamp(), + "text": "dedup ping", + "raw": { + "protocol": "meshcore", + "event_type": "channel_message", + "payload": {"sender_timestamp": 1234567890, "text": "dedup ping"}, + }, + } + url = feeder_url("meshcore-feeder-packet-ingest", FEEDER_MC_PUBKEY_PREFIX) + r1 = ingest_client.post(url, payload, format="json") + r2 = ingest_client.post(url, payload, format="json") + assert r1.status_code == 201 + assert r2.status_code == 201 + assert r1.data["packet_id"] == r2.data["packet_id"] + assert MeshCoreRawPacket.objects.filter(payload_type=MeshCorePayloadType.CHANNEL_TEXT).count() == 1 + packet = MeshCoreRawPacket.objects.get(id=r1.data["packet_id"]) + assert packet.pkt_hash is not None + assert TextMessage.objects.filter(message_text="dedup ping").count() == 1 + + @pytest.mark.django_db def test_meshcore_dedup_returns_existing(ingest_client): now = timezone.now() diff --git a/docs/features/meshcore/phase-3-outstanding.md b/docs/features/meshcore/phase-3-outstanding.md index 3769054..2783446 100644 --- a/docs/features/meshcore/phase-3-outstanding.md +++ 
b/docs/features/meshcore/phase-3-outstanding.md @@ -30,6 +30,7 @@ Gap analysis (Jun 2026): channel **Heard** is the user-facing “map view” for ## Passive path +- [ ] **Cross-feeder channel message dedup** — [#387](https://github.com/pskillen/meshflow-api/issues/387): one `TextMessage` + N observations when multiple feeders hear the same `channel_text` (PR in flight). Pre-prod duplicate rows are not backfilled. - [ ] **Tier 1 — message path data chain** — [#385](https://github.com/pskillen/meshflow-api/issues/385): `path_hashes` on observation tied to `TextMessage.original_mc_packet` for channel traffic. Detail: [packet-path-tracing-outstanding.md § Message path data chain](./packet-path-tracing/packet-path-tracing-outstanding.md#message-path-data-chain-confirmed--pre-prod-jun-2026). - [ ] **Passive packet path subsystem (M1+)** — rollups, resolution table, Neo4j export, realtime/history UI ([ADR-0001](./packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md)); merge/deploy PRs [#378](https://github.com/pskillen/meshflow-api/pull/378), [bot#122](https://github.com/pskillen/meshflow-bot/pull/122), [ui#310](https://github.com/pskillen/meshflow-ui/pull/310). - [ ] **Tier 2 — `heard[]` → segment resolution table** — augment `bulk_format_path_hops` in `text_messages/views.py` with `MeshCorePathSegmentResolution` (manual + resolved rows). diff --git a/docs/features/meshcore/text-message-channels.md b/docs/features/meshcore/text-message-channels.md index c6afea1..9590451 100644 --- a/docs/features/meshcore/text-message-channels.md +++ b/docs/features/meshcore/text-message-channels.md @@ -286,7 +286,7 @@ Per-feeder sighting of a packet, with optional `channel` FK (same `MessageChanne | `recipient_meshtastic_node_id` | MT broadcast sentinel; null for MC broadcast | | `sent_at` | MC uses packet `rx_time` where applicable | -**Dedup:** one `TextMessage` per raw packet (`original_packet` or `original_mc_packet`). 
+**Dedup:** one `TextMessage` per **on-air channel transmission** (deduped `MeshCoreTextPacket`; multiple feeders add observations only — [#387](https://github.com/pskillen/meshflow-api/issues/387)). MT: one per deduped `MtRawPacket` via `original_packet`. **History API (planned):** existing `GET /api/messages/` list stays **channel broadcast** only (like MT today): include MC rows with `protocol=MESHCORE`, `sender` null, channel set; **store** contact/DM rows but expose them via a future DM endpoint. @@ -300,8 +300,8 @@ Per-feeder sighting of a packet, with optional `channel` FK (same `MessageChanne Flow for text packets: 1. Validate envelope (`payload_type` `channel_text` or `contact_text`, `text` required). -2. Dedup by `pkt_hash` + time window ([`dedup.py`](../../../Meshflow/meshcore_packets/services/dedup.py)). -3. **`resolve_mc_channel(observer, channel_idx)`** — [`channel.py`](../../../Meshflow/meshcore_packets/services/channel.py). +2. **`resolve_mc_channel(observer, channel_idx)`** — [`channel.py`](../../../Meshflow/meshcore_packets/services/channel.py) (before dedup key so canonical channel id is shared across feeders). +3. Resolve dedup key ([`dedup_key.py`](../../../Meshflow/meshcore_packets/services/dedup_key.py)); lookup + persist on [`dedup.py`](../../../Meshflow/meshcore_packets/services/dedup.py) / `pkt_hash` column. 
### `resolve_mc_channel` diff --git a/docs/features/packet-ingestion/adr/0004-mc-dedup-key.md b/docs/features/packet-ingestion/adr/0004-mc-dedup-key.md index aa95550..ad341a2 100644 --- a/docs/features/packet-ingestion/adr/0004-mc-dedup-key.md +++ b/docs/features/packet-ingestion/adr/0004-mc-dedup-key.md @@ -1,8 +1,8 @@ # ADR-0004 — MeshCore deduplication key -**Status:** Proposed +**Status:** Accepted (partial — [#387](https://github.com/pskillen/meshflow-api/issues/387)) **Date:** 2026-05-12 -**Tracking:** [meshflow-api#276](https://github.com/pskillen/meshflow-api/issues/276) +**Tracking:** [meshflow-api#276](https://github.com/pskillen/meshflow-api/issues/276); cross-feeder channel dedup [#387](https://github.com/pskillen/meshflow-api/issues/387) ## Context @@ -36,6 +36,15 @@ The previous backend-migration plan proposed `(from_pubkey_hash, packet_hash, rx 5. **Non-text non-`rx_log_data` events** (`advertisement`, `path_update`, `discover_response`, `control_data`, `messages_waiting`, `trace_data`) follow the same surrogate-hash rule: their dedup key is `(event_type, sha256(canonical payload), rx_time window)`. These are infrequent enough that the collision risk of the surrogate hash is irrelevant in practice. 6. **Per-observer idempotency** (same as MT): `MeshCorePacketObservation` has a unique constraint on `(packet, observer)` so retries from the bot never create duplicates. +## Implementation ([#387](https://github.com/pskillen/meshflow-api/issues/387), partial) + +Shipped in `meshcore_packets/services/dedup_key.py` + ingest serializer: + +- **`MeshCoreRawPacket.pkt_hash`** stores the **resolved dedup key** on create (wire `pkt_hash` when present, otherwise computed). +- **`channel_text`** without wire hash: content key `SHA-256(channel_text|constellation_id|message_channel_id|sender_timestamp|text)` (canonical `MessageChannel.id` after `resolve_mc_channel`; `sender_timestamp` from nested capture payload, `0` if missing). 
+- **Cross-feeder:** same on-air channel post → one `MeshCoreTextPacket`, N `MeshCorePacketObservation`, one `TextMessage` (Meshtastic parity). +- **Not yet:** separate `surrogate_hash` column, partial unique index, `raw_packet_fk` decoded-twin linkage ([#276](https://github.com/pskillen/meshflow-api/issues/276)); `contact_text` cross-feeder dedup (follow-up). + ## Consequences - **`pkt_hash` is signed in JSON.** The sample shows a positive value, but the companion's encoding can produce values outside the `int32` positive range. Store as `BigIntegerField` and treat as opaque. Do not reinterpret the sign. diff --git a/docs/features/packet-ingestion/meshcore.md b/docs/features/packet-ingestion/meshcore.md index b997a10..b471c94 100644 --- a/docs/features/packet-ingestion/meshcore.md +++ b/docs/features/packet-ingestion/meshcore.md @@ -48,7 +48,7 @@ Code: `meshflow-bot/src/meshcore/serializers.py`, `src/api/StorageAPI.py` (`stor Also creates or updates **MeshCorePacketObservation** per `(packet, observer)` (deduped). Repeater **path_hashes** are stored on the observation row only (not on deduped `MeshCoreRawPacket`), so two feeders reporting the same `pkt_hash` can keep different paths. See [ADR-0001 (path hash resolution)](../traceroute/adr/0001-mc-path-hash-resolution.md). -Dedup: `meshcore_packets/services/dedup.py` — `pkt_hash` + time window (see [adr/0004-mc-dedup-key.md](adr/0004-mc-dedup-key.md)). +Dedup: `meshcore_packets/services/dedup_key.py` resolves the key; `dedup.py` looks up by stored `pkt_hash` + time window (see [adr/0004-mc-dedup-key.md](adr/0004-mc-dedup-key.md)). **Multi-feeder `channel_text`:** same on-air transmission (wire `pkt_hash` or content key) → one `MeshCoreTextPacket`, one `TextMessage`, N observations — Heard shows all feeders ([#387](https://github.com/pskillen/meshflow-api/issues/387)). ### API short-circuits