
Logging, Monitoring & Observability #11

@aniebietafia

Description

Feature: Implement Structured Logging, Monitoring & Observability

Problem
The FluentMeet backend currently has no centralized logging strategy, no performance metrics, and no alerting. Log output is unstructured, making it unsearchable in aggregation tools. There is no way to trace a single request or audio chunk across multiple services (WebSocket handler → Kafka → STT worker → Translation worker → TTS worker → egress), making latency bottlenecks and silent failures nearly impossible to diagnose in production. Without health checks and alerting, service degradations go unnoticed until users report them.

Proposed Solution
Implement a three-pillar observability stack tailored to FastAPI and the Kafka AI pipeline:

  1. Structured Logging — JSON-formatted logs with correlation IDs propagated across all services, collected and shipped to a log aggregation platform (e.g., Loki, Datadog, or CloudWatch).
  2. Metrics — Prometheus metrics exposed via a /metrics endpoint, scraped by a Prometheus server, and visualized in Grafana dashboards.
  3. Alerting — Prometheus alerting rules, routed through Alertmanager (or a cloud-native equivalent), for critical failure conditions and latency SLO breaches.

User Stories

  • As a DevOps engineer, I want all logs to be structured JSON, so I can query and filter them with a log aggregation tool without writing regex parsers.
  • As a developer, I want a correlation_id (trace ID) automatically attached to every log line and propagated through the Kafka pipeline, so I can trace a single audio chunk end-to-end without guessing.
  • As a platform operator, I want Prometheus metrics for request rate, error rate, and translation latency, so I can set SLO-based alert thresholds and detect regressions before users do.
  • As an on-call engineer, I want to receive an alert when Kafka consumer lag exceeds a threshold or service error rate spikes, so I am notified of outages proactively rather than reactively.

Acceptance Criteria

  1. Structured Logging:

    • All log output is JSON-formatted using python-json-logger or structlog.
    • Every log record includes: timestamp, level, service, logger, message, correlation_id, and request_id (for HTTP requests).
    • A CorrelationIDMiddleware automatically generates and attaches an X-Correlation-ID header to every HTTP request and response.
    • The correlation_id is injected into the Kafka message envelope so it flows through all pipeline workers.
    • Log level is configurable via LOG_LEVEL environment variable (default: INFO).
  2. Prometheus Metrics — the following metrics are tracked and exposed at GET /metrics:

    | Metric | Type | Labels | Description |
    |---|---|---|---|
    | http_requests_total | Counter | method, endpoint, status_code | Total HTTP requests |
    | http_request_duration_seconds | Histogram | method, endpoint | Request latency |
    | pipeline_stage_duration_seconds | Histogram | stage (stt, translate, tts) | Per-stage AI latency |
    | pipeline_end_to_end_duration_seconds | Histogram | (none) | Full audio chunk latency |
    | kafka_consumer_lag | Gauge | topic, consumer_group | Kafka consumer offset lag |
    | active_rooms_total | Gauge | (none) | Number of currently active meeting rooms |
    | active_ws_connections_total | Gauge | type (audio, captions, signaling) | Active WebSocket connections |
  3. Health Check Endpoints:

    • GET /health — basic liveness check (already implemented); returns {"status": "ok"}.
    • GET /health/ready — readiness check; verifies connectivity to PostgreSQL, Redis, and Kafka. Returns 503 if any dependency is unreachable.
  4. Alerting Rules — the following Prometheus alerting rules are configured and routed through Alertmanager:

    | Alert | Condition | Severity |
    |---|---|---|
    | HighErrorRate | HTTP 5xx responses > 5% of all requests over 5 min | Critical |
    | HighTranslationLatency | P95 end-to-end pipeline latency > 2 s over 5 min | Warning |
    | KafkaConsumerLagHigh | Consumer lag > 1000 messages | Warning |
    | ServiceDown | Health check returns non-200 | Critical |
  5. All Kafka pipeline workers log stage entry/exit with correlation_id at DEBUG level, and errors at ERROR level with full stack traces.

  6. The GET /metrics endpoint is protected — accessible only from internal/monitoring network ranges (not publicly exposed).
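The readiness rule in criterion 3 (503 if any of PostgreSQL, Redis, or Kafka is unreachable) can be sketched as concurrent dependency checks with a per-check timeout. The check callables are assumptions: each is an async function that raises on failure, for example a `SELECT 1` against PostgreSQL, a Redis `PING`, or a Kafka metadata request. The function name `readiness` and the 2-second default timeout are hypothetical choices.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

# A dependency check: an async callable that raises if the dependency is down.
Check = Callable[[], Awaitable[None]]


async def readiness(checks: Dict[str, Check], timeout: float = 2.0) -> Tuple[int, dict]:
    """Run all checks concurrently; return (200, body) if all pass, else (503, body)."""

    async def run(name: str, check: Check) -> Tuple[str, str]:
        try:
            await asyncio.wait_for(check(), timeout=timeout)
            return name, "ok"
        except Exception as exc:  # connection errors and timeouts alike
            return name, f"error: {type(exc).__name__}"

    results = dict(await asyncio.gather(*(run(n, c) for n, c in checks.items())))
    healthy = all(status == "ok" for status in results.values())
    body = {"status": "ok" if healthy else "unavailable", "checks": results}
    return (200 if healthy else 503), body
```

In the FastAPI route, the returned pair could be turned into `JSONResponse(status_code=code, content=body)`. Running the checks concurrently keeps the probe's worst-case latency close to a single timeout rather than the sum of three.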
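If the /metrics restriction in criterion 6 is enforced inside the application rather than purely by network policy (one of the open questions below), a CIDR allowlist check could look like the sketch below. The ranges are placeholder private and loopback ranges, not the real monitoring network, and `make_metrics_guard` is a hypothetical helper.

```python
import ipaddress
from typing import Callable, Iterable

# Placeholder ranges (RFC 1918 private networks plus loopback); substitute
# the actual Prometheus/monitoring subnets.
DEFAULT_ALLOWED = ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.1/32")


def make_metrics_guard(cidrs: Iterable[str] = DEFAULT_ALLOWED) -> Callable[[str], bool]:
    """Return a predicate that is True only for client IPs inside the allowed CIDRs."""
    networks = [ipaddress.ip_network(c) for c in cidrs]

    def is_allowed(client_host: str) -> bool:
        try:
            addr = ipaddress.ip_address(client_host)
        except ValueError:
            return False  # unparseable host: deny by default
        # `in` returns False when address and network IP versions differ.
        return any(addr in net for net in networks)

    return is_allowed
```

In a FastAPI dependency the argument would be `request.client.host`, answering 403 when the predicate is False. Behind a load balancer or ingress that value is the proxy's address, so a network-level restriction is usually the more reliable control.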

Proposed Technical Details

  • Logging Library: structlog configured with JSONRenderer for production; ConsoleRenderer for local development (detected via ENVIRONMENT setting).
  • Metrics Library: prometheus-fastapi-instrumentator for automatic HTTP metrics + manual prometheus_client gauges/histograms for pipeline and Kafka metrics.
  • Correlation ID: asgi-correlation-id middleware (configured with header_name="X-Correlation-ID", since the package defaults to X-Request-ID) generates a UUID v4 per request, stored in a ContextVar and accessible throughout the async call stack.
  • Kafka Lag Metric: A background task polls the Kafka broker for consumer group lag every 30 seconds and updates the kafka_consumer_lag Gauge.
  • New/Modified Files:
    • app/core/logging.py — structlog configuration [NEW]
    • app/core/middleware.py — add CorrelationIDMiddleware [MODIFY]
    • app/core/metrics.py — Prometheus metric definitions and helpers [NEW]
    • app/api/v1/endpoints/health.py — extend with /health/ready [MODIFY]
    • app/main.py — register logging, middleware, metrics, and health router [MODIFY]
    • infra/prometheus.yml — Prometheus scrape config [NEW]
    • infra/alerts.yml — Prometheus alerting rules (routed via Alertmanager) [NEW]
    • infra/docker-compose.yml — add Prometheus and Grafana services [MODIFY]
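As a dependency-free illustration of the record shape required by the Structured Logging acceptance criteria (and of the LOG_LEVEL default), the sketch below uses the standard library's `logging` module with a custom JSON formatter. The proposal calls for structlog's JSONRenderer instead, so this is not that configuration; `SERVICE_NAME`, the `correlation_id` ContextVar, and `configure_logging()` are hypothetical names.

```python
import contextvars
import json
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Optional

# Hypothetical ContextVar holding the current correlation ID. The HTTP
# middleware (or a Kafka worker, from the message envelope) would set it.
correlation_id: "contextvars.ContextVar[Optional[str]]" = contextvars.ContextVar(
    "correlation_id", default=None
)

SERVICE_NAME = "fluentmeet-api"  # hypothetical value for the `service` field


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "service": SERVICE_NAME,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            # Present only for HTTP requests, passed via logger.info(..., extra={...}).
            "request_id": getattr(record, "request_id", None),
        }
        if record.exc_info:
            # Full stack trace for records logged with exc_info / logger.exception.
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def configure_logging() -> None:
    """Send JSON logs to stdout at the level named by LOG_LEVEL (default INFO)."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.handlers = [handler]
    root.setLevel(os.getenv("LOG_LEVEL", "INFO").upper())
```

Reading the correlation ID from a ContextVar rather than passing it explicitly means every log call in the same async task picks it up without changing call sites.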
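The Kafka lag metric is, per partition, the log-end offset minus the consumer group's committed offset, summed per topic and consumer group. A sketch of that computation and the 30-second polling task follows. `fetch` is a hypothetical coroutine wrapping whichever Kafka client the project uses to read end offsets and committed offsets, and `set_gauge` would set the `kafka_consumer_lag` Gauge for the given topic and consumer_group labels.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Dict, Mapping, Tuple

logger = logging.getLogger("kafka_lag")

Offsets = Mapping[int, int]  # partition -> offset
# (topic, consumer_group) -> (log-end offsets, committed offsets)
LagSnapshot = Dict[Tuple[str, str], Tuple[Offsets, Offsets]]


def consumer_lag(end_offsets: Offsets, committed: Offsets) -> int:
    """Sum over partitions of (log-end offset - committed offset).

    Partitions with no committed offset are treated as fully unconsumed
    (committed = 0), a conservative assumption for this sketch.
    """
    return sum(max(end - committed.get(p, 0), 0) for p, end in end_offsets.items())


async def refresh_lag_once(
    fetch: Callable[[], Awaitable[LagSnapshot]],
    set_gauge: Callable[[str, str, int], None],
) -> None:
    """Fetch one offset snapshot and update the gauge for every topic/group."""
    snapshot = await fetch()
    for (topic, group), (end, committed) in snapshot.items():
        set_gauge(topic, group, consumer_lag(end, committed))


async def poll_lag_forever(
    fetch: Callable[[], Awaitable[LagSnapshot]],
    set_gauge: Callable[[str, str, int], None],
    interval: float = 30.0,
) -> None:
    """Background task: refresh every `interval` seconds; a failed poll is logged, not fatal."""
    while True:
        try:
            await refresh_lag_once(fetch, set_gauge)
        except Exception:
            logger.exception("kafka lag poll failed")
        await asyncio.sleep(interval)
```

The loop would be started once at application startup (for example with `asyncio.create_task` in the FastAPI lifespan handler) and cancelled on shutdown.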

Tasks

  • Configure structlog in app/core/logging.py with JSON output for production and pretty output for development.
  • Implement CorrelationIDMiddleware in app/core/middleware.py to generate and propagate X-Correlation-ID.
  • Inject correlation_id into Kafka message envelopes in app/schemas/pipeline.py.
  • Add correlation_id logging in all Kafka worker services (stt_worker, translation_worker, tts_worker).
  • Implement Prometheus metric definitions in app/core/metrics.py.
  • Integrate prometheus-fastapi-instrumentator for automatic HTTP metrics.
  • Instrument per-stage and end-to-end pipeline latency in each worker.
  • Implement the Kafka consumer lag background polling task.
  • Implement GET /health/ready in app/api/v1/endpoints/health.py.
  • Add Prometheus and Grafana to infra/docker-compose.yml.
  • Write Prometheus alert rules in infra/alerts.yml.
  • Write unit tests for CorrelationIDMiddleware (assert header is present in response).
  • Write integration tests for GET /health/ready (mock unhealthy dependencies).
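For the CorrelationIDMiddleware task and its unit test above, it helps to pin down the middleware's contract. If the middleware is hand-written rather than provided by asgi-correlation-id, a minimal pure-ASGI sketch could look like this. It reuses an incoming X-Correlation-ID only when it parses as a UUID, otherwise generates a UUID v4, stores it in a ContextVar (`correlation_id_var`, a hypothetical name), and appends it to the HTTP response headers.

```python
import contextvars
import uuid
from typing import Any, Awaitable, Callable, MutableMapping, Optional

Scope = MutableMapping[str, Any]
Message = MutableMapping[str, Any]
Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]
ASGIApp = Callable[[Scope, Receive, Send], Awaitable[None]]

HEADER = b"x-correlation-id"  # ASGI header names are lowercase bytes
correlation_id_var: "contextvars.ContextVar[str]" = contextvars.ContextVar(
    "correlation_id", default=""
)


def _parse_incoming(raw: Optional[bytes]) -> Optional[str]:
    """Accept a client-supplied ID only if it is a well-formed UUID."""
    if not raw:
        return None
    try:
        return str(uuid.UUID(raw.decode("latin-1")))
    except ValueError:
        return None


class CorrelationIDMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] not in ("http", "websocket"):
            await self.app(scope, receive, send)  # e.g. lifespan events
            return

        headers = dict(scope.get("headers") or [])
        cid = _parse_incoming(headers.get(HEADER)) or str(uuid.uuid4())
        token = correlation_id_var.set(cid)

        async def send_with_header(message: Message) -> None:
            # Only HTTP responses get the header; WebSockets just get the ContextVar.
            if message["type"] == "http.response.start":
                message["headers"] = list(message.get("headers") or []) + [
                    (HEADER, cid.encode("latin-1"))
                ]
            await send(message)

        try:
            await self.app(scope, receive, send_with_header)
        finally:
            correlation_id_var.reset(token)
```

It would be registered with `app.add_middleware(CorrelationIDMiddleware)`. Validating the incoming value keeps arbitrary client strings out of logs and Kafka envelopes.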
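For the task of injecting correlation_id into Kafka message envelopes, note that `pipeline_end_to_end_duration_seconds` also spans several processes, so the chunk's ingest timestamp has to travel with it just as the correlation ID does. A minimal sketch of such an envelope, assuming JSON-encoded Kafka messages; `PipelineEnvelope`, `ingested_at`, and `next_stage()` are hypothetical names, not the existing schema in app/schemas/pipeline.py.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class PipelineEnvelope:
    """Hypothetical Kafka message envelope for one audio chunk."""

    payload: dict
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Wall-clock time the chunk entered the pipeline. time.time() is used
    # because monotonic clocks are not comparable across processes or hosts.
    ingested_at: float = field(default_factory=time.time)

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "PipelineEnvelope":
        return cls(**json.loads(raw.decode("utf-8")))

    def next_stage(self, payload: dict) -> "PipelineEnvelope":
        """Build the envelope for the next topic, preserving the trace fields."""
        return PipelineEnvelope(
            payload=payload, correlation_id=self.correlation_id, ingested_at=self.ingested_at
        )

    def end_to_end_seconds(self, now: Optional[float] = None) -> float:
        """Elapsed time since ingestion, to observe at egress."""
        return (time.time() if now is None else now) - self.ingested_at
```

At egress, the value from `end_to_end_seconds()` would be passed to the histogram (for example prometheus_client's `Histogram.observe()`). Because it compares wall clocks on different hosts, the measurement assumes those clocks are kept in sync (e.g. via NTP).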
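For the worker tasks above (correlation_id logging in each worker and per-stage latency instrumentation), one option is a context manager around each stage that logs entry and exit at DEBUG, logs failures at ERROR with the traceback via `logger.exception`, and reports the elapsed time. `pipeline_stage` and its `observe` callback are hypothetical names.

```python
import logging
import time
from contextlib import contextmanager
from typing import Callable, Iterator, Optional

logger = logging.getLogger("pipeline")


@contextmanager
def pipeline_stage(
    stage: str,
    correlation_id: str,
    observe: Optional[Callable[[str, float], None]] = None,
) -> Iterator[None]:
    """Log stage entry/exit at DEBUG, failures at ERROR with a stack trace,
    and pass the stage duration (seconds) to an optional observer."""
    extra = {"stage": stage, "correlation_id": correlation_id}
    logger.debug("stage.enter", extra=extra)
    start = time.perf_counter()
    try:
        yield
    except Exception:
        # logger.exception logs at ERROR level and attaches the current traceback.
        logger.exception("stage.error", extra=extra)
        raise
    finally:
        # Runs on success and failure, so failed stages are timed too.
        elapsed = time.perf_counter() - start
        if observe is not None:
            observe(stage, elapsed)
    # Only reached when the stage completed without raising.
    logger.debug("stage.exit", extra={**extra, "duration_s": round(elapsed, 4)})
```

With prometheus_client, `observe` could be `lambda stage, s: STAGE_HIST.labels(stage=stage).observe(s)`, where `STAGE_HIST` is a hypothetical Histogram with a `stage` label defined in app/core/metrics.py.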

Open Questions/Considerations

  • Which log aggregation platform will be used in production — Loki (self-hosted), Datadog, or AWS CloudWatch? This affects the log shipping configuration.
  • Should the GET /metrics endpoint be protected by a shared secret header (e.g., Bearer token) or restricted by network policy only?
  • Should we implement distributed tracing (OpenTelemetry + Jaeger/Tempo) in addition to correlation IDs, for a full trace waterfall view across services?
  • What is the agreed SLO for end-to-end translation latency (e.g., P95 < 500ms)? This determines the alert threshold.

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests