From d2f64c3a25553b4c80791e321af640400a9fd32f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 9 Apr 2026 03:31:29 +0000 Subject: [PATCH 1/2] Initial plan From 8e6e44eba9dd351f58a4a5c63056c1a14f3d86b4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 9 Apr 2026 03:52:27 +0000 Subject: [PATCH 2/2] feat: integrate HMAC-SHA1 webhook signature verification and idempotency guard (#144) Agent-Logs-Url: https://github.com/Tryonme-com/tryonyou-app/sessions/c596bbd8-1257-4414-9582-b81ddd2e0f13 Co-authored-by: LVT-ENG <214667862+LVT-ENG@users.noreply.github.com> --- api/index.py | 29 ++++++++++- api/peacock_core.py | 84 ++++++++++++++++++++++++++++++ tests/test_peacock_core.py | 101 ++++++++++++++++++++++++++++++++++++- 3 files changed, 212 insertions(+), 2 deletions(-) diff --git a/api/index.py b/api/index.py index 565a8bfd..7c1d845a 100644 --- a/api/index.py +++ b/api/index.py @@ -1,4 +1,5 @@ import json +import os import sys from pathlib import Path @@ -15,10 +16,14 @@ orchestrate_mirror_shadow_dwell, ) from mirror_digital_make import forward_mirror_event +from peacock_core import IdempotencyGuard, verify_webhook_sha1_signature from stripe_inauguration import create_inauguration_checkout_session app = Flask(__name__) +# Guardia de idempotencia compartida para prevenir procesamiento duplicado de eventos. +_idempotency_guard = IdempotencyGuard() + @app.route("/") def home(): @@ -78,7 +83,29 @@ def mirror_digital_event_options(): @app.route("/api/mirror_digital_event", methods=["POST"]) @app.route("/mirror_digital_event", methods=["POST"]) def mirror_digital_event(): - body = request.get_json(force=True, silent=True) or {} + raw_body = request.get_data(cache=True) + + # Verificación de firma HMAC-SHA1 cuando el secreto está configurado. + webhook_secret = os.environ.get("MAKE_WEBHOOK_SECRET", "").strip() + if webhook_secret: + sig = request.headers.get("X-Hub-Signature", "") + if not verify_webhook_sha1_signature(raw_body, sig, webhook_secret): + return _cors(jsonify({"status": "error", "message": "invalid_signature"})), 401 + + try: + body = json.loads(raw_body) if raw_body else {} + if not isinstance(body, dict): + body = {} + except (json.JSONDecodeError, ValueError): + body = {} + + # Guardia de idempotencia: rechaza eventos ya procesados recientemente. + event_id = str(body.get("event_id") or "").strip() + if event_id: + if _idempotency_guard.is_duplicate(event_id): + return _cors(jsonify({"status": "ok", "duplicate": True, "skipped": True})), 200 + _idempotency_guard.mark_seen(event_id) + payload, code = forward_mirror_event(body) return _cors(jsonify(payload)), code diff --git a/api/peacock_core.py b/api/peacock_core.py index 0a6b35ba..6a941004 100644 --- a/api/peacock_core.py +++ b/api/peacock_core.py @@ -4,10 +4,16 @@ Reglas: - Webhooks HTTP prohibidos hacia abvetos.com (activación de licencia interna / manual). - Presupuesto de latencia crítica Zero-Size (API / handshake): ver ZERO_SIZE_LATENCY_BUDGET_MS. + - Verificación de firma HMAC-SHA1 para webhooks entrantes (Make.com). + - Guardia de idempotencia para prevenir procesamiento concurrente duplicado. """ from __future__ import annotations +import hashlib +import hmac +import threading +import time from urllib.parse import urlparse ZERO_SIZE_LATENCY_BUDGET_MS = 25 @@ -33,3 +39,81 @@ def is_webhook_destination_forbidden(url: str) -> bool: if frag in host: return True return False + + +def verify_webhook_sha1_signature( + payload: bytes, + signature_header: str, + secret: str, +) -> bool: + """Verifica una firma HMAC-SHA1 de un webhook entrante. + + Args: + payload: Cuerpo crudo de la petición (bytes). + signature_header: Valor de la cabecera ``X-Hub-Signature`` en formato + ``sha1=``. + secret: Secreto compartido configurado en el emisor del webhook. + + Returns: + ``True`` si la firma es válida, ``False`` en cualquier otro caso. + """ + if not payload or not signature_header or not secret: + return False + if not signature_header.startswith("sha1="): + return False + received_digest = signature_header[len("sha1="):] + expected_digest = hmac.new( + secret.encode("utf-8"), + payload, + hashlib.sha1, + ).hexdigest() + return hmac.compare_digest(received_digest.lower(), expected_digest.lower()) + + +class IdempotencyGuard: + """Previene el procesamiento concurrente duplicado de eventos webhook. + + Mantiene un registro en memoria de los identificadores de evento procesados + recientemente. Las entradas expiran después de ``ttl_seconds`` para evitar + un crecimiento ilimitado de la memoria. + + Args: + ttl_seconds: Tiempo de vida (segundos) de cada entrada. Por defecto 300 s. + max_size: Número máximo de entradas. Si se supera se eliminan las más + antiguas. Por defecto 10 000. + """ + + def __init__(self, ttl_seconds: float = 300.0, max_size: int = 10_000) -> None: + self._ttl = ttl_seconds + self._max_size = max_size + self._seen: dict[str, float] = {} + self._lock = threading.Lock() + + def is_duplicate(self, event_id: str) -> bool: + """Devuelve ``True`` si *event_id* ya fue procesado y su TTL sigue activo.""" + if not event_id: + return False + now = time.monotonic() + with self._lock: + self._evict(now) + return event_id in self._seen + + def mark_seen(self, event_id: str) -> None: + """Registra *event_id* como procesado.""" + if not event_id: + return + now = time.monotonic() + with self._lock: + self._evict(now) + if len(self._seen) >= self._max_size: + # Elimina la entrada más antigua + oldest = min(self._seen, key=lambda k: self._seen[k]) + del self._seen[oldest] + self._seen[event_id] = now + + def _evict(self, now: float) -> None: + """Elimina entradas expiradas (debe llamarse con el lock adquirido).""" + cutoff = now - self._ttl + expired = [k for k, ts in self._seen.items() if ts < cutoff] + for k in expired: + del self._seen[k] diff --git a/tests/test_peacock_core.py b/tests/test_peacock_core.py index 2141ea0f..ae6b1e4e 100644 --- a/tests/test_peacock_core.py +++ b/tests/test_peacock_core.py @@ -2,15 +2,23 @@ from __future__ import annotations +import hashlib +import hmac import os import sys +import time import unittest _API = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "api")) if _API not in sys.path: sys.path.insert(0, _API) -from peacock_core import ZERO_SIZE_LATENCY_BUDGET_MS, is_webhook_destination_forbidden +from peacock_core import ( + ZERO_SIZE_LATENCY_BUDGET_MS, + IdempotencyGuard, + is_webhook_destination_forbidden, + verify_webhook_sha1_signature, +) class TestPeacockCoreIntegration(unittest.TestCase): @@ -34,5 +42,96 @@ def test_make_and_slack_like_urls_allowed(self) -> None: ) +# --------------------------------------------------------------------------- +# verify_webhook_sha1_signature +# --------------------------------------------------------------------------- + +def _make_sig(payload: bytes, secret: str) -> str: + """Helper: genera una firma HMAC-SHA1 válida.""" + digest = hmac.new(secret.encode("utf-8"), payload, hashlib.sha1).hexdigest() + return f"sha1={digest}" + + +class TestVerifyWebhookSha1Signature(unittest.TestCase): + _SECRET = "test-secret-key" + _PAYLOAD = b'{"event": "balmain_click"}' + + def test_valid_signature_accepted(self) -> None: + sig = _make_sig(self._PAYLOAD, self._SECRET) + self.assertTrue(verify_webhook_sha1_signature(self._PAYLOAD, sig, self._SECRET)) + + def test_wrong_secret_rejected(self) -> None: + sig = _make_sig(self._PAYLOAD, "other-secret") + self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, sig, self._SECRET)) + + def test_tampered_payload_rejected(self) -> None: + sig = _make_sig(self._PAYLOAD, self._SECRET) + tampered = b'{"event": "balmain_click", "injected": true}' + self.assertFalse(verify_webhook_sha1_signature(tampered, sig, self._SECRET)) + + def test_missing_prefix_rejected(self) -> None: + digest = hmac.new(self._SECRET.encode(), self._PAYLOAD, hashlib.sha1).hexdigest() + self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, digest, self._SECRET)) + + def test_empty_payload_rejected(self) -> None: + sig = _make_sig(self._PAYLOAD, self._SECRET) + self.assertFalse(verify_webhook_sha1_signature(b"", sig, self._SECRET)) + + def test_empty_signature_rejected(self) -> None: + self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, "", self._SECRET)) + + def test_empty_secret_rejected(self) -> None: + sig = _make_sig(self._PAYLOAD, self._SECRET) + self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, sig, "")) + + def test_case_insensitive_digest(self) -> None: + digest = hmac.new(self._SECRET.encode(), self._PAYLOAD, hashlib.sha1).hexdigest().upper() + sig = f"sha1={digest}" + self.assertTrue(verify_webhook_sha1_signature(self._PAYLOAD, sig, self._SECRET)) + + +# --------------------------------------------------------------------------- +# IdempotencyGuard +# --------------------------------------------------------------------------- + +class TestIdempotencyGuard(unittest.TestCase): + def setUp(self) -> None: + self.guard = IdempotencyGuard(ttl_seconds=10.0, max_size=5) + + def test_new_event_is_not_duplicate(self) -> None: + self.assertFalse(self.guard.is_duplicate("evt-001")) + + def test_seen_event_is_duplicate(self) -> None: + self.guard.mark_seen("evt-001") + self.assertTrue(self.guard.is_duplicate("evt-001")) + + def test_different_event_ids_independent(self) -> None: + self.guard.mark_seen("evt-001") + self.assertFalse(self.guard.is_duplicate("evt-002")) + + def test_empty_event_id_never_duplicate(self) -> None: + self.guard.mark_seen("") + self.assertFalse(self.guard.is_duplicate("")) + + def test_expired_entry_not_duplicate(self) -> None: + guard = IdempotencyGuard(ttl_seconds=0.05) + guard.mark_seen("evt-exp") + time.sleep(0.1) + self.assertFalse(guard.is_duplicate("evt-exp")) + + def test_max_size_evicts_oldest(self) -> None: + for i in range(5): + self.guard.mark_seen(f"evt-{i:03d}") + # Adding a 6th entry should evict the oldest + self.guard.mark_seen("evt-005") + # Guard should still work correctly for recent entries + self.assertTrue(self.guard.is_duplicate("evt-005")) + + def test_mark_seen_then_is_duplicate(self) -> None: + self.guard.mark_seen("make-event-xyz") + self.assertTrue(self.guard.is_duplicate("make-event-xyz")) + self.assertFalse(self.guard.is_duplicate("make-event-xyz-other")) + + if __name__ == "__main__": unittest.main()