Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion api/index.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import os
import sys
from pathlib import Path

Expand All @@ -15,10 +16,14 @@
orchestrate_mirror_shadow_dwell,
)
from mirror_digital_make import forward_mirror_event
from peacock_core import IdempotencyGuard, verify_webhook_sha1_signature
from stripe_inauguration import create_inauguration_checkout_session

app = Flask(__name__)

# Guardia de idempotencia compartida para prevenir procesamiento duplicado de eventos.
_idempotency_guard = IdempotencyGuard()


@app.route("/")
def home():
Expand Down Expand Up @@ -78,7 +83,29 @@ def mirror_digital_event_options():
@app.route("/api/mirror_digital_event", methods=["POST"])
@app.route("/mirror_digital_event", methods=["POST"])
def mirror_digital_event():
body = request.get_json(force=True, silent=True) or {}
raw_body = request.get_data(cache=True)

# Verificación de firma HMAC-SHA1 cuando el secreto está configurado.
webhook_secret = os.environ.get("MAKE_WEBHOOK_SECRET", "").strip()
if webhook_secret:
sig = request.headers.get("X-Hub-Signature", "")
if not verify_webhook_sha1_signature(raw_body, sig, webhook_secret):
return _cors(jsonify({"status": "error", "message": "invalid_signature"})), 401

try:
body = json.loads(raw_body) if raw_body else {}
if not isinstance(body, dict):
body = {}
except (json.JSONDecodeError, ValueError):
body = {}

# Guardia de idempotencia: rechaza eventos ya procesados recientemente.
event_id = str(body.get("event_id") or "").strip()
if event_id:
if _idempotency_guard.is_duplicate(event_id):
return _cors(jsonify({"status": "ok", "duplicate": True, "skipped": True})), 200
_idempotency_guard.mark_seen(event_id)

payload, code = forward_mirror_event(body)
return _cors(jsonify(payload)), code

Expand Down
84 changes: 84 additions & 0 deletions api/peacock_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
Reglas:
- Webhooks HTTP prohibidos hacia abvetos.com (activación de licencia interna / manual).
- Presupuesto de latencia crítica Zero-Size (API / handshake): ver ZERO_SIZE_LATENCY_BUDGET_MS.
- Verificación de firma HMAC-SHA1 para webhooks entrantes (Make.com).
- Guardia de idempotencia para prevenir procesamiento concurrente duplicado.
"""

from __future__ import annotations

import hashlib
import hmac
import threading
import time
from urllib.parse import urlparse

ZERO_SIZE_LATENCY_BUDGET_MS = 25
Expand All @@ -33,3 +39,81 @@ def is_webhook_destination_forbidden(url: str) -> bool:
if frag in host:
return True
return False


def verify_webhook_sha1_signature(
payload: bytes,
signature_header: str,
secret: str,
) -> bool:
"""Verifica una firma HMAC-SHA1 de un webhook entrante.

Args:
payload: Cuerpo crudo de la petición (bytes).
signature_header: Valor de la cabecera ``X-Hub-Signature`` en formato
``sha1=<hex_digest>``.
secret: Secreto compartido configurado en el emisor del webhook.

Returns:
``True`` si la firma es válida, ``False`` en cualquier otro caso.
"""
if not payload or not signature_header or not secret:
return False
if not signature_header.startswith("sha1="):
return False
received_digest = signature_header[len("sha1="):]
expected_digest = hmac.new(
secret.encode("utf-8"),
payload,
hashlib.sha1,
).hexdigest()
return hmac.compare_digest(received_digest.lower(), expected_digest.lower())


class IdempotencyGuard:
"""Previene el procesamiento concurrente duplicado de eventos webhook.

Mantiene un registro en memoria de los identificadores de evento procesados
recientemente. Las entradas expiran después de ``ttl_seconds`` para evitar
un crecimiento ilimitado de la memoria.

Args:
ttl_seconds: Tiempo de vida (segundos) de cada entrada. Por defecto 300 s.
max_size: Número máximo de entradas. Si se supera se eliminan las más
antiguas. Por defecto 10 000.
"""

def __init__(self, ttl_seconds: float = 300.0, max_size: int = 10_000) -> None:
self._ttl = ttl_seconds
self._max_size = max_size
self._seen: dict[str, float] = {}
self._lock = threading.Lock()

def is_duplicate(self, event_id: str) -> bool:
"""Devuelve ``True`` si *event_id* ya fue procesado y su TTL sigue activo."""
if not event_id:
return False
now = time.monotonic()
with self._lock:
self._evict(now)
return event_id in self._seen

def mark_seen(self, event_id: str) -> None:
"""Registra *event_id* como procesado."""
if not event_id:
return
now = time.monotonic()
with self._lock:
self._evict(now)
if len(self._seen) >= self._max_size:
# Elimina la entrada más antigua
oldest = min(self._seen, key=lambda k: self._seen[k])
del self._seen[oldest]
self._seen[event_id] = now

def _evict(self, now: float) -> None:
"""Elimina entradas expiradas (debe llamarse con el lock adquirido)."""
cutoff = now - self._ttl
expired = [k for k, ts in self._seen.items() if ts < cutoff]
for k in expired:
del self._seen[k]
101 changes: 100 additions & 1 deletion tests/test_peacock_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@

from __future__ import annotations

import hashlib
import hmac
import os
import sys
import time
import unittest

_API = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "api"))
if _API not in sys.path:
sys.path.insert(0, _API)

from peacock_core import ZERO_SIZE_LATENCY_BUDGET_MS, is_webhook_destination_forbidden
from peacock_core import (
ZERO_SIZE_LATENCY_BUDGET_MS,
IdempotencyGuard,
is_webhook_destination_forbidden,
verify_webhook_sha1_signature,
)


class TestPeacockCoreIntegration(unittest.TestCase):
Expand All @@ -34,5 +42,96 @@ def test_make_and_slack_like_urls_allowed(self) -> None:
)


# ---------------------------------------------------------------------------
# verify_webhook_sha1_signature
# ---------------------------------------------------------------------------

def _make_sig(payload: bytes, secret: str) -> str:
"""Helper: genera una firma HMAC-SHA1 válida."""
digest = hmac.new(secret.encode("utf-8"), payload, hashlib.sha1).hexdigest()
return f"sha1={digest}"


class TestVerifyWebhookSha1Signature(unittest.TestCase):
_SECRET = "test-secret-key"
_PAYLOAD = b'{"event": "balmain_click"}'

def test_valid_signature_accepted(self) -> None:
sig = _make_sig(self._PAYLOAD, self._SECRET)
self.assertTrue(verify_webhook_sha1_signature(self._PAYLOAD, sig, self._SECRET))

def test_wrong_secret_rejected(self) -> None:
sig = _make_sig(self._PAYLOAD, "other-secret")
self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, sig, self._SECRET))

def test_tampered_payload_rejected(self) -> None:
sig = _make_sig(self._PAYLOAD, self._SECRET)
tampered = b'{"event": "balmain_click", "injected": true}'
self.assertFalse(verify_webhook_sha1_signature(tampered, sig, self._SECRET))

def test_missing_prefix_rejected(self) -> None:
digest = hmac.new(self._SECRET.encode(), self._PAYLOAD, hashlib.sha1).hexdigest()
self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, digest, self._SECRET))

def test_empty_payload_rejected(self) -> None:
sig = _make_sig(self._PAYLOAD, self._SECRET)
self.assertFalse(verify_webhook_sha1_signature(b"", sig, self._SECRET))

def test_empty_signature_rejected(self) -> None:
self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, "", self._SECRET))

def test_empty_secret_rejected(self) -> None:
sig = _make_sig(self._PAYLOAD, self._SECRET)
self.assertFalse(verify_webhook_sha1_signature(self._PAYLOAD, sig, ""))

def test_case_insensitive_digest(self) -> None:
digest = hmac.new(self._SECRET.encode(), self._PAYLOAD, hashlib.sha1).hexdigest().upper()
sig = f"sha1={digest}"
self.assertTrue(verify_webhook_sha1_signature(self._PAYLOAD, sig, self._SECRET))


# ---------------------------------------------------------------------------
# IdempotencyGuard
# ---------------------------------------------------------------------------

class TestIdempotencyGuard(unittest.TestCase):
def setUp(self) -> None:
self.guard = IdempotencyGuard(ttl_seconds=10.0, max_size=5)

def test_new_event_is_not_duplicate(self) -> None:
self.assertFalse(self.guard.is_duplicate("evt-001"))

def test_seen_event_is_duplicate(self) -> None:
self.guard.mark_seen("evt-001")
self.assertTrue(self.guard.is_duplicate("evt-001"))

def test_different_event_ids_independent(self) -> None:
self.guard.mark_seen("evt-001")
self.assertFalse(self.guard.is_duplicate("evt-002"))

def test_empty_event_id_never_duplicate(self) -> None:
self.guard.mark_seen("")
self.assertFalse(self.guard.is_duplicate(""))

def test_expired_entry_not_duplicate(self) -> None:
guard = IdempotencyGuard(ttl_seconds=0.05)
guard.mark_seen("evt-exp")
time.sleep(0.1)
self.assertFalse(guard.is_duplicate("evt-exp"))

def test_max_size_evicts_oldest(self) -> None:
for i in range(5):
self.guard.mark_seen(f"evt-{i:03d}")
# Adding a 6th entry should evict the oldest
self.guard.mark_seen("evt-005")
# Guard should still work correctly for recent entries
self.assertTrue(self.guard.is_duplicate("evt-005"))

def test_mark_seen_then_is_duplicate(self) -> None:
self.guard.mark_seen("make-event-xyz")
self.assertTrue(self.guard.is_duplicate("make-event-xyz"))
self.assertFalse(self.guard.is_duplicate("make-event-xyz-other"))


if __name__ == "__main__":
unittest.main()