From fc3318baa939e6eba7fcf94aa9ac840cfb23aaae Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 2 Dec 2025 18:56:44 +0000 Subject: [PATCH 01/16] Initial plan From a45196a47896a1fb2f3cfd2a5114d47d55b9942e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 2 Dec 2025 19:12:05 +0000 Subject: [PATCH 02/16] feat: Add dedicated /metrics endpoints for Gateway and LLM services (issue #39) - Add /metrics endpoint to gateway service with request counts, latencies, error breakdowns, WebSocket connection stats, and LLM integration stats - Add /metrics endpoint to LLM service with request counts, latencies, error breakdowns, provider-level stats, and token usage tracking - Update K8s deployments to change prometheus.io/path from /healthz to /metrics for gateway and LLM - Update ServiceMonitors to scrape /metrics path for gateway and LLM services - Update Deploy_GEngine_To_Kubernetes.md documentation with health vs. metrics distinction and example metrics for all three services - Add comprehensive tests for new metrics functionality (9 gateway tests, 11 LLM tests) Co-authored-by: SorraTheOrc <250240+SorraTheOrc@users.noreply.github.com> --- docs/gengine/Deploy_GEngine_To_Kubernetes.md | 128 +++++++++++--- gamedev-agent-thoughts.txt | 85 ++++++++++ k8s/base/gateway-deployment.yaml | 2 +- k8s/base/llm-deployment.yaml | 2 +- k8s/base/servicemonitor.yaml | 4 +- src/gengine/echoes/gateway/app.py | 150 ++++++++++++++++- src/gengine/echoes/llm/app.py | 138 ++++++++++++++++ tests/echoes/test_gateway_service.py | 165 ++++++++++++++++++- tests/echoes/test_llm_app.py | 151 ++++++++++++++++- 9 files changed, 798 insertions(+), 27 deletions(-) diff --git a/docs/gengine/Deploy_GEngine_To_Kubernetes.md b/docs/gengine/Deploy_GEngine_To_Kubernetes.md index 6058bb49..489aeff5 100644 --- a/docs/gengine/Deploy_GEngine_To_Kubernetes.md +++ b/docs/gengine/Deploy_GEngine_To_Kubernetes.md @@ -592,17 +592,115 @@ If using LLM service extensively, increase memory for context buffering: ## Monitoring and Observability GEngine services are instrumented with Prometheus-compatible metrics endpoints -for monitoring and alerting. +for monitoring and alerting. Health checks (`/healthz`) are separate from +metrics collection (`/metrics`) to allow independent control of readiness +probes and observability scraping. + +### Health Check Endpoints + +Health checks are used for Kubernetes liveness and readiness probes: + +| Service | Port | Health Endpoint | Description | +| ---------- | ---- | --------------- | ------------------------------------- | +| Simulation | 8000 | `/healthz` | Returns `{"status": "ok"}` | +| Gateway | 8100 | `/healthz` | Returns status and upstream URLs | +| LLM | 8001 | `/healthz` | Returns status, provider, and model | ### Metrics Endpoints -Each service exposes metrics that can be scraped by Prometheus: +Each service exposes dedicated metrics for Prometheus scraping: | Service | Port | Metrics Endpoint | Description | | ---------- | ---- | ---------------- | ---------------------------------- | | Simulation | 8000 | `/metrics` | Tick count, environment, profiling | -| Gateway | 8100 | `/healthz` | Service health and connection info | -| LLM | 8001 | `/healthz` | Service health status | +| Gateway | 8100 | `/metrics` | Request counts, latencies, connections, LLM integration | +| LLM | 8001 | `/metrics` | Request counts, latencies, errors, provider stats, token usage | + +### Example Metrics Responses + +**Simulation Service** (`/metrics`): +```json +{ + "tick": 42, + "environment": { + "temperature": 0.5, + "instability": 0.2, + "tension": 0.3 + }, + "profiling": { + "tick_ms_p50": 12.5, + "tick_ms_p95": 25.0, + "tick_ms_max": 45.0 + } +} +``` + +**Gateway Service** (`/metrics`): +```json +{ + "service": "gateway", + "service_url": "http://simulation:8000", + "llm_service_url": "http://llm:8001", + "requests": { + "total": 150, + "by_type": {"command": 120, "natural_language": 30}, + "websocket_messages": 150, + "natural_language": 30, + "commands": 120 + }, + "errors": { + "total": 2, + "by_type": {"execution_error": 2} + }, + "latency_ms": { + "avg": 45.5, + "min": 10.2, + "max": 250.0, + "p50": 35.0, + "p95": 120.0 + }, + "connections": { + "active": 3, + "total": 25, + "disconnections": 22 + }, + "llm_integration": { + "requests": 30, + "errors": 0, + "latency_ms": {"avg": 150.0, "min": 80.0, "max": 500.0, "p50": 120.0, "p95": 350.0} + } +} +``` + +**LLM Service** (`/metrics`): +```json +{ + "service": "llm", + "requests": { + "total": 100, + "parse_intent": 80, + "narrate": 20 + }, + "errors": { + "total": 1, + "parse_intent": 1, + "narrate": 0, + "by_type": {"parse_intent:ValueError": 1} + }, + "latency_ms": { + "parse_intent": {"avg": 120.0, "min": 50.0, "max": 400.0, "p50": 100.0, "p95": 300.0}, + "narrate": {"avg": 200.0, "min": 100.0, "max": 600.0, "p50": 180.0, "p95": 450.0} + }, + "provider": { + "name": "openai", + "model": "gpt-4-turbo-preview" + }, + "token_usage": { + "total_input": 50000, + "total_output": 15000 + } +} +``` ### Prometheus Annotations @@ -612,7 +710,7 @@ All deployments are annotated for automatic Prometheus discovery: annotations: prometheus.io/scrape: "true" prometheus.io/port: "" - prometheus.io/path: "/metrics" # or "/healthz" + prometheus.io/path: "/metrics" ``` ### Verifying Prometheus Scraping @@ -624,25 +722,17 @@ To confirm Prometheus is scraping your services: if [[ "${GENGINE_DEPLOY_ENV}" == "local" ]]; then MINIKUBE_IP=$(minikube ip) curl -s "http://${MINIKUBE_IP}:30000/metrics" | jq . + curl -s "http://${MINIKUBE_IP}:30100/metrics" | jq . + curl -s "http://${MINIKUBE_IP}:30001/metrics" | jq . fi # Using kubectl proxy or port-forward kubectl port-forward -n "${GENGINE_NAMESPACE}" svc/simulation 8000:8000 & +kubectl port-forward -n "${GENGINE_NAMESPACE}" svc/gateway 8100:8100 & +kubectl port-forward -n "${GENGINE_NAMESPACE}" svc/llm 8001:8001 & curl -s http://localhost:8000/metrics | jq . -``` - -Expected output: - -```json -{ - "tick": 0, - "environment": { - "temperature": 0.0, - "instability": 0.0, - "tension": 0.0 - }, - "profiling": {} -} +curl -s http://localhost:8100/metrics | jq . +curl -s http://localhost:8001/metrics | jq . ``` ### Prometheus Operator Integration diff --git a/gamedev-agent-thoughts.txt b/gamedev-agent-thoughts.txt index cc629d51..ede0feff 100644 --- a/gamedev-agent-thoughts.txt +++ b/gamedev-agent-thoughts.txt @@ -525,3 +525,88 @@ All acceptance criteria for Issue #24 are met. The implementation is production- ### Task 9.3.1 Status: COMPLETED All acceptance criteria for Issue #34 are met. + + +## Issue #39 - Dedicated Metrics Endpoints for Gateway and LLM Services - 2025-12-02T19:00 UTC +========================================================= + +### Initial Assessment +- Working branch: copilot/add-dedicated-metrics-endpoints +- Current commit: fc3318b +- Repository synced and clean + +### Acceptance Criteria from Issue: +1. Gateway exposes `/metrics` endpoint with request counts, latencies, error breakdowns, provider-level stats +2. LLM service exposes `/metrics` endpoint with request counts, latencies, error breakdowns, provider-level stats +3. Prometheus annotations point to `/metrics` instead of `/healthz` +4. ServiceMonitor targets updated to metrics paths/ports +5. Documentation distinguishes health vs. metrics endpoints + +### Implementation Plan: +1. [ ] Add `/metrics` endpoint to gateway/app.py with tracking of: + - Request counts (total, by endpoint, by result type) + - Request latencies + - Error counts + - Active WebSocket connections + - LLM service integration stats +2. [ ] Add `/metrics` endpoint to llm/app.py with tracking of: + - Request counts (total, by endpoint) + - Request latencies + - Error counts + - Provider-level stats + - Token usage +3. [ ] Update k8s/base/gateway-deployment.yaml: prometheus.io/path → /metrics +4. [ ] Update k8s/base/llm-deployment.yaml: prometheus.io/path → /metrics +5. [ ] Update k8s/base/servicemonitor.yaml: gateway and LLM paths → /metrics +6. [ ] Update docs/gengine/Deploy_GEngine_To_Kubernetes.md +7. [ ] Run tests to verify no regressions + + +### Implementation Complete + +**Files Modified:** +1. `src/gengine/echoes/gateway/app.py` - Added GatewayMetrics class and /metrics endpoint + - GatewayMetrics tracks: request counts, latencies, errors, connections, LLM integration + - /metrics endpoint returns JSON with all metrics for Prometheus + - Metrics tracked during WebSocket handler execution + +2. `src/gengine/echoes/llm/app.py` - Added LLMMetrics class and /metrics endpoint + - LLMMetrics tracks: request counts, latencies, errors, provider stats, token usage + - /metrics endpoint returns JSON with all metrics for Prometheus + - Metrics tracked for both parse_intent and narrate endpoints + +3. `k8s/base/gateway-deployment.yaml` - Updated prometheus.io/path from /healthz to /metrics +4. `k8s/base/llm-deployment.yaml` - Updated prometheus.io/path from /healthz to /metrics +5. `k8s/base/servicemonitor.yaml` - Updated gateway and LLM paths from /healthz to /metrics + +6. `docs/gengine/Deploy_GEngine_To_Kubernetes.md` - Comprehensive documentation update + - Added Health Check Endpoints section explaining /healthz purpose + - Added Metrics Endpoints section explaining /metrics purpose + - Added Example Metrics Responses showing JSON structure for all 3 services + - Updated Prometheus Annotations section + - Updated verification commands + +7. `tests/echoes/test_gateway_service.py` - Added 9 new tests: + - test_gateway_metrics_endpoint + - test_gateway_metrics_track_websocket_connections + - test_gateway_metrics_track_commands + - TestGatewayMetrics class with 7 tests + +8. `tests/echoes/test_llm_app.py` - Added 11 new tests: + - test_metrics_endpoint + - test_metrics_track_parse_intent + - test_metrics_track_narrate + - TestLLMMetrics class with 8 tests + +**Test Results:** +- Gateway/LLM tests: 39 passed (19 original + 20 new) +- Coverage: gateway/app.py 89%, llm/app.py 91% + +**Acceptance Criteria Status:** +1. ✅ Gateway exposes /metrics endpoint with request counts, latencies, error breakdowns, connections, LLM integration stats +2. ✅ LLM service exposes /metrics endpoint with request counts, latencies, error breakdowns, provider stats, token usage +3. ✅ Prometheus annotations point to /metrics (updated gateway-deployment.yaml, llm-deployment.yaml) +4. ✅ ServiceMonitor targets updated to /metrics paths (updated servicemonitor.yaml) +5. ✅ Documentation distinguishes health vs. metrics endpoints with example responses + +### Task Complete: Issue #39 - Dedicated Metrics Endpoints for Gateway and LLM Services diff --git a/k8s/base/gateway-deployment.yaml b/k8s/base/gateway-deployment.yaml index cc648af6..6fef5b21 100644 --- a/k8s/base/gateway-deployment.yaml +++ b/k8s/base/gateway-deployment.yaml @@ -24,7 +24,7 @@ spec: annotations: prometheus.io/scrape: "true" prometheus.io/port: "8100" - prometheus.io/path: "/healthz" + prometheus.io/path: "/metrics" spec: containers: - name: gateway diff --git a/k8s/base/llm-deployment.yaml b/k8s/base/llm-deployment.yaml index 3fed9ce5..a567eb3a 100644 --- a/k8s/base/llm-deployment.yaml +++ b/k8s/base/llm-deployment.yaml @@ -24,7 +24,7 @@ spec: annotations: prometheus.io/scrape: "true" prometheus.io/port: "8001" - prometheus.io/path: "/healthz" + prometheus.io/path: "/metrics" spec: containers: - name: llm diff --git a/k8s/base/servicemonitor.yaml b/k8s/base/servicemonitor.yaml index 2d69d019..67f77507 100644 --- a/k8s/base/servicemonitor.yaml +++ b/k8s/base/servicemonitor.yaml @@ -39,7 +39,7 @@ spec: app.kubernetes.io/name: gateway endpoints: - port: http - path: /healthz + path: /metrics interval: 30s scrapeTimeout: 10s --- @@ -57,6 +57,6 @@ spec: app.kubernetes.io/name: llm endpoints: - port: http - path: /healthz + path: /metrics interval: 30s scrapeTimeout: 10s diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 209188f7..6a69c45f 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -7,8 +7,9 @@ import json import logging import os -from dataclasses import dataclass -from typing import Callable +import time +from dataclasses import dataclass, field +from typing import Any, Callable from fastapi import FastAPI, WebSocket, WebSocketDisconnect @@ -22,6 +23,110 @@ BackendFactory = Callable[[], ShellBackend] +@dataclass +class GatewayMetrics: + """Metrics tracking for the gateway service.""" + + # Request counts + total_requests: int = 0 + requests_by_type: dict[str, int] = field(default_factory=dict) + websocket_messages: int = 0 + natural_language_requests: int = 0 + command_requests: int = 0 + + # Error tracking + total_errors: int = 0 + errors_by_type: dict[str, int] = field(default_factory=dict) + + # Latency tracking (in ms) + latencies: list[float] = field(default_factory=list) + max_latency_samples: int = 1000 + + # Connection tracking + active_connections: int = 0 + total_connections: int = 0 + total_disconnections: int = 0 + + # LLM integration stats + llm_requests: int = 0 + llm_errors: int = 0 + llm_latencies: list[float] = field(default_factory=list) + + def record_request(self, request_type: str, latency_ms: float) -> None: + """Record a request with its type and latency.""" + self.total_requests += 1 + self.requests_by_type[request_type] = self.requests_by_type.get(request_type, 0) + 1 + self._add_latency(latency_ms) + + def record_error(self, error_type: str) -> None: + """Record an error by type.""" + self.total_errors += 1 + self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1 + + def record_llm_request(self, latency_ms: float) -> None: + """Record an LLM service request.""" + self.llm_requests += 1 + if len(self.llm_latencies) >= self.max_latency_samples: + self.llm_latencies.pop(0) + self.llm_latencies.append(latency_ms) + + def record_llm_error(self) -> None: + """Record an LLM service error.""" + self.llm_errors += 1 + + def _add_latency(self, latency_ms: float) -> None: + """Add latency sample, maintaining max samples.""" + if len(self.latencies) >= self.max_latency_samples: + self.latencies.pop(0) + self.latencies.append(latency_ms) + + def to_dict(self) -> dict[str, Any]: + """Convert metrics to dictionary for JSON serialization.""" + latency_stats = self._calculate_latency_stats(self.latencies) + llm_latency_stats = self._calculate_latency_stats(self.llm_latencies) + + return { + "requests": { + "total": self.total_requests, + "by_type": dict(self.requests_by_type), + "websocket_messages": self.websocket_messages, + "natural_language": self.natural_language_requests, + "commands": self.command_requests, + }, + "errors": { + "total": self.total_errors, + "by_type": dict(self.errors_by_type), + }, + "latency_ms": latency_stats, + "connections": { + "active": self.active_connections, + "total": self.total_connections, + "disconnections": self.total_disconnections, + }, + "llm_integration": { + "requests": self.llm_requests, + "errors": self.llm_errors, + "latency_ms": llm_latency_stats, + }, + } + + def _calculate_latency_stats(self, latencies: list[float]) -> dict[str, float]: + """Calculate latency statistics from samples.""" + if not latencies: + return {"avg": 0.0, "min": 0.0, "max": 0.0, "p50": 0.0, "p95": 0.0} + + sorted_latencies = sorted(latencies) + n = len(sorted_latencies) + + return { + "avg": round(sum(latencies) / n, 2), + "min": round(min(latencies), 2), + "max": round(max(latencies), 2), + "p50": round(sorted_latencies[n // 2], 2), + "p95": round(sorted_latencies[int(n * 0.95)] if n >= 20 else sorted_latencies[-1], 2), + } + + @dataclass class GatewaySettings: """Configuration for the gateway service.""" @@ -56,12 +161,15 @@ def create_gateway_app( backend_factory = _service_backend_factory(active_settings.service_url) app = FastAPI(title="Echoes Gateway Service", version="0.1.0") + metrics = GatewayMetrics() manager = _GatewayManager( backend_factory, active_config, llm_service_url=active_settings.llm_service_url, + metrics=metrics, ) app.state.gateway_settings = active_settings + app.state.gateway_metrics = metrics @app.get("/healthz") def healthcheck() -> dict[str, str]: # pragma: no cover - trivial @@ -73,15 +181,30 @@ def healthcheck() -> dict[str, str]: # pragma: no cover - trivial health["llm_service_url"] = active_settings.llm_service_url return health + @app.get("/metrics") + def get_metrics() -> dict[str, Any]: + """Return gateway metrics for Prometheus scraping.""" + return { + "service": "gateway", + "service_url": active_settings.service_url, + "llm_service_url": active_settings.llm_service_url, + **metrics.to_dict(), + } + @app.websocket("/ws") async def websocket_handler(websocket: WebSocket) -> None: await websocket.accept() + metrics.active_connections += 1 + metrics.total_connections += 1 try: session = manager.open_session() except Exception as exc: # pragma: no cover - catastrophic setup failure LOGGER.exception("Gateway failed to open session: %s", exc) + metrics.record_error("session_open_failed") await websocket.send_json({"type": "error", "error": str(exc)}) await websocket.close() + metrics.active_connections -= 1 + metrics.total_disconnections += 1 return try: @@ -99,7 +222,9 @@ async def websocket_handler(websocket: WebSocket) -> None: message_data = await _receive_message(websocket) except WebSocketDisconnect: break + metrics.websocket_messages += 1 if message_data is None: + metrics.record_error("invalid_payload") await websocket.send_json( { "type": "error", @@ -113,6 +238,7 @@ async def websocket_handler(websocket: WebSocket) -> None: is_nl = message_data.get("natural_language", False) if command is None: + metrics.record_error("missing_command") await websocket.send_json( { "type": "error", @@ -121,13 +247,22 @@ async def websocket_handler(websocket: WebSocket) -> None: ) continue + start_time = time.perf_counter() try: if is_nl and session.llm_client: + metrics.natural_language_requests += 1 + llm_start = time.perf_counter() result = await asyncio.to_thread(session.execute_natural_language, command) + llm_latency = (time.perf_counter() - llm_start) * 1000 + metrics.record_llm_request(llm_latency) else: + metrics.command_requests += 1 result = await asyncio.to_thread(session.execute, command) except Exception as exc: # pragma: no cover - unexpected failure LOGGER.exception("Gateway session crashed: %s", exc) + metrics.record_error("execution_error") + if is_nl: + metrics.record_llm_error() await websocket.send_json( { "type": "error", @@ -135,6 +270,11 @@ async def websocket_handler(websocket: WebSocket) -> None: } ) continue + + latency_ms = (time.perf_counter() - start_time) * 1000 + request_type = "natural_language" if is_nl else "command" + metrics.record_request(request_type, latency_ms) + await websocket.send_json( { "type": "result", @@ -149,6 +289,8 @@ async def websocket_handler(websocket: WebSocket) -> None: except WebSocketDisconnect: LOGGER.info("Gateway session %s disconnected", session.session_id) finally: + metrics.active_connections -= 1 + metrics.total_disconnections += 1 await asyncio.to_thread(session.close) with contextlib.suppress(WebSocketDisconnect): await websocket.close() @@ -170,10 +312,12 @@ def __init__( backend_factory: BackendFactory, config: SimulationConfig, llm_service_url: str | None = None, + metrics: GatewayMetrics | None = None, ) -> None: self._backend_factory = backend_factory self._config = config self._llm_service_url = llm_service_url + self._metrics = metrics def open_session(self) -> GatewaySession: backend = self._backend_factory() @@ -183,6 +327,8 @@ def open_session(self) -> GatewaySession: # Check LLM service health if not llm_client.healthcheck(): LOGGER.warning("LLM service unhealthy at %s", self._llm_service_url) + if self._metrics: + self._metrics.record_llm_error() return GatewaySession(backend, limits=self._config.limits, llm_client=llm_client) diff --git a/src/gengine/echoes/llm/app.py b/src/gengine/echoes/llm/app.py index b7d6379b..3ef32c30 100644 --- a/src/gengine/echoes/llm/app.py +++ b/src/gengine/echoes/llm/app.py @@ -2,6 +2,8 @@ from __future__ import annotations +import time +from dataclasses import dataclass, field from typing import Any from fastapi import FastAPI, HTTPException @@ -11,6 +13,110 @@ from .settings import LLMSettings +@dataclass +class LLMMetrics: + """Metrics tracking for the LLM service.""" + + # Request counts + total_requests: int = 0 + parse_intent_requests: int = 0 + narrate_requests: int = 0 + + # Error tracking + total_errors: int = 0 + parse_intent_errors: int = 0 + narrate_errors: int = 0 + errors_by_type: dict[str, int] = field(default_factory=dict) + + # Latency tracking (in ms) + parse_intent_latencies: list[float] = field(default_factory=list) + narrate_latencies: list[float] = field(default_factory=list) + max_latency_samples: int = 1000 + + # Token usage (if available from provider) + total_input_tokens: int = 0 + total_output_tokens: int = 0 + + def record_parse_intent(self, latency_ms: float, input_tokens: int = 0, output_tokens: int = 0) -> None: + """Record a parse_intent request.""" + self.total_requests += 1 + self.parse_intent_requests += 1 + self._add_latency(self.parse_intent_latencies, latency_ms) + self.total_input_tokens += input_tokens + self.total_output_tokens += output_tokens + + def record_narrate(self, latency_ms: float, input_tokens: int = 0, output_tokens: int = 0) -> None: + """Record a narrate request.""" + self.total_requests += 1 + self.narrate_requests += 1 + self._add_latency(self.narrate_latencies, latency_ms) + self.total_input_tokens += input_tokens + self.total_output_tokens += output_tokens + + def record_error(self, endpoint: str, error_type: str) -> None: + """Record an error by endpoint and type.""" + self.total_errors += 1 + if endpoint == "parse_intent": + self.parse_intent_errors += 1 + elif endpoint == "narrate": + self.narrate_errors += 1 + key = f"{endpoint}:{error_type}" + self.errors_by_type[key] = self.errors_by_type.get(key, 0) + 1 + + def _add_latency(self, latencies: list[float], latency_ms: float) -> None: + """Add latency sample, maintaining max samples.""" + if len(latencies) >= self.max_latency_samples: + latencies.pop(0) + latencies.append(latency_ms) + + def to_dict(self, provider: str = "unknown", model: str | None = None) -> dict[str, Any]: + """Convert metrics to dictionary for JSON serialization.""" + parse_intent_stats = self._calculate_latency_stats(self.parse_intent_latencies) + narrate_stats = self._calculate_latency_stats(self.narrate_latencies) + + return { + "requests": { + "total": self.total_requests, + "parse_intent": self.parse_intent_requests, + "narrate": self.narrate_requests, + }, + "errors": { + "total": self.total_errors, + "parse_intent": self.parse_intent_errors, + "narrate": self.narrate_errors, + "by_type": dict(self.errors_by_type), + }, + "latency_ms": { + "parse_intent": parse_intent_stats, + "narrate": narrate_stats, + }, + "provider": { + "name": provider, + "model": model or "N/A", + }, + "token_usage": { + "total_input": self.total_input_tokens, + "total_output": self.total_output_tokens, + }, + } + + def _calculate_latency_stats(self, latencies: list[float]) -> dict[str, float]: + """Calculate latency statistics from samples.""" + if not latencies: + return {"avg": 0.0, "min": 0.0, "max": 0.0, "p50": 0.0, "p95": 0.0} + + sorted_latencies = sorted(latencies) + n = len(sorted_latencies) + + return { + "avg": round(sum(latencies) / n, 2), + "min": round(min(latencies), 2), + "max": round(max(latencies), 2), + "p50": round(sorted_latencies[n // 2], 2), + "p95": round(sorted_latencies[int(n * 0.95)] if n >= 20 else sorted_latencies[-1], 2), + } + + class ParseIntentRequest(BaseModel): """Request payload for /parse_intent endpoint.""" @@ -80,6 +186,10 @@ def create_llm_app( # Store provider in app state app.state.llm_provider = provider app.state.llm_settings = provider.settings + + # Initialize metrics + metrics = LLMMetrics() + app.state.llm_metrics = metrics @app.get("/healthz") async def health_check() -> dict[str, Any]: @@ -90,6 +200,17 @@ async def health_check() -> dict[str, Any]: "model": app.state.llm_settings.model or "N/A", } + @app.get("/metrics") + async def get_metrics() -> dict[str, Any]: + """Return LLM service metrics for Prometheus scraping.""" + return { + "service": "llm", + **metrics.to_dict( + provider=app.state.llm_settings.provider, + model=app.state.llm_settings.model, + ), + } + @app.post("/parse_intent", response_model=ParseIntentResponse) async def parse_intent(request: ParseIntentRequest) -> ParseIntentResponse: """Parse natural language input into structured intents. @@ -97,17 +218,24 @@ async def parse_intent(request: ParseIntentRequest) -> ParseIntentResponse: Takes user input and game context, returns structured intent objects that can be routed to the simulation service. """ + start_time = time.perf_counter() try: result = await app.state.llm_provider.parse_intent( request.user_input, request.context, ) + latency_ms = (time.perf_counter() - start_time) * 1000 + # Extract token usage from metadata if available + input_tokens = getattr(result, "input_tokens", 0) or 0 + output_tokens = getattr(result, "output_tokens", 0) or 0 + metrics.record_parse_intent(latency_ms, input_tokens, output_tokens) return ParseIntentResponse( intents=result.intents, raw_response=result.raw_response, confidence=result.confidence, ) except Exception as e: + metrics.record_error("parse_intent", type(e).__name__) raise HTTPException( status_code=500, detail=f"Intent parsing failed: {str(e)}", @@ -120,17 +248,27 @@ async def narrate(request: NarrateRequest) -> NarrateResponse: Takes game events and context, returns natural language narrative suitable for presenting to the player. """ + start_time = time.perf_counter() try: result = await app.state.llm_provider.narrate( request.events, request.context, ) + latency_ms = (time.perf_counter() - start_time) * 1000 + # Extract token usage from metadata if available + input_tokens = 0 + output_tokens = 0 + if result.metadata: + input_tokens = result.metadata.get("input_tokens", 0) or 0 + output_tokens = result.metadata.get("output_tokens", 0) or 0 + metrics.record_narrate(latency_ms, input_tokens, output_tokens) return NarrateResponse( narrative=result.narrative, raw_response=result.raw_response, metadata=result.metadata, ) except Exception as e: + metrics.record_error("narrate", type(e).__name__) raise HTTPException( status_code=500, detail=f"Narration failed: {str(e)}", diff --git a/tests/echoes/test_gateway_service.py b/tests/echoes/test_gateway_service.py index 262987ff..32f774f7 100644 --- a/tests/echoes/test_gateway_service.py +++ b/tests/echoes/test_gateway_service.py @@ -4,7 +4,7 @@ from fastapi.testclient import TestClient -from gengine.echoes.gateway.app import GatewaySettings, create_gateway_app +from gengine.echoes.gateway.app import GatewaySettings, GatewayMetrics, create_gateway_app from gengine.echoes.cli.shell import LocalBackend from gengine.echoes.sim import SimEngine @@ -23,6 +23,95 @@ def test_gateway_healthcheck(sim_config, gateway_settings) -> None: assert data["service_url"] == "local" +def test_gateway_metrics_endpoint(sim_config, gateway_settings) -> None: + """Verify that the /metrics endpoint returns expected structure.""" + app = create_gateway_app( + backend_factory=_local_backend_factory(sim_config), + config=sim_config, + settings=gateway_settings, + ) + client = TestClient(app) + response = client.get("/metrics") + assert response.status_code == 200 + data = response.json() + + # Check service identification + assert data["service"] == "gateway" + assert data["service_url"] == "local" + + # Check requests section + assert "requests" in data + assert data["requests"]["total"] == 0 + assert "by_type" in data["requests"] + + # Check errors section + assert "errors" in data + assert data["errors"]["total"] == 0 + + # Check latency section + assert "latency_ms" in data + assert "avg" in data["latency_ms"] + + # Check connections section + assert "connections" in data + assert data["connections"]["active"] == 0 + + # Check LLM integration section + assert "llm_integration" in data + + +def test_gateway_metrics_track_websocket_connections(sim_config, gateway_settings) -> None: + """Verify that WebSocket connections are tracked in metrics.""" + app = create_gateway_app( + backend_factory=_local_backend_factory(sim_config), + config=sim_config, + settings=gateway_settings, + ) + client = TestClient(app) + + # Initial metrics + response = client.get("/metrics") + initial = response.json() + assert initial["connections"]["total"] == 0 + + # Connect and disconnect + with client.websocket_connect("/ws") as websocket: + _ = websocket.receive_json() + websocket.send_json({"command": "exit"}) + _ = websocket.receive_json() + + # Check metrics after connection + response = client.get("/metrics") + data = response.json() + assert data["connections"]["total"] == 1 + assert data["connections"]["disconnections"] == 1 + + +def test_gateway_metrics_track_commands(sim_config, gateway_settings) -> None: + """Verify that commands are tracked in metrics.""" + app = create_gateway_app( + backend_factory=_local_backend_factory(sim_config), + config=sim_config, + settings=gateway_settings, + ) + client = TestClient(app) + + with client.websocket_connect("/ws") as websocket: + _ = websocket.receive_json() + websocket.send_json({"command": "summary"}) + _ = websocket.receive_json() + websocket.send_json({"command": "exit"}) + _ = websocket.receive_json() + + response = client.get("/metrics") + data = response.json() + + # Should have recorded the "summary" command (exit is not counted because it exits) + # Actually both are recorded + assert data["requests"]["total"] >= 1 + assert data["requests"]["commands"] >= 1 + + def test_gateway_websocket_summary_and_exit(sim_config, gateway_settings) -> None: app = create_gateway_app( backend_factory=_local_backend_factory(sim_config), @@ -221,3 +310,77 @@ def _factory() -> LocalBackend: return LocalBackend(engine) return _factory + + +class TestGatewayMetrics: + """Tests for GatewayMetrics class.""" + + def test_initial_state(self) -> None: + """Metrics start at zero.""" + metrics = GatewayMetrics() + assert metrics.total_requests == 0 + assert metrics.total_errors == 0 + assert metrics.active_connections == 0 + + def test_record_request(self) -> None: + """Recording a request increments counters and stores latency.""" + metrics = GatewayMetrics() + metrics.record_request("command", 50.0) + + assert metrics.total_requests == 1 + assert metrics.requests_by_type["command"] == 1 + assert len(metrics.latencies) == 1 + assert metrics.latencies[0] == 50.0 + + def test_record_error(self) -> None: + """Recording an error increments error counters.""" + metrics = GatewayMetrics() + metrics.record_error("execution_error") + + assert metrics.total_errors == 1 + assert metrics.errors_by_type["execution_error"] == 1 + + def test_record_llm_request(self) -> None: + """Recording an LLM request tracks separately.""" + metrics = GatewayMetrics() + metrics.record_llm_request(100.0) + + assert metrics.llm_requests == 1 + assert len(metrics.llm_latencies) == 1 + assert metrics.llm_latencies[0] == 100.0 + + def test_latency_stats_empty(self) -> None: + """Empty latencies return zeros.""" + metrics = GatewayMetrics() + data = metrics.to_dict() + + assert data["latency_ms"]["avg"] == 0.0 + assert data["latency_ms"]["min"] == 0.0 + assert data["latency_ms"]["max"] == 0.0 + + def test_latency_stats_calculated(self) -> None: + """Latency statistics are calculated correctly.""" + metrics = GatewayMetrics() + for i in range(10): + metrics.record_request("test", float(i * 10)) + + data = metrics.to_dict() + assert data["latency_ms"]["min"] == 0.0 + assert data["latency_ms"]["max"] == 90.0 + assert data["latency_ms"]["avg"] == 45.0 + + def test_to_dict_structure(self) -> None: + """to_dict returns expected structure.""" + metrics = GatewayMetrics() + metrics.record_request("command", 50.0) + metrics.record_error("test_error") + metrics.active_connections = 2 + + data = metrics.to_dict() + + assert "requests" in data + assert "errors" in data + assert "latency_ms" in data + assert "connections" in data + assert "llm_integration" in data + assert data["connections"]["active"] == 2 diff --git a/tests/echoes/test_llm_app.py b/tests/echoes/test_llm_app.py index 88234f03..5224b436 100644 --- a/tests/echoes/test_llm_app.py +++ b/tests/echoes/test_llm_app.py @@ -4,7 +4,7 @@ from fastapi.testclient import TestClient -from gengine.echoes.llm.app import create_llm_app +from gengine.echoes.llm.app import create_llm_app, LLMMetrics from gengine.echoes.llm.settings import LLMSettings @@ -22,6 +22,77 @@ def test_health_check(self) -> None: assert data["status"] == "ok" assert data["provider"] == "stub" + def test_metrics_endpoint(self) -> None: + """Verify that /metrics endpoint returns expected structure.""" + settings = LLMSettings(provider="stub") + app = create_llm_app(settings=settings) + client = TestClient(app) + + response = client.get("/metrics") + assert response.status_code == 200 + data = response.json() + + # Check service identification + assert data["service"] == "llm" + + # Check requests section + assert "requests" in data + assert data["requests"]["total"] == 0 + assert data["requests"]["parse_intent"] == 0 + assert data["requests"]["narrate"] == 0 + + # Check errors section + assert "errors" in data + assert data["errors"]["total"] == 0 + + # Check latency section + assert "latency_ms" in data + assert "parse_intent" in data["latency_ms"] + assert "narrate" in data["latency_ms"] + + # Check provider section + assert "provider" in data + assert data["provider"]["name"] == "stub" + + # Check token usage section + assert "token_usage" in data + + def test_metrics_track_parse_intent(self) -> None: + """Verify that parse_intent requests are tracked in metrics.""" + settings = LLMSettings(provider="stub") + app = create_llm_app(settings=settings) + client = TestClient(app) + + client.post( + "/parse_intent", + json={"user_input": "check status", "context": {}}, + ) + + response = client.get("/metrics") + data = response.json() + + assert data["requests"]["total"] == 1 + assert data["requests"]["parse_intent"] == 1 + assert data["latency_ms"]["parse_intent"]["avg"] > 0 + + def test_metrics_track_narrate(self) -> None: + """Verify that narrate requests are tracked in metrics.""" + settings = LLMSettings(provider="stub") + app = create_llm_app(settings=settings) + client = TestClient(app) + + client.post( + "/narrate", + json={"events": [{"type": "test"}], "context": {}}, + ) + + response = client.get("/metrics") + data = response.json() + + assert data["requests"]["total"] == 1 + assert data["requests"]["narrate"] == 1 + assert data["latency_ms"]["narrate"]["avg"] > 0 + def test_parse_intent_basic(self) -> None: settings = LLMSettings(provider="stub") app = create_llm_app(settings=settings) @@ -138,3 +209,81 @@ def test_narrate_validates_request(self) -> None: ) assert response.status_code == 422 # Validation error + + +class TestLLMMetrics: + """Tests for LLMMetrics class.""" + + def test_initial_state(self) -> None: + """Metrics start at zero.""" + metrics = LLMMetrics() + assert metrics.total_requests == 0 + assert metrics.total_errors == 0 + assert metrics.parse_intent_requests == 0 + assert metrics.narrate_requests == 0 + + def test_record_parse_intent(self) -> None: + """Recording a parse_intent request increments counters.""" + metrics = LLMMetrics() + metrics.record_parse_intent(50.0, input_tokens=100, output_tokens=50) + + assert metrics.total_requests == 1 + assert metrics.parse_intent_requests == 1 + assert len(metrics.parse_intent_latencies) == 1 + assert metrics.total_input_tokens == 100 + assert metrics.total_output_tokens == 50 + + def test_record_narrate(self) -> None: + """Recording a narrate request increments counters.""" + metrics = LLMMetrics() + metrics.record_narrate(75.0, input_tokens=200, output_tokens=100) + + assert metrics.total_requests == 1 + assert metrics.narrate_requests == 1 + assert len(metrics.narrate_latencies) == 1 + assert metrics.total_input_tokens == 200 + assert metrics.total_output_tokens == 100 + + def test_record_error(self) -> None: + """Recording an error increments error counters.""" + metrics = LLMMetrics() + metrics.record_error("parse_intent", "ValueError") + + assert metrics.total_errors == 1 + assert metrics.parse_intent_errors == 1 + assert metrics.errors_by_type["parse_intent:ValueError"] == 1 + + def test_latency_stats_empty(self) -> None: + """Empty latencies return zeros.""" + metrics = LLMMetrics() + data = metrics.to_dict() + + assert data["latency_ms"]["parse_intent"]["avg"] == 0.0 + assert data["latency_ms"]["narrate"]["avg"] == 0.0 + + def test_latency_stats_calculated(self) -> None: + """Latency statistics are calculated correctly.""" + metrics = LLMMetrics() + for i in range(10): + metrics.record_parse_intent(float(i * 10)) + + data = metrics.to_dict() + assert data["latency_ms"]["parse_intent"]["min"] == 0.0 + assert data["latency_ms"]["parse_intent"]["max"] == 90.0 + assert data["latency_ms"]["parse_intent"]["avg"] == 45.0 + + def test_to_dict_structure(self) -> None: + """to_dict returns expected structure.""" + metrics = LLMMetrics() + metrics.record_parse_intent(50.0) + metrics.record_error("parse_intent", "TestError") + + data = metrics.to_dict(provider="openai", model="gpt-4") + + assert "requests" in data + assert "errors" in data + assert "latency_ms" in data + assert "provider" in data + assert "token_usage" in data + assert data["provider"]["name"] == "openai" + assert data["provider"]["model"] == "gpt-4" From ddfa9f8f743936717600e395976890225c05c5ba Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 2 Dec 2025 19:17:00 +0000 Subject: [PATCH 03/16] refactor: Address code review feedback for metrics implementation - Extract token usage logic into _extract_token_usage helper function for consistency - Add documentation clarifying metrics counter relationships in GatewayMetrics - All 685 tests pass with 90.21% coverage Co-authored-by: SorraTheOrc <250240+SorraTheOrc@users.noreply.github.com> --- src/gengine/echoes/gateway/app.py | 23 +++++++++++++++------- src/gengine/echoes/llm/app.py | 32 ++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 6a69c45f..45bb4e80 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -25,14 +25,23 @@ @dataclass class GatewayMetrics: - """Metrics tracking for the gateway service.""" + """Metrics tracking for the gateway service. + + Note on message counters: + - websocket_messages: All messages received via WebSocket (including invalid) + - natural_language_requests: Valid natural language commands executed + - command_requests: Valid regular commands executed + + The sum of natural_language_requests + command_requests will be <= websocket_messages + since invalid messages are counted in websocket_messages but not the request counters. + """ # Request counts total_requests: int = 0 requests_by_type: dict[str, int] = field(default_factory=dict) - websocket_messages: int = 0 - natural_language_requests: int = 0 - command_requests: int = 0 + websocket_messages: int = 0 # All messages received (including invalid) + natural_language_requests: int = 0 # Valid NL commands processed + command_requests: int = 0 # Valid regular commands processed # Error tracking total_errors: int = 0 @@ -89,9 +98,9 @@ def to_dict(self) -> dict[str, Any]: "requests": { "total": self.total_requests, "by_type": dict(self.requests_by_type), - "websocket_messages": self.websocket_messages, - "natural_language": self.natural_language_requests, - "commands": self.command_requests, + "websocket_messages": self.websocket_messages, # All messages (including invalid) + "natural_language": self.natural_language_requests, # Valid NL commands + "commands": self.command_requests, # Valid regular commands }, "errors": { "total": self.total_errors, diff --git a/src/gengine/echoes/llm/app.py b/src/gengine/echoes/llm/app.py index 3ef32c30..7f0b6be9 100644 --- a/src/gengine/echoes/llm/app.py +++ b/src/gengine/echoes/llm/app.py @@ -117,6 +117,25 @@ def _calculate_latency_stats(self, latencies: list[float]) -> dict[str, float]: } +def _extract_token_usage(result: Any) -> tuple[int, int]: + """Extract token usage from LLM result. + + Tries result attributes first, then falls back to metadata dict. + Returns (input_tokens, output_tokens). + """ + # Try direct attributes first + input_tokens = getattr(result, "input_tokens", None) + output_tokens = getattr(result, "output_tokens", None) + + # Fall back to metadata dict if available + if input_tokens is None and hasattr(result, "metadata") and result.metadata: + input_tokens = result.metadata.get("input_tokens") + if output_tokens is None and hasattr(result, "metadata") and result.metadata: + output_tokens = result.metadata.get("output_tokens") + + return (input_tokens or 0, output_tokens or 0) + + class ParseIntentRequest(BaseModel): """Request payload for /parse_intent endpoint.""" @@ -225,9 +244,8 @@ async def parse_intent(request: ParseIntentRequest) -> ParseIntentResponse: request.context, ) latency_ms = (time.perf_counter() - start_time) * 1000 - # Extract token usage from metadata if available - input_tokens = getattr(result, "input_tokens", 0) or 0 - output_tokens = getattr(result, "output_tokens", 0) or 0 + # Extract token usage from result attributes or metadata + input_tokens, output_tokens = _extract_token_usage(result) metrics.record_parse_intent(latency_ms, input_tokens, output_tokens) return ParseIntentResponse( intents=result.intents, @@ -255,12 +273,8 @@ async def narrate(request: NarrateRequest) -> NarrateResponse: request.context, ) latency_ms = (time.perf_counter() - start_time) * 1000 - # Extract token usage from metadata if available - input_tokens = 0 - output_tokens = 0 - if result.metadata: - input_tokens = result.metadata.get("input_tokens", 0) or 0 - output_tokens = result.metadata.get("output_tokens", 0) or 0 + # Extract token usage from result attributes or metadata + input_tokens, output_tokens = _extract_token_usage(result) metrics.record_narrate(latency_ms, input_tokens, output_tokens) return NarrateResponse( narrative=result.narrative, From e249e305620e3090df22119cfc7b98c22bc5c2d0 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 11:46:25 -0800 Subject: [PATCH 04/16] fix(git_agent): ensure dev dependencies and venv activation before running tests --- .github/agents/git.agent.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/agents/git.agent.md b/.github/agents/git.agent.md index 883bc16d..09fe2dc6 100644 --- a/.github/agents/git.agent.md +++ b/.github/agents/git.agent.md @@ -38,6 +38,7 @@ as described in Atlassian's guide - **Pre-merge hygiene** - Ensure the working tree is clean before switching branches. - Verify there are no uncommitted changes that would be lost. + - Ensure dev dependencies are installed (to avoid pytest configuration errors). - Run tests (for example `pytest -v`) and basic checks before proposing a merge. @@ -77,6 +78,8 @@ arguments so they can run non-interactively. - `git rebase origin/main` - Run tests before merge: + - Activate virtual environment if present (e.g. `source .venv/bin/activate`) + - `pip install -e .[dev]` (ensure test dependencies like pytest-cov are present) - `pytest -v` - Merge feature branch into main locally: From 6c6f5f5672aa3e50be9873a6e84f337236eb821e Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 11:49:43 -0800 Subject: [PATCH 05/16] fix(gateway): resolve syntax error after merge conflict resolution --- src/gengine/echoes/gateway/app.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 607b71c0..74a4b168 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -268,9 +268,6 @@ async def websocket_handler(websocket: WebSocket) -> None: else: metrics.command_requests += 1 result = await asyncio.to_thread(session.execute, command) - else: - metrics.command_requests += 1 - result = await asyncio.to_thread(session.execute, command) except Exception as exc: # pragma: no cover - unexpected failure LOGGER.exception("Gateway session crashed: %s", exc) metrics.record_error("execution_error") From 3a9a364c3bea59f90415abafbdbe246ac8fe9c53 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 2 Dec 2025 19:58:46 +0000 Subject: [PATCH 06/16] refactor: Use prometheus_client for Prometheus-compatible metrics format - Replace custom metrics classes with Prometheus Counter, Gauge, Histogram - Gateway /metrics now returns OpenMetrics text format - LLM /metrics now returns OpenMetrics text format - Make metrics endpoints async for thread safety - Add prometheus_client>=0.20.0 dependency - Update documentation with Prometheus format examples - Update tests to parse Prometheus text format Addresses review feedback: 1. Use prometheus_client for Prometheus-compatible format 2. Make get_metrics async for thread safety 3. Linting fixes (B904: raise from e) Co-authored-by: SorraTheOrc <250240+SorraTheOrc@users.noreply.github.com> --- docs/gengine/Deploy_GEngine_To_Kubernetes.md | 114 ++++---- pyproject.toml | 3 +- src/gengine/echoes/gateway/app.py | 276 +++++++++++-------- src/gengine/echoes/llm/app.py | 236 ++++++++-------- tests/echoes/test_gateway_service.py | 143 +++++----- tests/echoes/test_llm_app.py | 145 ++++------ 6 files changed, 460 insertions(+), 457 deletions(-) diff --git a/docs/gengine/Deploy_GEngine_To_Kubernetes.md b/docs/gengine/Deploy_GEngine_To_Kubernetes.md index 489aeff5..5b3496ca 100644 --- a/docs/gengine/Deploy_GEngine_To_Kubernetes.md +++ b/docs/gengine/Deploy_GEngine_To_Kubernetes.md @@ -635,71 +635,55 @@ Each service exposes dedicated metrics for Prometheus scraping: } ``` -**Gateway Service** (`/metrics`): -```json -{ - "service": "gateway", - "service_url": "http://simulation:8000", - "llm_service_url": "http://llm:8001", - "requests": { - "total": 150, - "by_type": {"command": 120, "natural_language": 30}, - "websocket_messages": 150, - "natural_language": 30, - "commands": 120 - }, - "errors": { - "total": 2, - "by_type": {"execution_error": 2} - }, - "latency_ms": { - "avg": 45.5, - "min": 10.2, - "max": 250.0, - "p50": 35.0, - "p95": 120.0 - }, - "connections": { - "active": 3, - "total": 25, - "disconnections": 22 - }, - "llm_integration": { - "requests": 30, - "errors": 0, - "latency_ms": {"avg": 150.0, "min": 80.0, "max": 500.0, "p50": 120.0, "p95": 350.0} - } -} -``` - -**LLM Service** (`/metrics`): -```json -{ - "service": "llm", - "requests": { - "total": 100, - "parse_intent": 80, - "narrate": 20 - }, - "errors": { - "total": 1, - "parse_intent": 1, - "narrate": 0, - "by_type": {"parse_intent:ValueError": 1} - }, - "latency_ms": { - "parse_intent": {"avg": 120.0, "min": 50.0, "max": 400.0, "p50": 100.0, "p95": 300.0}, - "narrate": {"avg": 200.0, "min": 100.0, "max": 600.0, "p50": 180.0, "p95": 450.0} - }, - "provider": { - "name": "openai", - "model": "gpt-4-turbo-preview" - }, - "token_usage": { - "total_input": 50000, - "total_output": 15000 - } -} +**Gateway Service** (`/metrics`) - Prometheus text format: +```text +# HELP gateway_requests_total Total number of requests processed +# TYPE gateway_requests_total counter +gateway_requests_total 150.0 +# HELP gateway_requests_by_type_total Requests by type +# TYPE gateway_requests_by_type_total counter +gateway_requests_by_type_total{request_type="command"} 120.0 +gateway_requests_by_type_total{request_type="natural_language"} 30.0 +# HELP gateway_errors_total Total number of errors +# TYPE gateway_errors_total counter +gateway_errors_total 2.0 +# HELP gateway_active_connections Number of active WebSocket connections +# TYPE gateway_active_connections gauge +gateway_active_connections 3.0 +# HELP gateway_request_latency_seconds Request latency in seconds +# TYPE gateway_request_latency_seconds histogram +gateway_request_latency_seconds_bucket{request_type="command",le="0.1"} 80.0 +gateway_request_latency_seconds_bucket{request_type="command",le="0.5"} 115.0 +gateway_request_latency_seconds_bucket{request_type="command",le="+Inf"} 120.0 +gateway_request_latency_seconds_count{request_type="command"} 120.0 +gateway_request_latency_seconds_sum{request_type="command"} 5.46 +``` + +**LLM Service** (`/metrics`) - Prometheus text format: +```text +# HELP llm_requests_total Total number of requests processed +# TYPE llm_requests_total counter +llm_requests_total 100.0 +# HELP llm_parse_intent_requests_total Total parse_intent requests +# TYPE llm_parse_intent_requests_total counter +llm_parse_intent_requests_total 80.0 +# HELP llm_narrate_requests_total Total narrate requests +# TYPE llm_narrate_requests_total counter +llm_narrate_requests_total 20.0 +# HELP llm_errors_total Total number of errors +# TYPE llm_errors_total counter +llm_errors_total 1.0 +# HELP llm_input_tokens_total Total input tokens used +# TYPE llm_input_tokens_total counter +llm_input_tokens_total 50000.0 +# HELP llm_output_tokens_total Total output tokens used +# TYPE llm_output_tokens_total counter +llm_output_tokens_total 15000.0 +# HELP llm_parse_intent_latency_seconds parse_intent request latency in seconds +# TYPE llm_parse_intent_latency_seconds histogram +llm_parse_intent_latency_seconds_bucket{le="1.0"} 75.0 +llm_parse_intent_latency_seconds_bucket{le="5.0"} 80.0 +llm_parse_intent_latency_seconds_bucket{le="+Inf"} 80.0 ``` ### Prometheus Annotations diff --git a/pyproject.toml b/pyproject.toml index 77f1dd0d..572585c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,8 @@ dependencies = [ "httpx>=0.27.0,<0.28.0", "websockets>=12.0,<13.0", "openai>=1.0.0,<2.0.0", - "anthropic>=0.39.0,<1.0.0" + "anthropic>=0.39.0,<1.0.0", + "prometheus_client>=0.20.0,<1.0.0" ] [project.optional-dependencies] diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 45bb4e80..c68ccdd6 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -7,13 +7,21 @@ import json import logging import os -import time -from dataclasses import dataclass, field -from typing import Any, Callable +from dataclasses import dataclass +from typing import Callable from fastapi import FastAPI, WebSocket, WebSocketDisconnect - -from ..cli.shell import PROMPT, ShellBackend, ServiceBackend +from fastapi.responses import Response +from prometheus_client import ( + CONTENT_TYPE_LATEST, + CollectorRegistry, + Counter, + Gauge, + Histogram, + generate_latest, +) + +from ..cli.shell import PROMPT, ServiceBackend, ShellBackend from ..client import SimServiceClient from ..settings import SimulationConfig, load_simulation_config from .llm_client import LLMClient @@ -23,117 +31,154 @@ BackendFactory = Callable[[], ShellBackend] -@dataclass class GatewayMetrics: - """Metrics tracking for the gateway service. + """Prometheus metrics tracking for the gateway service. Note on message counters: - websocket_messages: All messages received via WebSocket (including invalid) - natural_language_requests: Valid natural language commands executed - command_requests: Valid regular commands executed - The sum of natural_language_requests + command_requests will be <= websocket_messages - since invalid messages are counted in websocket_messages but not the request counters. + The sum of natural_language_requests + command_requests will be + <= websocket_messages since invalid messages are counted in + websocket_messages but not the request counters. """ - # Request counts - total_requests: int = 0 - requests_by_type: dict[str, int] = field(default_factory=dict) - websocket_messages: int = 0 # All messages received (including invalid) - natural_language_requests: int = 0 # Valid NL commands processed - command_requests: int = 0 # Valid regular commands processed + def __init__(self, registry: CollectorRegistry | None = None) -> None: + """Initialize Prometheus metrics with optional custom registry.""" + self._registry = registry or CollectorRegistry() + + # Request counters + self._total_requests = Counter( + "gateway_requests_total", + "Total number of requests processed", + registry=self._registry, + ) + self._requests_by_type = Counter( + "gateway_requests_by_type_total", + "Requests by type", + ["request_type"], + registry=self._registry, + ) + self._websocket_messages = Counter( + "gateway_websocket_messages_total", + "Total WebSocket messages received (including invalid)", + registry=self._registry, + ) + self._natural_language_requests = Counter( + "gateway_natural_language_requests_total", + "Valid natural language commands processed", + registry=self._registry, + ) + self._command_requests = Counter( + "gateway_command_requests_total", + "Valid regular commands processed", + registry=self._registry, + ) - # Error tracking - total_errors: int = 0 - errors_by_type: dict[str, int] = field(default_factory=dict) + # Error counters + self._total_errors = Counter( + "gateway_errors_total", + "Total number of errors", + registry=self._registry, + ) + self._errors_by_type = Counter( + "gateway_errors_by_type_total", + "Errors by type", + ["error_type"], + registry=self._registry, + ) - # Latency tracking (in ms) - latencies: list[float] = field(default_factory=list) - max_latency_samples: int = 1000 + # Latency histogram (in seconds for Prometheus convention) + self._request_latency = Histogram( + "gateway_request_latency_seconds", + "Request latency in seconds", + ["request_type"], + buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0], + registry=self._registry, + ) - # Connection tracking - active_connections: int = 0 - total_connections: int = 0 - total_disconnections: int = 0 + # Connection gauges + self._active_connections = Gauge( + "gateway_active_connections", + "Number of active WebSocket connections", + registry=self._registry, + ) + self._total_connections = Counter( + "gateway_connections_total", + "Total number of WebSocket connections", + registry=self._registry, + ) + self._total_disconnections = Counter( + "gateway_disconnections_total", + "Total number of WebSocket disconnections", + registry=self._registry, + ) - # LLM integration stats - llm_requests: int = 0 - llm_errors: int = 0 - llm_latencies: list[float] = field(default_factory=list) + # LLM integration metrics + self._llm_requests = Counter( + "gateway_llm_requests_total", + "Total LLM service requests", + registry=self._registry, + ) + self._llm_errors = Counter( + "gateway_llm_errors_total", + "Total LLM service errors", + registry=self._registry, + ) + self._llm_latency = Histogram( + "gateway_llm_latency_seconds", + "LLM service request latency in seconds", + buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0], + registry=self._registry, + ) - def record_request(self, request_type: str, latency_ms: float) -> None: + @property + def registry(self) -> CollectorRegistry: + """Return the Prometheus registry.""" + return self._registry + + def record_request(self, request_type: str, latency_seconds: float) -> None: """Record a request with its type and latency.""" - self.total_requests += 1 - self.requests_by_type[request_type] = self.requests_by_type.get(request_type, 0) + 1 - self._add_latency(latency_ms) + self._total_requests.inc() + self._requests_by_type.labels(request_type=request_type).inc() + self._request_latency.labels(request_type=request_type).observe(latency_seconds) def record_error(self, error_type: str) -> None: """Record an error by type.""" - self.total_errors += 1 - self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1 + self._total_errors.inc() + self._errors_by_type.labels(error_type=error_type).inc() + + def record_websocket_message(self) -> None: + """Record a WebSocket message received.""" + self._websocket_messages.inc() - def record_llm_request(self, latency_ms: float) -> None: + def record_natural_language_request(self) -> None: + """Record a natural language request.""" + self._natural_language_requests.inc() + + def record_command_request(self) -> None: + """Record a regular command request.""" + self._command_requests.inc() + + def record_llm_request(self, latency_seconds: float) -> None: """Record an LLM service request.""" - self.llm_requests += 1 - if len(self.llm_latencies) >= self.max_latency_samples: - self.llm_latencies.pop(0) - self.llm_latencies.append(latency_ms) + self._llm_requests.inc() + self._llm_latency.observe(latency_seconds) def record_llm_error(self) -> None: """Record an LLM service error.""" - self.llm_errors += 1 - - def _add_latency(self, latency_ms: float) -> None: - """Add latency sample, maintaining max samples.""" - if len(self.latencies) >= self.max_latency_samples: - self.latencies.pop(0) - self.latencies.append(latency_ms) - - def to_dict(self) -> dict[str, Any]: - """Convert metrics to dictionary for JSON serialization.""" - latency_stats = self._calculate_latency_stats(self.latencies) - llm_latency_stats = self._calculate_latency_stats(self.llm_latencies) - - return { - "requests": { - "total": self.total_requests, - "by_type": dict(self.requests_by_type), - "websocket_messages": self.websocket_messages, # All messages (including invalid) - "natural_language": self.natural_language_requests, # Valid NL commands - "commands": self.command_requests, # Valid regular commands - }, - "errors": { - "total": self.total_errors, - "by_type": dict(self.errors_by_type), - }, - "latency_ms": latency_stats, - "connections": { - "active": self.active_connections, - "total": self.total_connections, - "disconnections": self.total_disconnections, - }, - "llm_integration": { - "requests": self.llm_requests, - "errors": self.llm_errors, - "latency_ms": llm_latency_stats, - }, - } + self._llm_errors.inc() - def _calculate_latency_stats(self, latencies: list[float]) -> dict[str, float]: - """Calculate latency statistics from samples.""" - if not latencies: - return {"avg": 0.0, "min": 0.0, "max": 0.0, "p50": 0.0, "p95": 0.0} + def connection_opened(self) -> None: + """Record a new connection.""" + self._active_connections.inc() + self._total_connections.inc() - sorted_latencies = sorted(latencies) - n = len(sorted_latencies) - - return { - "avg": round(sum(latencies) / n, 2), - "min": round(min(latencies), 2), - "max": round(max(latencies), 2), - "p50": round(sorted_latencies[n // 2], 2), - "p95": round(sorted_latencies[int(n * 0.95)] if n >= 20 else sorted_latencies[-1], 2), - } + def connection_closed(self) -> None: + """Record a connection closure.""" + self._active_connections.dec() + self._total_disconnections.inc() @dataclass @@ -162,6 +207,7 @@ def create_gateway_app( settings: GatewaySettings | None = None, ) -> FastAPI: """Build the FastAPI application that fronts CLI sessions.""" + import time active_config = config or load_simulation_config() active_settings = settings or GatewaySettings.from_env() @@ -181,7 +227,7 @@ def create_gateway_app( app.state.gateway_metrics = metrics @app.get("/healthz") - def healthcheck() -> dict[str, str]: # pragma: no cover - trivial + async def healthcheck() -> dict[str, str]: # pragma: no cover - trivial health = { "status": "ok", "service_url": active_settings.service_url, @@ -191,20 +237,17 @@ def healthcheck() -> dict[str, str]: # pragma: no cover - trivial return health @app.get("/metrics") - def get_metrics() -> dict[str, Any]: - """Return gateway metrics for Prometheus scraping.""" - return { - "service": "gateway", - "service_url": active_settings.service_url, - "llm_service_url": active_settings.llm_service_url, - **metrics.to_dict(), - } + async def get_metrics() -> Response: + """Return gateway metrics in Prometheus text format.""" + return Response( + content=generate_latest(metrics.registry), + media_type=CONTENT_TYPE_LATEST, + ) @app.websocket("/ws") async def websocket_handler(websocket: WebSocket) -> None: await websocket.accept() - metrics.active_connections += 1 - metrics.total_connections += 1 + metrics.connection_opened() try: session = manager.open_session() except Exception as exc: # pragma: no cover - catastrophic setup failure @@ -212,8 +255,7 @@ async def websocket_handler(websocket: WebSocket) -> None: metrics.record_error("session_open_failed") await websocket.send_json({"type": "error", "error": str(exc)}) await websocket.close() - metrics.active_connections -= 1 - metrics.total_disconnections += 1 + metrics.connection_closed() return try: @@ -231,7 +273,7 @@ async def websocket_handler(websocket: WebSocket) -> None: message_data = await _receive_message(websocket) except WebSocketDisconnect: break - metrics.websocket_messages += 1 + metrics.record_websocket_message() if message_data is None: metrics.record_error("invalid_payload") await websocket.send_json( @@ -259,13 +301,15 @@ async def websocket_handler(websocket: WebSocket) -> None: start_time = time.perf_counter() try: if is_nl and session.llm_client: - metrics.natural_language_requests += 1 + metrics.record_natural_language_request() llm_start = time.perf_counter() - result = await asyncio.to_thread(session.execute_natural_language, command) - llm_latency = (time.perf_counter() - llm_start) * 1000 + result = await asyncio.to_thread( + session.execute_natural_language, command + ) + llm_latency = time.perf_counter() - llm_start metrics.record_llm_request(llm_latency) else: - metrics.command_requests += 1 + metrics.record_command_request() result = await asyncio.to_thread(session.execute, command) except Exception as exc: # pragma: no cover - unexpected failure LOGGER.exception("Gateway session crashed: %s", exc) @@ -280,9 +324,9 @@ async def websocket_handler(websocket: WebSocket) -> None: ) continue - latency_ms = (time.perf_counter() - start_time) * 1000 + latency_seconds = time.perf_counter() - start_time request_type = "natural_language" if is_nl else "command" - metrics.record_request(request_type, latency_ms) + metrics.record_request(request_type, latency_seconds) await websocket.send_json( { @@ -298,8 +342,7 @@ async def websocket_handler(websocket: WebSocket) -> None: except WebSocketDisconnect: LOGGER.info("Gateway session %s disconnected", session.session_id) finally: - metrics.active_connections -= 1 - metrics.total_disconnections += 1 + metrics.connection_closed() await asyncio.to_thread(session.close) with contextlib.suppress(WebSocketDisconnect): await websocket.close() @@ -338,7 +381,9 @@ def open_session(self) -> GatewaySession: LOGGER.warning("LLM service unhealthy at %s", self._llm_service_url) if self._metrics: self._metrics.record_llm_error() - return GatewaySession(backend, limits=self._config.limits, llm_client=llm_client) + return GatewaySession( + backend, limits=self._config.limits, llm_client=llm_client + ) async def _receive_message(websocket: WebSocket) -> dict[str, str | bool] | None: @@ -372,7 +417,8 @@ async def _receive_message(websocket: WebSocket) -> dict[str, str | bool] | None data = json.loads(payload.decode("utf-8")) except (json.JSONDecodeError, UnicodeDecodeError): # Binary decoded as text command - return {"command": payload.decode("utf-8", errors="ignore"), "natural_language": False} + decoded = payload.decode("utf-8", errors="ignore") + return {"command": decoded, "natural_language": False} if isinstance(data, dict): command = data.get("command") diff --git a/src/gengine/echoes/llm/app.py b/src/gengine/echoes/llm/app.py index 7f0b6be9..94af469a 100644 --- a/src/gengine/echoes/llm/app.py +++ b/src/gengine/echoes/llm/app.py @@ -3,118 +3,139 @@ from __future__ import annotations import time -from dataclasses import dataclass, field from typing import Any from fastapi import FastAPI, HTTPException +from fastapi.responses import Response +from prometheus_client import ( + CONTENT_TYPE_LATEST, + CollectorRegistry, + Counter, + Histogram, + generate_latest, +) from pydantic import BaseModel, Field from .providers import LLMProvider, create_provider from .settings import LLMSettings -@dataclass class LLMMetrics: - """Metrics tracking for the LLM service.""" + """Prometheus metrics tracking for the LLM service.""" - # Request counts - total_requests: int = 0 - parse_intent_requests: int = 0 - narrate_requests: int = 0 - - # Error tracking - total_errors: int = 0 - parse_intent_errors: int = 0 - narrate_errors: int = 0 - errors_by_type: dict[str, int] = field(default_factory=dict) - - # Latency tracking (in ms) - parse_intent_latencies: list[float] = field(default_factory=list) - narrate_latencies: list[float] = field(default_factory=list) - max_latency_samples: int = 1000 - - # Token usage (if available from provider) - total_input_tokens: int = 0 - total_output_tokens: int = 0 - - def record_parse_intent(self, latency_ms: float, input_tokens: int = 0, output_tokens: int = 0) -> None: + def __init__(self, registry: CollectorRegistry | None = None) -> None: + """Initialize Prometheus metrics with optional custom registry.""" + self._registry = registry or CollectorRegistry() + + # Request counters + self._total_requests = Counter( + "llm_requests_total", + "Total number of requests processed", + registry=self._registry, + ) + self._parse_intent_requests = Counter( + "llm_parse_intent_requests_total", + "Total parse_intent requests", + registry=self._registry, + ) + self._narrate_requests = Counter( + "llm_narrate_requests_total", + "Total narrate requests", + registry=self._registry, + ) + + # Error counters + self._total_errors = Counter( + "llm_errors_total", + "Total number of errors", + registry=self._registry, + ) + self._parse_intent_errors = Counter( + "llm_parse_intent_errors_total", + "Total parse_intent errors", + registry=self._registry, + ) + self._narrate_errors = Counter( + "llm_narrate_errors_total", + "Total narrate errors", + registry=self._registry, + ) + self._errors_by_type = Counter( + "llm_errors_by_type_total", + "Errors by endpoint and type", + ["endpoint", "error_type"], + registry=self._registry, + ) + + # Latency histograms (in seconds for Prometheus convention) + self._parse_intent_latency = Histogram( + "llm_parse_intent_latency_seconds", + "parse_intent request latency in seconds", + buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0], + registry=self._registry, + ) + self._narrate_latency = Histogram( + "llm_narrate_latency_seconds", + "narrate request latency in seconds", + buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0], + registry=self._registry, + ) + + # Token usage counters + self._total_input_tokens = Counter( + "llm_input_tokens_total", + "Total input tokens used", + registry=self._registry, + ) + self._total_output_tokens = Counter( + "llm_output_tokens_total", + "Total output tokens used", + registry=self._registry, + ) + + @property + def registry(self) -> CollectorRegistry: + """Return the Prometheus registry.""" + return self._registry + + def record_parse_intent( + self, + latency_seconds: float, + input_tokens: int = 0, + output_tokens: int = 0, + ) -> None: """Record a parse_intent request.""" - self.total_requests += 1 - self.parse_intent_requests += 1 - self._add_latency(self.parse_intent_latencies, latency_ms) - self.total_input_tokens += input_tokens - self.total_output_tokens += output_tokens - - def record_narrate(self, latency_ms: float, input_tokens: int = 0, output_tokens: int = 0) -> None: + self._total_requests.inc() + self._parse_intent_requests.inc() + self._parse_intent_latency.observe(latency_seconds) + if input_tokens > 0: + self._total_input_tokens.inc(input_tokens) + if output_tokens > 0: + self._total_output_tokens.inc(output_tokens) + + def record_narrate( + self, + latency_seconds: float, + input_tokens: int = 0, + output_tokens: int = 0, + ) -> None: """Record a narrate request.""" - self.total_requests += 1 - self.narrate_requests += 1 - self._add_latency(self.narrate_latencies, latency_ms) - self.total_input_tokens += input_tokens - self.total_output_tokens += output_tokens + self._total_requests.inc() + self._narrate_requests.inc() + self._narrate_latency.observe(latency_seconds) + if input_tokens > 0: + self._total_input_tokens.inc(input_tokens) + if output_tokens > 0: + self._total_output_tokens.inc(output_tokens) def record_error(self, endpoint: str, error_type: str) -> None: """Record an error by endpoint and type.""" - self.total_errors += 1 + self._total_errors.inc() if endpoint == "parse_intent": - self.parse_intent_errors += 1 + self._parse_intent_errors.inc() elif endpoint == "narrate": - self.narrate_errors += 1 - key = f"{endpoint}:{error_type}" - self.errors_by_type[key] = self.errors_by_type.get(key, 0) + 1 - - def _add_latency(self, latencies: list[float], latency_ms: float) -> None: - """Add latency sample, maintaining max samples.""" - if len(latencies) >= self.max_latency_samples: - latencies.pop(0) - latencies.append(latency_ms) - - def to_dict(self, provider: str = "unknown", model: str | None = None) -> dict[str, Any]: - """Convert metrics to dictionary for JSON serialization.""" - parse_intent_stats = self._calculate_latency_stats(self.parse_intent_latencies) - narrate_stats = self._calculate_latency_stats(self.narrate_latencies) - - return { - "requests": { - "total": self.total_requests, - "parse_intent": self.parse_intent_requests, - "narrate": self.narrate_requests, - }, - "errors": { - "total": self.total_errors, - "parse_intent": self.parse_intent_errors, - "narrate": self.narrate_errors, - "by_type": dict(self.errors_by_type), - }, - "latency_ms": { - "parse_intent": parse_intent_stats, - "narrate": narrate_stats, - }, - "provider": { - "name": provider, - "model": model or "N/A", - }, - "token_usage": { - "total_input": self.total_input_tokens, - "total_output": self.total_output_tokens, - }, - } - - def _calculate_latency_stats(self, latencies: list[float]) -> dict[str, float]: - """Calculate latency statistics from samples.""" - if not latencies: - return {"avg": 0.0, "min": 0.0, "max": 0.0, "p50": 0.0, "p95": 0.0} - - sorted_latencies = sorted(latencies) - n = len(sorted_latencies) - - return { - "avg": round(sum(latencies) / n, 2), - "min": round(min(latencies), 2), - "max": round(max(latencies), 2), - "p50": round(sorted_latencies[n // 2], 2), - "p95": round(sorted_latencies[int(n * 0.95)] if n >= 20 else sorted_latencies[-1], 2), - } + self._narrate_errors.inc() + self._errors_by_type.labels(endpoint=endpoint, error_type=error_type).inc() def _extract_token_usage(result: Any) -> tuple[int, int]: @@ -220,15 +241,12 @@ async def health_check() -> dict[str, Any]: } @app.get("/metrics") - async def get_metrics() -> dict[str, Any]: - """Return LLM service metrics for Prometheus scraping.""" - return { - "service": "llm", - **metrics.to_dict( - provider=app.state.llm_settings.provider, - model=app.state.llm_settings.model, - ), - } + async def get_metrics() -> Response: + """Return LLM service metrics in Prometheus text format.""" + return Response( + content=generate_latest(metrics.registry), + media_type=CONTENT_TYPE_LATEST, + ) @app.post("/parse_intent", response_model=ParseIntentResponse) async def parse_intent(request: ParseIntentRequest) -> ParseIntentResponse: @@ -243,10 +261,10 @@ async def parse_intent(request: ParseIntentRequest) -> ParseIntentResponse: request.user_input, request.context, ) - latency_ms = (time.perf_counter() - start_time) * 1000 + latency_seconds = time.perf_counter() - start_time # Extract token usage from result attributes or metadata input_tokens, output_tokens = _extract_token_usage(result) - metrics.record_parse_intent(latency_ms, input_tokens, output_tokens) + metrics.record_parse_intent(latency_seconds, input_tokens, output_tokens) return ParseIntentResponse( intents=result.intents, raw_response=result.raw_response, @@ -257,7 +275,7 @@ async def parse_intent(request: ParseIntentRequest) -> ParseIntentResponse: raise HTTPException( status_code=500, detail=f"Intent parsing failed: {str(e)}", - ) + ) from e @app.post("/narrate", response_model=NarrateResponse) async def narrate(request: NarrateRequest) -> NarrateResponse: @@ -272,10 +290,10 @@ async def narrate(request: NarrateRequest) -> NarrateResponse: request.events, request.context, ) - latency_ms = (time.perf_counter() - start_time) * 1000 + latency_seconds = time.perf_counter() - start_time # Extract token usage from result attributes or metadata input_tokens, output_tokens = _extract_token_usage(result) - metrics.record_narrate(latency_ms, input_tokens, output_tokens) + metrics.record_narrate(latency_seconds, input_tokens, output_tokens) return NarrateResponse( narrative=result.narrative, raw_response=result.raw_response, @@ -286,6 +304,6 @@ async def narrate(request: NarrateRequest) -> NarrateResponse: raise HTTPException( status_code=500, detail=f"Narration failed: {str(e)}", - ) + ) from e return app diff --git a/tests/echoes/test_gateway_service.py b/tests/echoes/test_gateway_service.py index 32f774f7..ec8a07b3 100644 --- a/tests/echoes/test_gateway_service.py +++ b/tests/echoes/test_gateway_service.py @@ -3,12 +3,31 @@ from __future__ import annotations from fastapi.testclient import TestClient +from prometheus_client import generate_latest from gengine.echoes.gateway.app import GatewaySettings, GatewayMetrics, create_gateway_app from gengine.echoes.cli.shell import LocalBackend from gengine.echoes.sim import SimEngine +def _parse_prometheus_metrics(text: str) -> dict[str, float]: + """Parse Prometheus text format into a dict of metric name -> value.""" + metrics = {} + for line in text.strip().split("\n"): + if line.startswith("#") or not line: + continue + # Parse lines like "gateway_requests_total 0.0" + parts = line.split() + if len(parts) >= 2: + name = parts[0] + try: + value = float(parts[-1]) + metrics[name] = value + except ValueError: + pass + return metrics + + def test_gateway_healthcheck(sim_config, gateway_settings) -> None: app = create_gateway_app( backend_factory=_local_backend_factory(sim_config), @@ -24,7 +43,7 @@ def test_gateway_healthcheck(sim_config, gateway_settings) -> None: def test_gateway_metrics_endpoint(sim_config, gateway_settings) -> None: - """Verify that the /metrics endpoint returns expected structure.""" + """Verify that the /metrics endpoint returns Prometheus format.""" app = create_gateway_app( backend_factory=_local_backend_factory(sim_config), config=sim_config, @@ -33,31 +52,17 @@ def test_gateway_metrics_endpoint(sim_config, gateway_settings) -> None: client = TestClient(app) response = client.get("/metrics") assert response.status_code == 200 - data = response.json() - - # Check service identification - assert data["service"] == "gateway" - assert data["service_url"] == "local" - - # Check requests section - assert "requests" in data - assert data["requests"]["total"] == 0 - assert "by_type" in data["requests"] - # Check errors section - assert "errors" in data - assert data["errors"]["total"] == 0 + # Check content type is Prometheus text format + assert "text/plain" in response.headers.get("content-type", "") - # Check latency section - assert "latency_ms" in data - assert "avg" in data["latency_ms"] + # Parse Prometheus format + metrics = _parse_prometheus_metrics(response.text) - # Check connections section - assert "connections" in data - assert data["connections"]["active"] == 0 - - # Check LLM integration section - assert "llm_integration" in data + # Check key metrics exist + assert "gateway_requests_total" in metrics + assert "gateway_errors_total" in metrics + assert "gateway_active_connections" in metrics def test_gateway_metrics_track_websocket_connections(sim_config, gateway_settings) -> None: @@ -71,8 +76,8 @@ def test_gateway_metrics_track_websocket_connections(sim_config, gateway_setting # Initial metrics response = client.get("/metrics") - initial = response.json() - assert initial["connections"]["total"] == 0 + initial = _parse_prometheus_metrics(response.text) + assert initial.get("gateway_connections_total", 0) == 0 # Connect and disconnect with client.websocket_connect("/ws") as websocket: @@ -82,9 +87,9 @@ def test_gateway_metrics_track_websocket_connections(sim_config, gateway_setting # Check metrics after connection response = client.get("/metrics") - data = response.json() - assert data["connections"]["total"] == 1 - assert data["connections"]["disconnections"] == 1 + data = _parse_prometheus_metrics(response.text) + assert data.get("gateway_connections_total", 0) == 1 + assert data.get("gateway_disconnections_total", 0) == 1 def test_gateway_metrics_track_commands(sim_config, gateway_settings) -> None: @@ -104,12 +109,12 @@ def test_gateway_metrics_track_commands(sim_config, gateway_settings) -> None: _ = websocket.receive_json() response = client.get("/metrics") - data = response.json() + data = _parse_prometheus_metrics(response.text) # Should have recorded the "summary" command (exit is not counted because it exits) # Actually both are recorded - assert data["requests"]["total"] >= 1 - assert data["requests"]["commands"] >= 1 + assert data.get("gateway_requests_total", 0) >= 1 + assert data.get("gateway_command_requests_total", 0) >= 1 def test_gateway_websocket_summary_and_exit(sim_config, gateway_settings) -> None: @@ -313,74 +318,50 @@ def _factory() -> LocalBackend: class TestGatewayMetrics: - """Tests for GatewayMetrics class.""" - - def test_initial_state(self) -> None: - """Metrics start at zero.""" - metrics = GatewayMetrics() - assert metrics.total_requests == 0 - assert metrics.total_errors == 0 - assert metrics.active_connections == 0 + """Tests for GatewayMetrics class with Prometheus.""" def test_record_request(self) -> None: - """Recording a request increments counters and stores latency.""" + """Recording a request increments counters.""" metrics = GatewayMetrics() - metrics.record_request("command", 50.0) + metrics.record_request("command", 0.050) # 50ms in seconds - assert metrics.total_requests == 1 - assert metrics.requests_by_type["command"] == 1 - assert len(metrics.latencies) == 1 - assert metrics.latencies[0] == 50.0 + # Verify by checking Prometheus output + output = generate_latest(metrics.registry).decode("utf-8") + assert "gateway_requests_total 1.0" in output + assert 'gateway_requests_by_type_total{request_type="command"} 1.0' in output def test_record_error(self) -> None: """Recording an error increments error counters.""" metrics = GatewayMetrics() metrics.record_error("execution_error") - assert metrics.total_errors == 1 - assert metrics.errors_by_type["execution_error"] == 1 + output = generate_latest(metrics.registry).decode("utf-8") + assert "gateway_errors_total 1.0" in output + assert 'gateway_errors_by_type_total{error_type="execution_error"} 1.0' in output def test_record_llm_request(self) -> None: """Recording an LLM request tracks separately.""" metrics = GatewayMetrics() - metrics.record_llm_request(100.0) + metrics.record_llm_request(0.100) # 100ms in seconds - assert metrics.llm_requests == 1 - assert len(metrics.llm_latencies) == 1 - assert metrics.llm_latencies[0] == 100.0 + output = generate_latest(metrics.registry).decode("utf-8") + assert "gateway_llm_requests_total 1.0" in output - def test_latency_stats_empty(self) -> None: - """Empty latencies return zeros.""" + def test_connection_tracking(self) -> None: + """Connection open/close updates gauges and counters.""" metrics = GatewayMetrics() - data = metrics.to_dict() + metrics.connection_opened() - assert data["latency_ms"]["avg"] == 0.0 - assert data["latency_ms"]["min"] == 0.0 - assert data["latency_ms"]["max"] == 0.0 - - def test_latency_stats_calculated(self) -> None: - """Latency statistics are calculated correctly.""" - metrics = GatewayMetrics() - for i in range(10): - metrics.record_request("test", float(i * 10)) + output = generate_latest(metrics.registry).decode("utf-8") + assert "gateway_active_connections 1.0" in output + assert "gateway_connections_total 1.0" in output - data = metrics.to_dict() - assert data["latency_ms"]["min"] == 0.0 - assert data["latency_ms"]["max"] == 90.0 - assert data["latency_ms"]["avg"] == 45.0 + metrics.connection_closed() + output = generate_latest(metrics.registry).decode("utf-8") + assert "gateway_active_connections 0.0" in output + assert "gateway_disconnections_total 1.0" in output - def test_to_dict_structure(self) -> None: - """to_dict returns expected structure.""" + def test_registry_property(self) -> None: + """Registry property returns the collector registry.""" metrics = GatewayMetrics() - metrics.record_request("command", 50.0) - metrics.record_error("test_error") - metrics.active_connections = 2 - - data = metrics.to_dict() - - assert "requests" in data - assert "errors" in data - assert "latency_ms" in data - assert "connections" in data - assert "llm_integration" in data - assert data["connections"]["active"] == 2 + assert metrics.registry is not None diff --git a/tests/echoes/test_llm_app.py b/tests/echoes/test_llm_app.py index 5224b436..6cdbd003 100644 --- a/tests/echoes/test_llm_app.py +++ b/tests/echoes/test_llm_app.py @@ -3,11 +3,30 @@ from __future__ import annotations from fastapi.testclient import TestClient +from prometheus_client import generate_latest from gengine.echoes.llm.app import create_llm_app, LLMMetrics from gengine.echoes.llm.settings import LLMSettings +def _parse_prometheus_metrics(text: str) -> dict[str, float]: + """Parse Prometheus text format into a dict of metric name -> value.""" + metrics = {} + for line in text.strip().split("\n"): + if line.startswith("#") or not line: + continue + # Parse lines like "llm_requests_total 0.0" + parts = line.split() + if len(parts) >= 2: + name = parts[0] + try: + value = float(parts[-1]) + metrics[name] = value + except ValueError: + pass + return metrics + + class TestLLMApp: """Tests for LLM service FastAPI application.""" @@ -23,39 +42,23 @@ def test_health_check(self) -> None: assert data["provider"] == "stub" def test_metrics_endpoint(self) -> None: - """Verify that /metrics endpoint returns expected structure.""" + """Verify that /metrics endpoint returns Prometheus format.""" settings = LLMSettings(provider="stub") app = create_llm_app(settings=settings) client = TestClient(app) response = client.get("/metrics") assert response.status_code == 200 - data = response.json() - - # Check service identification - assert data["service"] == "llm" - # Check requests section - assert "requests" in data - assert data["requests"]["total"] == 0 - assert data["requests"]["parse_intent"] == 0 - assert data["requests"]["narrate"] == 0 + # Check content type is Prometheus text format + assert "text/plain" in response.headers.get("content-type", "") - # Check errors section - assert "errors" in data - assert data["errors"]["total"] == 0 + # Parse Prometheus format + metrics = _parse_prometheus_metrics(response.text) - # Check latency section - assert "latency_ms" in data - assert "parse_intent" in data["latency_ms"] - assert "narrate" in data["latency_ms"] - - # Check provider section - assert "provider" in data - assert data["provider"]["name"] == "stub" - - # Check token usage section - assert "token_usage" in data + # Check key metrics exist + assert "llm_requests_total" in metrics + assert "llm_errors_total" in metrics def test_metrics_track_parse_intent(self) -> None: """Verify that parse_intent requests are tracked in metrics.""" @@ -69,11 +72,10 @@ def test_metrics_track_parse_intent(self) -> None: ) response = client.get("/metrics") - data = response.json() + metrics = _parse_prometheus_metrics(response.text) - assert data["requests"]["total"] == 1 - assert data["requests"]["parse_intent"] == 1 - assert data["latency_ms"]["parse_intent"]["avg"] > 0 + assert metrics.get("llm_requests_total", 0) == 1 + assert metrics.get("llm_parse_intent_requests_total", 0) == 1 def test_metrics_track_narrate(self) -> None: """Verify that narrate requests are tracked in metrics.""" @@ -87,11 +89,10 @@ def test_metrics_track_narrate(self) -> None: ) response = client.get("/metrics") - data = response.json() + metrics = _parse_prometheus_metrics(response.text) - assert data["requests"]["total"] == 1 - assert data["requests"]["narrate"] == 1 - assert data["latency_ms"]["narrate"]["avg"] > 0 + assert metrics.get("llm_requests_total", 0) == 1 + assert metrics.get("llm_narrate_requests_total", 0) == 1 def test_parse_intent_basic(self) -> None: settings = LLMSettings(provider="stub") @@ -212,78 +213,50 @@ def test_narrate_validates_request(self) -> None: class TestLLMMetrics: - """Tests for LLMMetrics class.""" - - def test_initial_state(self) -> None: - """Metrics start at zero.""" - metrics = LLMMetrics() - assert metrics.total_requests == 0 - assert metrics.total_errors == 0 - assert metrics.parse_intent_requests == 0 - assert metrics.narrate_requests == 0 + """Tests for LLMMetrics class with Prometheus.""" def test_record_parse_intent(self) -> None: """Recording a parse_intent request increments counters.""" metrics = LLMMetrics() - metrics.record_parse_intent(50.0, input_tokens=100, output_tokens=50) + metrics.record_parse_intent(0.050, input_tokens=100, output_tokens=50) # 50ms in seconds - assert metrics.total_requests == 1 - assert metrics.parse_intent_requests == 1 - assert len(metrics.parse_intent_latencies) == 1 - assert metrics.total_input_tokens == 100 - assert metrics.total_output_tokens == 50 + output = generate_latest(metrics.registry).decode("utf-8") + assert "llm_requests_total 1.0" in output + assert "llm_parse_intent_requests_total 1.0" in output + assert "llm_input_tokens_total 100.0" in output + assert "llm_output_tokens_total 50.0" in output def test_record_narrate(self) -> None: """Recording a narrate request increments counters.""" metrics = LLMMetrics() - metrics.record_narrate(75.0, input_tokens=200, output_tokens=100) + metrics.record_narrate(0.075, input_tokens=200, output_tokens=100) # 75ms in seconds - assert metrics.total_requests == 1 - assert metrics.narrate_requests == 1 - assert len(metrics.narrate_latencies) == 1 - assert metrics.total_input_tokens == 200 - assert metrics.total_output_tokens == 100 + output = generate_latest(metrics.registry).decode("utf-8") + assert "llm_requests_total 1.0" in output + assert "llm_narrate_requests_total 1.0" in output + assert "llm_input_tokens_total 200.0" in output + assert "llm_output_tokens_total 100.0" in output def test_record_error(self) -> None: """Recording an error increments error counters.""" metrics = LLMMetrics() metrics.record_error("parse_intent", "ValueError") - assert metrics.total_errors == 1 - assert metrics.parse_intent_errors == 1 - assert metrics.errors_by_type["parse_intent:ValueError"] == 1 + output = generate_latest(metrics.registry).decode("utf-8") + assert "llm_errors_total 1.0" in output + assert "llm_parse_intent_errors_total 1.0" in output + assert 'llm_errors_by_type_total{endpoint="parse_intent",error_type="ValueError"} 1.0' in output - def test_latency_stats_empty(self) -> None: - """Empty latencies return zeros.""" + def test_record_narrate_error(self) -> None: + """Recording a narrate error increments narrate error counter.""" metrics = LLMMetrics() - data = metrics.to_dict() + metrics.record_error("narrate", "RuntimeError") - assert data["latency_ms"]["parse_intent"]["avg"] == 0.0 - assert data["latency_ms"]["narrate"]["avg"] == 0.0 + output = generate_latest(metrics.registry).decode("utf-8") + assert "llm_errors_total 1.0" in output + assert "llm_narrate_errors_total 1.0" in output - def test_latency_stats_calculated(self) -> None: - """Latency statistics are calculated correctly.""" + def test_registry_property(self) -> None: + """Registry property returns the collector registry.""" metrics = LLMMetrics() - for i in range(10): - metrics.record_parse_intent(float(i * 10)) - - data = metrics.to_dict() - assert data["latency_ms"]["parse_intent"]["min"] == 0.0 - assert data["latency_ms"]["parse_intent"]["max"] == 90.0 - assert data["latency_ms"]["parse_intent"]["avg"] == 45.0 - - def test_to_dict_structure(self) -> None: - """to_dict returns expected structure.""" - metrics = LLMMetrics() - metrics.record_parse_intent(50.0) - metrics.record_error("parse_intent", "TestError") - - data = metrics.to_dict(provider="openai", model="gpt-4") - - assert "requests" in data - assert "errors" in data - assert "latency_ms" in data - assert "provider" in data - assert "token_usage" in data - assert data["provider"]["name"] == "openai" - assert data["provider"]["model"] == "gpt-4" + assert metrics.registry is not None From f22edd2fa33af7e657c92c17827a92246de9ccb3 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 12:09:13 -0800 Subject: [PATCH 07/16] fix(merge): resolve conflict at line 260 using remote version --- src/gengine/echoes/gateway/app.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 567cb646..0c4965f8 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -257,24 +257,14 @@ async def websocket_handler(websocket: WebSocket) -> None: } ) continue -<<<<<<< HEAD - - start_time = time.perf_counter() - try: - if is_nl and session.llm_client: - metrics.natural_language_requests += 1 - llm_start = time.perf_counter() - result = await asyncio.to_thread(session.execute_natural_language, command) - llm_latency = (time.perf_counter() - llm_start) * 1000 - metrics.record_llm_request(llm_latency) -======= - try: if is_nl and session.llm_client: result = await asyncio.to_thread( session.execute_natural_language, command ) ->>>>>>> origin/main + else: + metrics.command_requests += 1 + result = await asyncio.to_thread(session.execute, command) else: metrics.command_requests += 1 result = await asyncio.to_thread(session.execute, command) From 89fc99e1d84ed80b396efa4680f613cf2f659729 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 12:09:29 -0800 Subject: [PATCH 08/16] fix(merge): remove duplicate else block after conflict resolution --- src/gengine/echoes/gateway/app.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 0c4965f8..925135b0 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -265,9 +265,6 @@ async def websocket_handler(websocket: WebSocket) -> None: else: metrics.command_requests += 1 result = await asyncio.to_thread(session.execute, command) - else: - metrics.command_requests += 1 - result = await asyncio.to_thread(session.execute, command) except Exception as exc: # pragma: no cover - unexpected failure LOGGER.exception("Gateway session crashed: %s", exc) metrics.record_error("execution_error") From 68ebe9b55a4b46aabae2e70fbdfb6fefb932e5a6 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 18:51:19 -0800 Subject: [PATCH 09/16] fix(gateway): remove merge artifact and restore correct command execution logic --- src/gengine/echoes/gateway/app.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 925135b0..91116ac2 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -257,11 +257,17 @@ async def websocket_handler(websocket: WebSocket) -> None: } ) continue + start_time = time.perf_counter() try: if is_nl and session.llm_client: - result = await asyncio.to_thread( - session.execute_natural_language, command - ) + metrics.natural_language_requests += 1 + llm_start = time.perf_counter() + result = await asyncio.to_thread(session.execute_natural_language, command) + llm_latency = (time.perf_counter() - llm_start) * 1000 + metrics.record_llm_request(llm_latency) + else: + metrics.command_requests += 1 + result = await asyncio.to_thread(session.execute, command) else: metrics.command_requests += 1 result = await asyncio.to_thread(session.execute, command) From 1ba6e830977f1c0eb9d557e602d5d5d237e4143f Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 18:51:37 -0800 Subject: [PATCH 10/16] fix(gateway): remove duplicate else block causing syntax error --- src/gengine/echoes/gateway/app.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 91116ac2..d9725c2c 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -268,9 +268,6 @@ async def websocket_handler(websocket: WebSocket) -> None: else: metrics.command_requests += 1 result = await asyncio.to_thread(session.execute, command) - else: - metrics.command_requests += 1 - result = await asyncio.to_thread(session.execute, command) except Exception as exc: # pragma: no cover - unexpected failure LOGGER.exception("Gateway session crashed: %s", exc) metrics.record_error("execution_error") From 7f5b3b0891da87a6469d13f26c2f61cc50a47cc1 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 18:51:57 -0800 Subject: [PATCH 11/16] fix(gateway): remove merge artifact and restore correct logic in open_session --- src/gengine/echoes/gateway/app.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index d9725c2c..74a4b168 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -337,15 +337,9 @@ def open_session(self) -> GatewaySession: # Check LLM service health if not llm_client.healthcheck(): LOGGER.warning("LLM service unhealthy at %s", self._llm_service_url) -<<<<<<< HEAD if self._metrics: self._metrics.record_llm_error() return GatewaySession(backend, limits=self._config.limits, llm_client=llm_client) -======= - return GatewaySession( - backend, limits=self._config.limits, llm_client=llm_client - ) ->>>>>>> origin/main async def _receive_message(websocket: WebSocket) -> dict[str, str | bool] | None: From 1ea3b241a195435346cfe77bda90c875450d35d6 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 18:52:18 -0800 Subject: [PATCH 12/16] fix(tests): remove merge artifact and restore correct imports in test_gateway_service.py --- tests/echoes/test_gateway_service.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/echoes/test_gateway_service.py b/tests/echoes/test_gateway_service.py index e8a1e18c..5052b00e 100644 --- a/tests/echoes/test_gateway_service.py +++ b/tests/echoes/test_gateway_service.py @@ -4,12 +4,8 @@ from fastapi.testclient import TestClient -<<<<<<< HEAD -from gengine.echoes.gateway.app import GatewaySettings, GatewayMetrics, create_gateway_app -======= ->>>>>>> origin/main from gengine.echoes.cli.shell import LocalBackend -from gengine.echoes.gateway.app import GatewaySettings, create_gateway_app +from gengine.echoes.gateway.app import GatewaySettings, GatewayMetrics, create_gateway_app from gengine.echoes.sim import SimEngine From a0ee70183e0874e5095af3c4b059217ae4517cf4 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 19:03:50 -0800 Subject: [PATCH 13/16] Fix ruff linting errors: import sorting, line length, and indentation in metrics tests and gateway app --- src/gengine/echoes/gateway/app.py | 25 ++++++++++++++++++------- tests/echoes/test_gateway_service.py | 11 +++++++++-- tests/echoes/test_llm_app.py | 19 +++++++++++++++---- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/gengine/echoes/gateway/app.py b/src/gengine/echoes/gateway/app.py index 74a4b168..3b5341f6 100644 --- a/src/gengine/echoes/gateway/app.py +++ b/src/gengine/echoes/gateway/app.py @@ -32,8 +32,10 @@ class GatewayMetrics: - natural_language_requests: Valid natural language commands executed - command_requests: Valid regular commands executed - The sum of natural_language_requests + command_requests will be <= websocket_messages - since invalid messages are counted in websocket_messages but not the request counters. + The sum of natural_language_requests + command_requests will be <= \ + websocket_messages. + Invalid messages are counted in websocket_messages but not the request \ + counters. """ # Request counts @@ -64,7 +66,9 @@ class GatewayMetrics: def record_request(self, request_type: str, latency_ms: float) -> None: """Record a request with its type and latency.""" self.total_requests += 1 - self.requests_by_type[request_type] = self.requests_by_type.get(request_type, 0) + 1 + self.requests_by_type[request_type] = ( + self.requests_by_type.get(request_type, 0) + 1 + ) self._add_latency(latency_ms) def record_error(self, error_type: str) -> None: @@ -98,7 +102,8 @@ def to_dict(self) -> dict[str, Any]: "requests": { "total": self.total_requests, "by_type": dict(self.requests_by_type), - "websocket_messages": self.websocket_messages, # All messages (including invalid) + # All messages (including invalid) + "websocket_messages": self.websocket_messages, "natural_language": self.natural_language_requests, # Valid NL commands "commands": self.command_requests, # Valid regular commands }, @@ -132,7 +137,9 @@ def _calculate_latency_stats(self, latencies: list[float]) -> dict[str, float]: "min": round(min(latencies), 2), "max": round(max(latencies), 2), "p50": round(sorted_latencies[n // 2], 2), - "p95": round(sorted_latencies[int(n * 0.95)] if n >= 20 else sorted_latencies[-1], 2), + "p95": round( + sorted_latencies[int(n * 0.95)] if n >= 20 else sorted_latencies[-1], 2 + ), } @@ -262,7 +269,9 @@ async def websocket_handler(websocket: WebSocket) -> None: if is_nl and session.llm_client: metrics.natural_language_requests += 1 llm_start = time.perf_counter() - result = await asyncio.to_thread(session.execute_natural_language, command) + result = await asyncio.to_thread( + session.execute_natural_language, command + ) llm_latency = (time.perf_counter() - llm_start) * 1000 metrics.record_llm_request(llm_latency) else: @@ -339,7 +348,9 @@ def open_session(self) -> GatewaySession: LOGGER.warning("LLM service unhealthy at %s", self._llm_service_url) if self._metrics: self._metrics.record_llm_error() - return GatewaySession(backend, limits=self._config.limits, llm_client=llm_client) + return GatewaySession( + backend, limits=self._config.limits, llm_client=llm_client + ) async def _receive_message(websocket: WebSocket) -> dict[str, str | bool] | None: diff --git a/tests/echoes/test_gateway_service.py b/tests/echoes/test_gateway_service.py index 5052b00e..c5f640cc 100644 --- a/tests/echoes/test_gateway_service.py +++ b/tests/echoes/test_gateway_service.py @@ -5,7 +5,11 @@ from fastapi.testclient import TestClient from gengine.echoes.cli.shell import LocalBackend -from gengine.echoes.gateway.app import GatewaySettings, GatewayMetrics, create_gateway_app +from gengine.echoes.gateway.app import ( + GatewayMetrics, + GatewaySettings, + create_gateway_app, +) from gengine.echoes.sim import SimEngine @@ -60,7 +64,10 @@ def test_gateway_metrics_endpoint(sim_config, gateway_settings) -> None: assert "llm_integration" in data -def test_gateway_metrics_track_websocket_connections(sim_config, gateway_settings) -> None: +def test_gateway_metrics_track_websocket_connections( + sim_config, + gateway_settings, +) -> None: """Verify that WebSocket connections are tracked in metrics.""" app = create_gateway_app( backend_factory=_local_backend_factory(sim_config), diff --git a/tests/echoes/test_llm_app.py b/tests/echoes/test_llm_app.py index 6cdbd003..93b27d5c 100644 --- a/tests/echoes/test_llm_app.py +++ b/tests/echoes/test_llm_app.py @@ -5,7 +5,10 @@ from fastapi.testclient import TestClient from prometheus_client import generate_latest -from gengine.echoes.llm.app import create_llm_app, LLMMetrics +from gengine.echoes.llm.app import ( + LLMMetrics, + create_llm_app, +) from gengine.echoes.llm.settings import LLMSettings @@ -218,7 +221,9 @@ class TestLLMMetrics: def test_record_parse_intent(self) -> None: """Recording a parse_intent request increments counters.""" metrics = LLMMetrics() - metrics.record_parse_intent(0.050, input_tokens=100, output_tokens=50) # 50ms in seconds + metrics.record_parse_intent( + 0.050, input_tokens=100, output_tokens=50 + ) # 50ms in seconds output = generate_latest(metrics.registry).decode("utf-8") assert "llm_requests_total 1.0" in output @@ -229,7 +234,9 @@ def test_record_parse_intent(self) -> None: def test_record_narrate(self) -> None: """Recording a narrate request increments counters.""" metrics = LLMMetrics() - metrics.record_narrate(0.075, input_tokens=200, output_tokens=100) # 75ms in seconds + metrics.record_narrate( + 0.075, input_tokens=200, output_tokens=100 + ) # 75ms in seconds output = generate_latest(metrics.registry).decode("utf-8") assert "llm_requests_total 1.0" in output @@ -245,7 +252,11 @@ def test_record_error(self) -> None: output = generate_latest(metrics.registry).decode("utf-8") assert "llm_errors_total 1.0" in output assert "llm_parse_intent_errors_total 1.0" in output - assert 'llm_errors_by_type_total{endpoint="parse_intent",error_type="ValueError"} 1.0' in output + expected_metric = ( + 'llm_errors_by_type_total{endpoint="parse_intent",error_type="ValueError"} ' + '1.0' + ) + assert expected_metric in output def test_record_narrate_error(self) -> None: """Recording a narrate error increments narrate error counter.""" From 879593d7900abbd7885f7fba555d270955acc81e Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 19:09:51 -0800 Subject: [PATCH 14/16] Fix: Ensure namespace is created before validating k8s/base manifests in dry-run workflow --- .github/workflows/k8s-validation.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/k8s-validation.yml b/.github/workflows/k8s-validation.yml index 3ed2e699..82d015c6 100644 --- a/.github/workflows/k8s-validation.yml +++ b/.github/workflows/k8s-validation.yml @@ -87,6 +87,10 @@ jobs: kubectl cluster-info kubectl get nodes + - name: Create gengine namespace + run: | + kubectl apply --dry-run=server -f k8s/base/namespace.yaml + - name: Dry-run validate base manifests run: | echo "Validating k8s/base with --dry-run=server..." From f9ec85cb9f156ca19d2757939dc76244ba6ae9b8 Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 19:17:50 -0800 Subject: [PATCH 15/16] Fix: Apply namespace manifest for real before dry-run validation to resolve NotFound errors --- .github/workflows/k8s-validation.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/k8s-validation.yml b/.github/workflows/k8s-validation.yml index 82d015c6..10e15cae 100644 --- a/.github/workflows/k8s-validation.yml +++ b/.github/workflows/k8s-validation.yml @@ -87,9 +87,9 @@ jobs: kubectl cluster-info kubectl get nodes - - name: Create gengine namespace + - name: Create gengine namespace (real) run: | - kubectl apply --dry-run=server -f k8s/base/namespace.yaml + kubectl apply -f k8s/base/namespace.yaml - name: Dry-run validate base manifests run: | From 7b4999237d0e5e84721d7e1f98631ccb39b11c9d Mon Sep 17 00:00:00 2001 From: Ross Gardler Date: Tue, 2 Dec 2025 19:20:47 -0800 Subject: [PATCH 16/16] Review: Confirm workflow applies namespace before dry-run validation. Ready for CI. --- .github/workflows/k8s-validation.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/k8s-validation.yml b/.github/workflows/k8s-validation.yml index 10e15cae..a5679713 100644 --- a/.github/workflows/k8s-validation.yml +++ b/.github/workflows/k8s-validation.yml @@ -12,12 +12,12 @@ on: branches: - main paths: - - 'k8s/**/*.yaml' - - '.github/workflows/k8s-*.yml' + - "k8s/**/*.yaml" + - ".github/workflows/k8s-*.yml" pull_request: paths: - - 'k8s/**/*.yaml' - - '.github/workflows/k8s-*.yml' + - "k8s/**/*.yaml" + - ".github/workflows/k8s-*.yml" permissions: contents: read