From a1fc97db3f6d8130779ed10f63a92377883518bd Mon Sep 17 00:00:00 2001 From: nuwangeek Date: Fri, 19 Jun 2026 15:34:25 +0530 Subject: [PATCH] Added centralized logging using loki and grafana --- DSL/CronManager/script/api_tool_indexer.sh | 2 +- docker-compose-ec2.yml | 5 +- docker-compose.yml | 5 +- generate_presigned_url.py | 64 ------ .../grafana-dashboard-deployment.json | 4 +- grafana-configs/loki_logger.py | 117 ++++++++-- src/api_tool_indexer/main_indexer.py | 18 +- src/api_tool_indexer/qdrant_manager.py | 5 +- src/contextual_retrieval/bm25_search.py | 8 +- src/contextual_retrieval/config.py | 6 +- .../contextual_retrieval_api_client.py | 6 +- .../contextual_retriever.py | 6 +- src/contextual_retrieval/error_handler.py | 5 +- .../provider_detection.py | 6 +- src/contextual_retrieval/qdrant_search.py | 9 +- src/contextual_retrieval/rank_fusion.py | 8 +- src/guardrails/dspy_nemo_adapter.py | 5 +- src/guardrails/nemo_rails_adapter.py | 6 +- src/guardrails/optimized_guardrails_loader.py | 5 +- src/intent_data_enrichment/api_client.py | 5 +- src/intent_data_enrichment/main_enrichment.py | 6 +- src/intent_data_enrichment/qdrant_manager.py | 5 +- src/llm_orchestration_service.py | 5 +- src/llm_orchestration_service_api.py | 5 +- src/llm_orchestrator_config/config/loader.py | 5 +- .../context_manager.py | 6 +- .../embedding_manager.py | 6 +- src/llm_orchestrator_config/feature_flags.py | 5 +- .../vault/secret_resolver.py | 5 +- .../vault/vault_client.py | 5 +- src/loki_logger.py | 180 ++++++++++++++++ src/models/request_models.py | 11 +- src/optimization/metrics/generator_metrics.py | 5 +- .../metrics/guardrails_metrics.py | 5 +- src/optimization/metrics/refiner_metrics.py | 5 +- .../optimization_scripts/check_paths.py | 5 +- .../diagnose_guardrails_loader.py | 5 +- .../extract_guardrails_prompts.py | 5 +- .../inspect_guardrails_optimization.py | 5 +- .../run_all_optimizations.py | 6 +- .../optimization_scripts/split_datasets.py | 4 +- 
src/optimization/optimized_module_loader.py | 5 +- .../optimizers/generator_optimizer.py | 6 +- .../optimizers/guardrails_optimizer.py | 5 +- .../optimizers/refiner_optimizer.py | 6 +- src/prompt_refine_manager/prompt_refiner.py | 5 +- src/response_generator/response_generate.py | 9 +- src/tool_classifier/agentic_loop.py | 109 +++++----- src/tool_classifier/api_caller.py | 62 +++--- src/tool_classifier/api_response_formatter.py | 5 +- src/tool_classifier/api_semantic_searcher.py | 5 +- src/tool_classifier/classifier.py | 8 +- src/tool_classifier/context_analyzer.py | 4 +- src/tool_classifier/follow_up_detector.py | 30 ++- src/tool_classifier/intent_decomposer.py | 4 +- src/tool_classifier/intent_detector.py | 6 +- src/tool_classifier/multi_agentic_loop.py | 203 ++++++++++-------- src/tool_classifier/multi_api_caller.py | 45 ++-- .../multi_response_formatter.py | 5 +- src/tool_classifier/param_extractor.py | 78 +++++-- .../workflows/api_tool_workflow.py | 4 +- .../workflows/context_workflow.py | 5 +- src/tool_classifier/workflows/ood_workflow.py | 5 +- src/tool_classifier/workflows/rag_workflow.py | 5 +- .../workflows/service_workflow.py | 5 +- src/utils/api_tool_session_store.py | 47 ++-- src/utils/atc_cache_store.py | 35 +-- src/utils/budget_tracker.py | 5 +- src/utils/connection_id_fetcher.py | 5 +- src/utils/cost_utils.py | 5 +- src/utils/decrypt_vault_secrets.py | 18 +- src/utils/input_sanitizer.py | 5 +- src/utils/language_detector.py | 5 +- src/utils/production_store.py | 6 +- src/utils/prompt_config_loader.py | 5 +- src/utils/rate_limiter.py | 6 +- src/utils/redis_client.py | 6 +- src/utils/stream_manager.py | 5 +- src/utils/time_tracker.py | 5 +- src/vector_indexer/api_client.py | 5 +- src/vector_indexer/config/config_loader.py | 5 +- src/vector_indexer/contextual_processor.py | 5 +- src/vector_indexer/dataset_download.py | 5 +- .../diff_identifier/diff_detector.py | 18 +- .../diff_identifier/s3_ferry_client.py | 6 +- .../diff_identifier/version_manager.py 
| 6 +- src/vector_indexer/document_loader.py | 5 +- src/vector_indexer/error_logger.py | 28 +-- src/vector_indexer/loki_logger.py | 172 +++++++++++++++ src/vector_indexer/main_indexer.py | 35 +-- src/vector_indexer/qdrant_manager.py | 22 +- tests/api_tool_eval/test-endpoints.json | 12 +- 92 files changed, 1135 insertions(+), 555 deletions(-) delete mode 100644 generate_presigned_url.py create mode 100644 src/loki_logger.py diff --git a/DSL/CronManager/script/api_tool_indexer.sh b/DSL/CronManager/script/api_tool_indexer.sh index 9697697f..5c1507c1 100644 --- a/DSL/CronManager/script/api_tool_indexer.sh +++ b/DSL/CronManager/script/api_tool_indexer.sh @@ -36,7 +36,7 @@ echo "[PACKAGES] Installing required packages..." "$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "httpx>=0.27.0" || exit 1 "$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "pydantic>=2.11.7" || exit 1 "$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "qdrant-client>=1.15.1" || exit 1 -"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "loguru>=0.7.3" || exit 1 +"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "requests>=2.32.5" || exit 1 echo "[PACKAGES] All packages installed successfully" diff --git a/docker-compose-ec2.yml b/docker-compose-ec2.yml index 1176e3dd..51f41549 100644 --- a/docker-compose-ec2.yml +++ b/docker-compose-ec2.yml @@ -184,6 +184,8 @@ services: - ./src/tool_classifier:/app/src/tool_classifier - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/api_tool_indexer:/app/src/api_tool_indexer + - ./src/__init__.py:/app/src/__init__.py:ro + - ./src/loki_logger.py:/app/src/loki_logger.py:ro - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data - shared-volume:/app/shared # Access to shared resources for cross-container coordination @@ -192,7 +194,7 @@ services: - ./.env:/app/.env:ro environment: - server.port=9010 - - 
PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer + - PYTHONPATH=/app:/app/src:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer - VAULT_AGENT_URL=http://vault-agent-cron:8203 ports: - 9010:8080 @@ -645,6 +647,7 @@ services: - ./src/llm_config_module/config:/app/src/llm_config_module/config:ro - ./src/optimization/optimized_modules:/app/src/optimization/optimized_modules - llm_orchestration_logs:/app/logs + - ./grafana-configs/loki_logger.py:/app/src/loki_logger.py:ro networks: - bykstack depends_on: diff --git a/docker-compose.yml b/docker-compose.yml index b64224f1..71a0028d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -184,6 +184,8 @@ services: - ./src/tool_classifier:/app/src/tool_classifier - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/api_tool_indexer:/app/src/api_tool_indexer + - ./src/__init__.py:/app/src/__init__.py:ro + - ./src/loki_logger.py:/app/src/loki_logger.py:ro - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data - shared-volume:/app/shared # Access to shared resources for cross-container coordination @@ -192,7 +194,7 @@ services: - ./.env:/app/.env:ro environment: - server.port=9010 - - PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer + - PYTHONPATH=/app:/app/src:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer - vaultAgentUrl=http://vault-agent-cron:8203 ports: - 9010:8080 @@ -594,6 +596,7 @@ services: - ./src/optimization/optimized_modules:/app/src/optimization/optimized_modules - llm_orchestration_logs:/app/logs - ./tests:/app/tests # mount tests directory (excluded from image via .dockerignore) + - ./grafana-configs/loki_logger.py:/app/src/loki_logger.py:ro networks: - bykstack depends_on: diff --git a/generate_presigned_url.py b/generate_presigned_url.py 
deleted file mode 100644 index 61028beb..00000000 --- a/generate_presigned_url.py +++ /dev/null @@ -1,64 +0,0 @@ -import boto3 -from botocore.client import Config -from typing import List, Dict -from loguru import logger - -# Create S3 client for MinIO -s3_client = boto3.client( - "s3", - endpoint_url="http://minio:9000", # Replace with your MinIO URL - aws_access_key_id="minioadmin", # Replace with your access key - aws_secret_access_key="minioadmin", # Replace with your secret key - config=Config(signature_version="s3v4"), # Hardcoded signature version - region_name="us-east-1", # MinIO usually works with any region -) - -# List of files to process -files_to_process: List[Dict[str, str]] = [ - {"bucket": "ckb", "key": "ID.ee/ID.zip"}, -] - -# Generate presigned URLs -presigned_urls: List[str] = [] - -logger.info("Generating presigned URLs...") -for file_info in files_to_process: - try: - url = s3_client.generate_presigned_url( - ClientMethod="get_object", - Params={"Bucket": file_info["bucket"], "Key": file_info["key"]}, - ExpiresIn=24 * 3600, # 4 hours in seconds - ) - presigned_urls.append(url) - logger.success(f"Generated URL for: {file_info['key']}") - logger.info(f" URL: {url}") - except Exception as e: - logger.error(f"Failed to generate URL for: {file_info['key']}") - logger.error(f" Error: {str(e)}") - -output_file: str = "minio_presigned_urls.txt" - -try: - with open(output_file, "w") as f: - # Write URLs separated by ||| delimiter (for your script) - url_string: str = "|||".join(presigned_urls) - f.write(url_string) - f.write("\n\n") - - # Also write each URL on separate lines for readability - f.write("Individual URLs:\n") - f.write("=" * 50 + "\n") - for i, url in enumerate(presigned_urls, 1): - f.write(f"URL {i}:\n{url}\n\n") - - logger.success(f"Presigned URLs saved to: {output_file}") - logger.info(f"Total URLs generated: {len(presigned_urls)}") - - # Display the combined URL string for easy copying - if presigned_urls: - logger.info("Combined URL 
string (for signedUrls environment variable):") - logger.info("=" * 60) - logger.info("|||".join(presigned_urls)) - -except Exception as e: - logger.error(f"Failed to save URLs to file: {str(e)}") diff --git a/grafana-configs/grafana-dashboard-deployment.json b/grafana-configs/grafana-dashboard-deployment.json index a1e469f2..549b8035 100644 --- a/grafana-configs/grafana-dashboard-deployment.json +++ b/grafana-configs/grafana-dashboard-deployment.json @@ -1,7 +1,7 @@ { "id": null, "title": "RAG Module Orchestrator", - "tags": ["deployment", "models", "triton"], + "tags": ["rag-module", "loki-logs", "llm-orchestration", "vector-search"], "timezone": "browser", "refresh": "30s", "time": { @@ -15,7 +15,7 @@ "type": "query", "label": "Service Name", "refresh": 1, - "query": "label_values(service)", + "query": "label_values({}, service)", "datasource": { "type": "loki", "uid": "loki-datasource" diff --git a/grafana-configs/loki_logger.py b/grafana-configs/loki_logger.py index e90dd059..59a901bc 100644 --- a/grafana-configs/loki_logger.py +++ b/grafana-configs/loki_logger.py @@ -1,19 +1,30 @@ #!/usr/bin/env python3 """ -Loki Logger for Global Classifier +Loki Logger for RAG Module Sends logs directly to Loki API for centralized logging """ import json -import socket import time from datetime import datetime +from threading import Thread +from queue import Full, Queue import requests class LokiLogger: - """Simple logger that sends logs directly to Loki API""" + """Simple logger that sends logs directly to Loki API with async background thread""" + + _instances: dict[str, "LokiLogger"] = {} + + def __new__( + cls, loki_url: str = "http://loki:3100", service_name: str = "default" + ) -> "LokiLogger": + key = f"{loki_url}:{service_name}" + if key not in cls._instances: + cls._instances[key] = super().__new__(cls) + return cls._instances[key] def __init__( self, loki_url: str = "http://loki:3100", service_name: str = "default" @@ -25,15 +36,40 @@ def __init__( loki_url: URL 
for Loki service (default: container URL in bykstack network) service_name: Name of the service for labeling logs """ + if hasattr(self, "_initialized"): + return + self._initialized = True self.loki_url = loki_url self.service_name = service_name - self.hostname = socket.gethostname() self.session = requests.Session() # Set default timeout for all requests self.timeout = 5 - def _send_to_loki(self, level: str, message: str) -> None: - """Send log entry directly to Loki API""" + # Queue for async log processing (bounded to avoid unbounded memory growth under load) + self.log_queue: Queue[tuple[str, str]] = Queue(maxsize=10_000) + + # Start background worker thread + self.worker_thread = Thread(target=self._process_logs, daemon=True) + self.worker_thread.start() + + def _process_logs(self) -> None: + """Background worker that processes log queue""" + while True: + try: + # Get log entry from queue (blocking) + level, message = self.log_queue.get() + + # Send to Loki + self._send_to_loki_sync(level, message) + + # Mark task as done + self.log_queue.task_done() + except Exception: + # Silently ignore errors in background thread + pass + + def _send_to_loki_sync(self, level: str, message: str) -> None: + """Send log entry directly to Loki API (called from background thread)""" try: # Create timestamp in nanoseconds (Loki requirement) timestamp_ns = str(int(time.time() * 1_000_000_000)) @@ -42,15 +78,12 @@ def _send_to_loki(self, level: str, message: str) -> None: labels = { "service": self.service_name, "level": level, - "hostname": self.hostname, } # Create log entry log_entry = { - "timestamp": datetime.now().isoformat(), "level": level, "message": message, - "hostname": self.hostname, "service": self.service_name, } @@ -64,7 +97,7 @@ def _send_to_loki(self, level: str, message: str) -> None: ] } - # Send to Loki (non-blocking, fire-and-forget) + # Send to Loki self.session.post( f"{self.loki_url}/loki/api/v1/push", json=payload, @@ -76,18 +109,64 @@ def 
_send_to_loki(self, level: str, message: str) -> None: # Silently ignore logging errors to not affect main application pass - # Also print to console for immediate feedback + def _log(self, level: str, message: str) -> None: + """Queue log entry for async processing (non-blocking)""" + # Print to console immediately for real-time feedback timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {level: <8} | {message}") # noqa: T201 - def info(self, message: str) -> None: - self._send_to_loki("INFO", message) + # Queue for async Loki sending (non-blocking) + try: + self.log_queue.put_nowait((level, message)) + except Full: + # Queue full (Loki may be slow/unreachable) - drop log to avoid blocking + pass + + def info(self, message: str, **kwargs: object) -> None: + """Log info message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("INFO", message) + + def error(self, message: str, **kwargs: object) -> None: + """Log error message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("ERROR", message) + + def warning(self, message: str, **kwargs: object) -> None: + """Log warning message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("WARNING", message) + + def debug(self, message: str, **kwargs: object) -> None: + """Log debug message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("DEBUG", message) + + def success(self, message: str, **kwargs: object) -> None: + """Log success message (loguru compatibility). Extra kwargs ignored.""" + self._log("SUCCESS", message) + + def critical(self, message: str, **kwargs: object) -> None: + """Log critical message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("CRITICAL", message) + + def exception(self, message: str, **kwargs: object) -> None: + """Log exception message. 
Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("EXCEPTION", message) + + def add(self, *args: object, **kwargs: object) -> None: + """ + No-op method for loguru compatibility. + + LokiLogger sends logs to Loki/console only, not to files. + This method exists for backward compatibility with loguru code. + """ + pass # Silently ignore - logs go to Loki instead of files - def error(self, message: str) -> None: - self._send_to_loki("ERROR", message) + def remove(self, *args: object, **kwargs: object) -> None: + """No-op method for loguru compatibility.""" + pass # Silently ignore - def warning(self, message: str) -> None: - self._send_to_loki("WARNING", message) + def bind(self, **kwargs: object) -> "LokiLogger": + """No-op method for loguru compatibility. Returns self for chaining.""" + return self # Allow method chaining - def debug(self, message: str) -> None: - self._send_to_loki("DEBUG", message) + def opt(self, **kwargs: object) -> "LokiLogger": + """No-op method for loguru compatibility. Returns self for chaining.""" + return self # Allow method chaining diff --git a/src/api_tool_indexer/main_indexer.py b/src/api_tool_indexer/main_indexer.py index c5677c1b..5ec38565 100644 --- a/src/api_tool_indexer/main_indexer.py +++ b/src/api_tool_indexer/main_indexer.py @@ -28,7 +28,8 @@ import asyncio import argparse from typing import List -from loguru import logger + +from src.loki_logger import LokiLogger from api_tool_indexer.constants import ApiToolIndexerConstants from api_tool_indexer.models import EndpointData, EnrichedEndpoint, IndexingResult @@ -37,8 +38,7 @@ # Reuse LLMAPIClient from intent_data_enrichment. from intent_data_enrichment.api_client import LLMAPIClient -# Reuse sparse encoder from tool_classifier (shared BM25 implementation). 
-sys.path.insert(0, "/app/src") +logger = LokiLogger(service_name="api-tool-calling") try: from tool_classifier.sparse_encoder import compute_sparse_vector except ImportError: @@ -139,9 +139,8 @@ async def _generate_context_for_endpoint( ) logger.debug( - "Generated context prompt for endpoint '{}': {} chars", - endpoint_data.endpoint_id, - len(context_prompt), + f"Generated context prompt for endpoint '{endpoint_data.endpoint_id}': " + f"{len(context_prompt)} chars" ) # context_type="api_tool" makes context_manager use API_TOOL_CONTEXT_PROMPT, @@ -175,11 +174,10 @@ async def _generate_context_for_endpoint( context = result.get("context", "").strip() - logger.debug( - "context preview: {}{}", - context[:200].replace("\n", "\\n"), - "..." if len(context) > 200 else "", + _preview = context[:200].replace("\n", "\\n") + ( + "..." if len(context) > 200 else "" ) + logger.debug(f"context preview: {_preview}") if not context: raise ValueError("Empty context returned from API") diff --git a/src/api_tool_indexer/qdrant_manager.py b/src/api_tool_indexer/qdrant_manager.py index 984d695d..bd572661 100644 --- a/src/api_tool_indexer/qdrant_manager.py +++ b/src/api_tool_indexer/qdrant_manager.py @@ -4,7 +4,8 @@ import uuid from typing import Any, Dict, List, Optional -from loguru import logger + +from src.loki_logger import LokiLogger from qdrant_client import QdrantClient from qdrant_client.models import ( Distance, @@ -22,6 +23,8 @@ from api_tool_indexer.constants import ApiToolIndexerConstants from api_tool_indexer.models import EnrichedEndpoint +logger = LokiLogger(service_name="api-tool-calling") + # Error messages _CLIENT_NOT_INITIALIZED = "Qdrant client not initialized" diff --git a/src/contextual_retrieval/bm25_search.py b/src/contextual_retrieval/bm25_search.py index 7ec8ea9a..ef7b3831 100644 --- a/src/contextual_retrieval/bm25_search.py +++ b/src/contextual_retrieval/bm25_search.py @@ -6,10 +6,11 @@ """ from typing import List, Dict, Any, Optional, Set, TYPE_CHECKING 
-from loguru import logger +from src.loki_logger import LokiLogger from rank_bm25 import BM25Okapi import re import asyncio + from contextual_retrieval.contextual_retrieval_api_client import get_http_client_manager from contextual_retrieval.error_handler import SecureErrorHandler from contextual_retrieval.constants import ( @@ -20,6 +21,9 @@ ) from contextual_retrieval.config import ConfigLoader, ContextualRetrievalConfig +# Initialize Loki logger +logger = LokiLogger(service_name="bm25-search") + if TYPE_CHECKING: from contextual_retrieval.contextual_retrieval_api_client import HTTPClientManager @@ -165,7 +169,7 @@ async def search_bm25( logger.info(f"BM25 search found {len(results)} chunks") - # Detailed results at DEBUG level (loguru filters based on log level config) + # Detailed results at DEBUG level (filters based on log level config) logger.debug("=== BM25 SEARCH RESULTS BREAKDOWN ===") for i, chunk in enumerate(results[:10]): # Show top 10 results content_preview = ( diff --git a/src/contextual_retrieval/config.py b/src/contextual_retrieval/config.py index 49f78ef8..7e811e50 100644 --- a/src/contextual_retrieval/config.py +++ b/src/contextual_retrieval/config.py @@ -9,7 +9,8 @@ from typing import List import yaml from pathlib import Path -from loguru import logger +from src.loki_logger import LokiLogger + from contextual_retrieval.constants import ( HttpClientConstants, SearchConstants, @@ -17,6 +18,9 @@ BM25Constants, ) +# Initialize Loki logger +logger = LokiLogger(service_name="contextual-retrieval-config") + class HttpClientConfig(BaseModel): """HTTP client configuration.""" diff --git a/src/contextual_retrieval/contextual_retrieval_api_client.py b/src/contextual_retrieval/contextual_retrieval_api_client.py index 0de14558..f962e3d6 100644 --- a/src/contextual_retrieval/contextual_retrieval_api_client.py +++ b/src/contextual_retrieval/contextual_retrieval_api_client.py @@ -8,8 +8,9 @@ import asyncio from typing import Optional, Dict, Any import httpx 
-from loguru import logger +from src.loki_logger import LokiLogger import time + from contextual_retrieval.error_handler import SecureErrorHandler from contextual_retrieval.constants import ( HttpClientConstants, @@ -20,6 +21,9 @@ ) from contextual_retrieval.config import ConfigLoader, ContextualRetrievalConfig +# Initialize Loki logger +logger = LokiLogger(service_name="contextual-retrieval-api-client") + class ServiceResilienceManager: """Service resilience manager with circuit breaker functionality for HTTP requests.""" diff --git a/src/contextual_retrieval/contextual_retriever.py b/src/contextual_retrieval/contextual_retriever.py index bdb61eb1..7f0f5796 100644 --- a/src/contextual_retrieval/contextual_retriever.py +++ b/src/contextual_retrieval/contextual_retriever.py @@ -11,10 +11,11 @@ """ from typing import List, Dict, Any, Optional, Union, TYPE_CHECKING -from loguru import logger +from src.loki_logger import LokiLogger import asyncio import time from langfuse import observe + from contextual_retrieval.config import ConfigLoader, ContextualRetrievalConfig # Type checking import to avoid circular dependency at runtime @@ -26,6 +27,9 @@ from contextual_retrieval.bm25_search import SmartBM25Search from contextual_retrieval.rank_fusion import DynamicRankFusion +# Initialize Loki logger +logger = LokiLogger(service_name="contextual-retriever") + class ContextualRetriever: """ diff --git a/src/contextual_retrieval/error_handler.py b/src/contextual_retrieval/error_handler.py index 08fac2e7..650f4402 100644 --- a/src/contextual_retrieval/error_handler.py +++ b/src/contextual_retrieval/error_handler.py @@ -8,9 +8,12 @@ import re from typing import Dict, Any, Optional, Union from urllib.parse import urlparse, urlunparse -from loguru import logger +from src.loki_logger import LokiLogger import httpx +# Initialize Loki logger +logger = LokiLogger(service_name="error-handler") + class SecureErrorHandler: """ diff --git a/src/contextual_retrieval/provider_detection.py 
b/src/contextual_retrieval/provider_detection.py index 8abb4d18..78796b85 100644 --- a/src/contextual_retrieval/provider_detection.py +++ b/src/contextual_retrieval/provider_detection.py @@ -8,7 +8,8 @@ """ from typing import List, Optional, Dict, Any, TYPE_CHECKING -from loguru import logger +from src.loki_logger import LokiLogger + from contextual_retrieval.contextual_retrieval_api_client import get_http_client_manager from contextual_retrieval.error_handler import SecureErrorHandler from contextual_retrieval.constants import ( @@ -18,6 +19,9 @@ ) from contextual_retrieval.config import ConfigLoader, ContextualRetrievalConfig +# Initialize Loki logger +logger = LokiLogger(service_name="provider-detection") + if TYPE_CHECKING: from contextual_retrieval.contextual_retrieval_api_client import HTTPClientManager diff --git a/src/contextual_retrieval/qdrant_search.py b/src/contextual_retrieval/qdrant_search.py index 31515f32..28f45d89 100644 --- a/src/contextual_retrieval/qdrant_search.py +++ b/src/contextual_retrieval/qdrant_search.py @@ -6,8 +6,10 @@ """ from typing import List, Dict, Any, Optional, Protocol, TYPE_CHECKING -from loguru import logger +from src.loki_logger import LokiLogger + import asyncio + from contextual_retrieval.contextual_retrieval_api_client import get_http_client_manager from contextual_retrieval.error_handler import SecureErrorHandler from contextual_retrieval.constants import ( @@ -17,6 +19,9 @@ ) from contextual_retrieval.config import ConfigLoader, ContextualRetrievalConfig +# Initialize Loki logger +logger = LokiLogger(service_name="qdrant-search") + if TYPE_CHECKING: from contextual_retrieval.contextual_retrieval_api_client import HTTPClientManager @@ -151,7 +156,7 @@ async def search_contextual_embeddings_direct( f"Semantic search found {len(all_results)} chunks across {len(collections)} collections" ) - # Detailed results at DEBUG level (loguru filters based on log level config) + # Detailed results at DEBUG level (filters based on log 
level config) logger.debug("=== SEMANTIC SEARCH RESULTS BREAKDOWN ===") for i, chunk in enumerate(all_results[:10]): # Show top 10 results content_preview = ( diff --git a/src/contextual_retrieval/rank_fusion.py b/src/contextual_retrieval/rank_fusion.py index acea0aa2..d6a18ab9 100644 --- a/src/contextual_retrieval/rank_fusion.py +++ b/src/contextual_retrieval/rank_fusion.py @@ -6,10 +6,14 @@ """ from typing import List, Dict, Any, Optional -from loguru import logger +from src.loki_logger import LokiLogger + from contextual_retrieval.constants import QueryTypeConstants from contextual_retrieval.config import ConfigLoader, ContextualRetrievalConfig +# Initialize Loki logger +logger = LokiLogger(service_name="rank-fusion") + class DynamicRankFusion: """Dynamic score fusion without hardcoded collection weights.""" @@ -65,7 +69,7 @@ def fuse_results( logger.info(f"Fusion completed: {len(final_results)} final results") - # Detailed results at DEBUG level (loguru filters based on log level config) + # Detailed results at DEBUG level (filters based on log level config) logger.debug("=== RANK FUSION FINAL RESULTS ===") for i, chunk in enumerate(final_results): content_preview_len = self._config.rank_fusion.content_preview_length diff --git a/src/guardrails/dspy_nemo_adapter.py b/src/guardrails/dspy_nemo_adapter.py index 74a13c84..700f7577 100644 --- a/src/guardrails/dspy_nemo_adapter.py +++ b/src/guardrails/dspy_nemo_adapter.py @@ -7,7 +7,7 @@ from typing import Any, Dict, List, Optional, Union, cast, Iterator, AsyncIterator import asyncio import dspy -from loguru import logger +from src.loki_logger import LokiLogger from langfuse import observe from src.utils.observation_utils import update_observation_safe @@ -20,6 +20,9 @@ from src.guardrails.guardrails_llm_configs import TEMPERATURE, MAX_TOKENS, MODEL_NAME from src.utils.cost_utils import get_lm_usage_since +# Initialize Loki logger +logger = LokiLogger(service_name="dspy-nemo-adapter") + class DSPyNeMoLLM(LLM): """ 
diff --git a/src/guardrails/nemo_rails_adapter.py b/src/guardrails/nemo_rails_adapter.py index e9f931d8..fa8e98fd 100644 --- a/src/guardrails/nemo_rails_adapter.py +++ b/src/guardrails/nemo_rails_adapter.py @@ -1,8 +1,7 @@ from typing import Any, Dict, Optional, AsyncIterator, cast, Type import asyncio -from loguru import logger +from src.loki_logger import LokiLogger from pydantic import BaseModel, Field - from nemoguardrails import LLMRails, RailsConfig from nemoguardrails.llm.providers import register_llm_provider from langchain_core.language_models.llms import BaseLLM @@ -13,6 +12,9 @@ import dspy import re +# Initialize Loki logger +logger = LokiLogger(service_name="nemo-rails-adapter") + class GuardrailCheckResult(BaseModel): """Result from a guardrail check.""" diff --git a/src/guardrails/optimized_guardrails_loader.py b/src/guardrails/optimized_guardrails_loader.py index aef76727..51a65906 100644 --- a/src/guardrails/optimized_guardrails_loader.py +++ b/src/guardrails/optimized_guardrails_loader.py @@ -6,7 +6,10 @@ from pathlib import Path from typing import Optional, Dict, Any, Tuple import json -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="optimized-guardrails-loader") class OptimizedGuardrailsLoader: diff --git a/src/intent_data_enrichment/api_client.py b/src/intent_data_enrichment/api_client.py index 31ed96e2..081d0fb3 100644 --- a/src/intent_data_enrichment/api_client.py +++ b/src/intent_data_enrichment/api_client.py @@ -4,11 +4,14 @@ import httpx from typing import List, Optional from types import TracebackType -from loguru import logger +from src.loki_logger import LokiLogger from intent_data_enrichment.constants import EnrichmentConstants from intent_data_enrichment.models import ServiceData +# Initialize Loki logger +logger = LokiLogger(service_name="intent-enrichment-api-client") + class LLMAPIClient: """Client for calling LLM Orchestration Service endpoints.""" 
diff --git a/src/intent_data_enrichment/main_enrichment.py b/src/intent_data_enrichment/main_enrichment.py index b96e0d2f..9aff2959 100644 --- a/src/intent_data_enrichment/main_enrichment.py +++ b/src/intent_data_enrichment/main_enrichment.py @@ -14,13 +14,17 @@ import json import argparse import asyncio + from typing import List -from loguru import logger +from src.loki_logger import LokiLogger from intent_data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult from intent_data_enrichment.api_client import LLMAPIClient from intent_data_enrichment.qdrant_manager import QdrantManager +# Initialize Loki logger +logger = LokiLogger(service_name="intent-enrichment-main") + # Import sparse encoder from tool_classifier (shared module) sys.path.insert(0, "/app/src") try: diff --git a/src/intent_data_enrichment/qdrant_manager.py b/src/intent_data_enrichment/qdrant_manager.py index d5593836..0bf0c744 100644 --- a/src/intent_data_enrichment/qdrant_manager.py +++ b/src/intent_data_enrichment/qdrant_manager.py @@ -2,7 +2,7 @@ import uuid from typing import Optional, List -from loguru import logger +from src.loki_logger import LokiLogger from qdrant_client import QdrantClient from qdrant_client.models import ( Distance, @@ -20,6 +20,9 @@ from intent_data_enrichment.constants import EnrichmentConstants from intent_data_enrichment.models import EnrichedService +# Initialize Loki logger +logger = LokiLogger(service_name="intent-qdrant-manager") + # Error messages _CLIENT_NOT_INITIALIZED = "Qdrant client not initialized" diff --git a/src/llm_orchestration_service.py b/src/llm_orchestration_service.py index cb01a6a8..543cc96f 100644 --- a/src/llm_orchestration_service.py +++ b/src/llm_orchestration_service.py @@ -5,7 +5,7 @@ import time import asyncio import threading -from loguru import logger +from src.loki_logger import LokiLogger from langfuse import Langfuse, observe import dspy from datetime import datetime @@ -72,6 +72,9 @@ from 
src.tool_classifier.constants import SERVICE_STEP_PREFIXES from src.tool_classifier.workflows.service_workflow import ServiceWorkflowExecutor +# Initialize Loki logger for orchestration service +logger = LokiLogger(service_name="llm-orchestration-service") + class LangfuseConfig: """Configuration for Langfuse integration.""" diff --git a/src/llm_orchestration_service_api.py b/src/llm_orchestration_service_api.py index 50f94d9d..c136bdb7 100644 --- a/src/llm_orchestration_service_api.py +++ b/src/llm_orchestration_service_api.py @@ -9,7 +9,6 @@ from fastapi.responses import StreamingResponse, JSONResponse from fastapi.exceptions import RequestValidationError from pydantic import ValidationError -from loguru import logger import uvicorn from llm_orchestration_service import LLMOrchestrationService @@ -53,6 +52,10 @@ DeepEvalTestOrchestrationResponse, ) from src.utils.connection_id_fetcher import get_connection_id_fetcher +from src.loki_logger import LokiLogger + +# Initialize Loki logger for centralized logging +logger = LokiLogger(service_name="llm-orchestration-api") @asynccontextmanager diff --git a/src/llm_orchestrator_config/config/loader.py b/src/llm_orchestrator_config/config/loader.py index 2eb214a6..001e3060 100644 --- a/src/llm_orchestrator_config/config/loader.py +++ b/src/llm_orchestrator_config/config/loader.py @@ -7,7 +7,7 @@ import yaml from dotenv import load_dotenv -from loguru import logger +from src.loki_logger import LokiLogger from llm_orchestrator_config.config.schema import ( LLMConfiguration, @@ -27,6 +27,9 @@ # Constants DEFAULT_CONFIG_FILENAME = "llm_config.yaml" +# Initialize Loki logger +logger = LokiLogger(service_name="config-loader") + # Type alias for configuration values that can be processed ConfigValue = Union[str, Dict[str, Any], List[Any], int, float, bool, None] diff --git a/src/llm_orchestrator_config/context_manager.py b/src/llm_orchestrator_config/context_manager.py index 3bb6e5ac..1b6146a1 100644 --- 
a/src/llm_orchestrator_config/context_manager.py +++ b/src/llm_orchestrator_config/context_manager.py @@ -2,12 +2,14 @@ from typing import Any, Dict, Optional -from loguru import logger - +from src.loki_logger import LokiLogger from src.llm_orchestrator_config.llm_manager import LLMManager from src.models.request_models import ContextGenerationRequest from langfuse import observe +# Initialize Loki logger +logger = LokiLogger(service_name="context-manager") + class ContextGenerationManager: """Manager for context generation with Anthropic methodology.""" diff --git a/src/llm_orchestrator_config/embedding_manager.py b/src/llm_orchestrator_config/embedding_manager.py index 6c9bf277..3fcca9c9 100644 --- a/src/llm_orchestrator_config/embedding_manager.py +++ b/src/llm_orchestrator_config/embedding_manager.py @@ -6,13 +6,15 @@ import dspy import numpy as np -from loguru import logger +from src.loki_logger import LokiLogger from pydantic import BaseModel - from .vault.vault_client import VaultAgentClient from .config.loader import ConfigurationLoader from .exceptions import ConfigurationError +# Initialize Loki logger +logger = LokiLogger(service_name="embedding-manager") + class EmbeddingFailure(BaseModel): """Model for tracking embedding failures.""" diff --git a/src/llm_orchestrator_config/feature_flags.py b/src/llm_orchestrator_config/feature_flags.py index 1740004f..f0346a73 100644 --- a/src/llm_orchestrator_config/feature_flags.py +++ b/src/llm_orchestrator_config/feature_flags.py @@ -1,7 +1,10 @@ """Feature flags for tool classifier system.""" import os -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="feature-flags") class FeatureFlags: diff --git a/src/llm_orchestrator_config/vault/secret_resolver.py b/src/llm_orchestrator_config/vault/secret_resolver.py index 4eb8dc9b..01f615fc 100644 --- a/src/llm_orchestrator_config/vault/secret_resolver.py +++ 
b/src/llm_orchestrator_config/vault/secret_resolver.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta from typing import Optional, Dict, Any, Union, List from pydantic import BaseModel -from loguru import logger +from src.loki_logger import LokiLogger from llm_orchestrator_config.vault.vault_client import ( VaultAgentClient, @@ -17,6 +17,9 @@ ) from llm_orchestrator_config.vault.exceptions import VaultConnectionError +# Initialize Loki logger +logger = LokiLogger(service_name="secret-resolver") + class CachedSecret(BaseModel): """Cached secret with TTL information.""" diff --git a/src/llm_orchestrator_config/vault/vault_client.py b/src/llm_orchestrator_config/vault/vault_client.py index 241f019e..51389421 100644 --- a/src/llm_orchestrator_config/vault/vault_client.py +++ b/src/llm_orchestrator_config/vault/vault_client.py @@ -4,7 +4,7 @@ import threading from pathlib import Path from typing import Optional, Dict, Any, cast -from loguru import logger +from src.loki_logger import LokiLogger import hvac from hvac.exceptions import InvalidPath, Forbidden @@ -14,6 +14,9 @@ VaultTokenError, ) +# Initialize Loki logger +logger = LokiLogger(service_name="vault-client") + # Global singleton instance _vault_client_instance: Optional["VaultAgentClient"] = None _vault_client_lock = threading.Lock() diff --git a/src/loki_logger.py b/src/loki_logger.py new file mode 100644 index 00000000..b70be5bd --- /dev/null +++ b/src/loki_logger.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +""" +Loki Logger for RAG Module +Sends logs directly to Loki API for centralized logging + +[CANONICAL SOURCE] +This is the single source of truth for LokiLogger. +Two copies exist for environments where `src` is not a Python package: + - grafana-configs/loki_logger.py — mounted into CronManager container at runtime + - src/vector_indexer/loki_logger.py — used when running vector_indexer scripts locally + +If you change the logger logic here, apply the same change to both copies. 
+""" + +import json +import time +from datetime import datetime +from threading import Thread +from queue import Full, Queue + +import requests + + +class LokiLogger: + """Simple logger that sends logs directly to Loki API with async background thread""" + + _instances: dict[str, "LokiLogger"] = {} + + def __new__( + cls, loki_url: str = "http://loki:3100", service_name: str = "default" + ) -> "LokiLogger": + key = f"{loki_url}:{service_name}" + if key not in cls._instances: + cls._instances[key] = super().__new__(cls) + return cls._instances[key] + + def __init__( + self, loki_url: str = "http://loki:3100", service_name: str = "default" + ) -> None: + """ + Initialize LokiLogger + + Args: + loki_url: URL for Loki service (default: container URL in bykstack network) + service_name: Name of the service for labeling logs + """ + if hasattr(self, "_initialized"): + return + self._initialized = True + self.loki_url = loki_url + self.service_name = service_name + self.session = requests.Session() + # Set default timeout for all requests + self.timeout = 5 + + # Queue for async log processing (bounded to avoid unbounded memory growth under load) + self.log_queue: Queue[tuple[str, str]] = Queue(maxsize=10_000) + + # Start background worker thread + self.worker_thread = Thread(target=self._process_logs, daemon=True) + self.worker_thread.start() + + def _process_logs(self) -> None: + """Background worker that processes log queue""" + while True: + try: + # Get log entry from queue (blocking) + level, message = self.log_queue.get() + + # Send to Loki + self._send_to_loki_sync(level, message) + + # Mark task as done + self.log_queue.task_done() + except Exception: + # Silently ignore errors in background thread + pass + + def _send_to_loki_sync(self, level: str, message: str) -> None: + """Send log entry directly to Loki API (called from background thread)""" + try: + # Create timestamp in nanoseconds (Loki requirement) + timestamp_ns = str(int(time.time() * 1_000_000_000)) + 
+ # Prepare labels for Loki + labels = { + "service": self.service_name, + "level": level, + } + + # Create log entry + log_entry = { + "level": level, + "message": message, + "service": self.service_name, + } + + # Prepare Loki payload + payload = { + "streams": [ + { + "stream": labels, + "values": [[timestamp_ns, json.dumps(log_entry)]], + } + ] + } + + # Send to Loki + self.session.post( + f"{self.loki_url}/loki/api/v1/push", + json=payload, + headers={"Content-Type": "application/json"}, + timeout=self.timeout, + ) + + except Exception: + # Silently ignore logging errors to not affect main application + pass + + def _log(self, level: str, message: str) -> None: + """Queue log entry for async processing (non-blocking)""" + # Print to console immediately for real-time feedback + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {level: <8} | {message}") # noqa: T201 + + # Queue for async Loki sending (non-blocking, drops log if queue is full) + try: + self.log_queue.put_nowait((level, message)) + except Full: + # Queue full (Loki may be slow/unreachable) - drop log to avoid blocking + pass + + def info(self, message: str, **kwargs: object) -> None: + """Log info message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("INFO", message) + + def error(self, message: str, **kwargs: object) -> None: + """Log error message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("ERROR", message) + + def warning(self, message: str, **kwargs: object) -> None: + """Log warning message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("WARNING", message) + + def debug(self, message: str, **kwargs: object) -> None: + """Log debug message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("DEBUG", message) + + def success(self, message: str, **kwargs: object) -> None: + """Log success message (loguru compatibility). 
Extra kwargs ignored.""" + self._log("SUCCESS", message) + + def critical(self, message: str, **kwargs: object) -> None: + """Log critical message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("CRITICAL", message) + + def exception(self, message: str, **kwargs: object) -> None: + """Log exception message. Extra kwargs (extra, exc_info) are ignored for compatibility.""" + self._log("EXCEPTION", message) + + def add(self, *args: object, **kwargs: object) -> None: + """ + No-op method for loguru compatibility. + + LokiLogger sends logs to Loki/console only, not to files. + This method exists for backward compatibility with loguru code. + """ + pass # Silently ignore - logs go to Loki instead of files + + def remove(self, *args: object, **kwargs: object) -> None: + """No-op method for loguru compatibility.""" + pass # Silently ignore + + def bind(self, **kwargs: object) -> "LokiLogger": + """No-op method for loguru compatibility. Returns self for chaining.""" + return self # Allow method chaining + + def opt(self, **kwargs: object) -> "LokiLogger": + """No-op method for loguru compatibility. 
Returns self for chaining.""" + return self # Allow method chaining diff --git a/src/models/request_models.py b/src/models/request_models.py index 4be546b1..fc93e125 100644 --- a/src/models/request_models.py +++ b/src/models/request_models.py @@ -6,7 +6,10 @@ from src.utils.input_sanitizer import InputSanitizer from src.llm_orchestrator_config.stream_config import StreamConfig -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="request-models") class ConversationItem(BaseModel): @@ -100,7 +103,6 @@ def validate_conversation_history( cls, v: List[ConversationItem] ) -> List[ConversationItem]: """Validate conversation history limits.""" - from loguru import logger # Limit number of conversation history items max_history_items = 100 @@ -269,9 +271,8 @@ class TestOrchestrationRequest(BaseModel): environment: Literal["production", "testing"] = Field( ..., description="Environment context" ) - connectionId: Optional[str] = Field( - None, - description="Connection identifier — the vault_uuid for the connection (required for testing)", + connectionId: Optional[int] = Field( + None, description="Optional connection identifier" ) diff --git a/src/optimization/metrics/generator_metrics.py b/src/optimization/metrics/generator_metrics.py index acb0a89f..ca0973a9 100644 --- a/src/optimization/metrics/generator_metrics.py +++ b/src/optimization/metrics/generator_metrics.py @@ -6,7 +6,10 @@ from typing import Any, Dict, List import dspy from dspy.evaluate import SemanticF1 -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="generator-metrics") class GeneratorMetric: diff --git a/src/optimization/metrics/guardrails_metrics.py b/src/optimization/metrics/guardrails_metrics.py index 97d4f523..953f4ce9 100644 --- a/src/optimization/metrics/guardrails_metrics.py +++ b/src/optimization/metrics/guardrails_metrics.py @@ -5,7 +5,10 @@ 
from typing import Any, Dict, List import dspy -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="guardrails-metrics") class GuardrailsMetric: diff --git a/src/optimization/metrics/refiner_metrics.py b/src/optimization/metrics/refiner_metrics.py index 8550d3a6..d29da021 100644 --- a/src/optimization/metrics/refiner_metrics.py +++ b/src/optimization/metrics/refiner_metrics.py @@ -5,7 +5,10 @@ from typing import Any, Dict, List import dspy -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="refiner-metrics") class RefinementJudge(dspy.Signature): diff --git a/src/optimization/optimization_scripts/check_paths.py b/src/optimization/optimization_scripts/check_paths.py index ff05e211..8dc33d2d 100644 --- a/src/optimization/optimization_scripts/check_paths.py +++ b/src/optimization/optimization_scripts/check_paths.py @@ -4,7 +4,10 @@ from pathlib import Path from typing import Dict -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="check-paths") def get_directory_structure() -> tuple[Path, Path]: diff --git a/src/optimization/optimization_scripts/diagnose_guardrails_loader.py b/src/optimization/optimization_scripts/diagnose_guardrails_loader.py index 28909caf..0dab5233 100644 --- a/src/optimization/optimization_scripts/diagnose_guardrails_loader.py +++ b/src/optimization/optimization_scripts/diagnose_guardrails_loader.py @@ -7,9 +7,12 @@ sys.path.append(str(Path(__file__).parent.parent.parent)) -from loguru import logger +from src.loki_logger import LokiLogger from src.guardrails.optimized_guardrails_loader import OptimizedGuardrailsLoader +# Initialize Loki logger +logger = LokiLogger(service_name="diagnose-guardrails-loader") + def main() -> None: """Run diagnostics.""" diff --git 
a/src/optimization/optimization_scripts/extract_guardrails_prompts.py b/src/optimization/optimization_scripts/extract_guardrails_prompts.py index 8c2654ae..501452ad 100644 --- a/src/optimization/optimization_scripts/extract_guardrails_prompts.py +++ b/src/optimization/optimization_scripts/extract_guardrails_prompts.py @@ -8,7 +8,10 @@ import yaml from pathlib import Path from typing import Dict, Any, Optional, List, Tuple -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="extract-guardrails-prompts") # Constants FULL_TRACEBACK_MSG = "Full traceback:" diff --git a/src/optimization/optimization_scripts/inspect_guardrails_optimization.py b/src/optimization/optimization_scripts/inspect_guardrails_optimization.py index f9632da7..9693aaee 100644 --- a/src/optimization/optimization_scripts/inspect_guardrails_optimization.py +++ b/src/optimization/optimization_scripts/inspect_guardrails_optimization.py @@ -4,7 +4,10 @@ import json from pathlib import Path -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="inspect-guardrails-optimization") def main() -> None: diff --git a/src/optimization/optimization_scripts/run_all_optimizations.py b/src/optimization/optimization_scripts/run_all_optimizations.py index 275148f1..02c041e1 100644 --- a/src/optimization/optimization_scripts/run_all_optimizations.py +++ b/src/optimization/optimization_scripts/run_all_optimizations.py @@ -13,13 +13,15 @@ sys.path.append(str(Path(__file__).parent.parent)) import dspy -from loguru import logger - +from src.loki_logger import LokiLogger from llm_orchestrator_config import LLMManager from optimizers.guardrails_optimizer import optimize_guardrails from optimizers.refiner_optimizer import optimize_refiner from optimizers.generator_optimizer import optimize_generator +# Initialize Loki logger +logger = 
LokiLogger(service_name="run-all-optimizations") + # Constants TRACEBACK_MSG = "Full traceback:" diff --git a/src/optimization/optimization_scripts/split_datasets.py b/src/optimization/optimization_scripts/split_datasets.py index 3316dffa..76649943 100644 --- a/src/optimization/optimization_scripts/split_datasets.py +++ b/src/optimization/optimization_scripts/split_datasets.py @@ -7,11 +7,13 @@ from typing import List, Dict, Any, Tuple import random import sys +from src.loki_logger import LokiLogger # Add src to path for imports sys.path.append(str(Path(__file__).parent.parent)) -from loguru import logger +# Initialize Loki logger +logger = LokiLogger(service_name="split-datasets") def load_dataset(filepath: Path) -> List[Dict[str, Any]]: diff --git a/src/optimization/optimized_module_loader.py b/src/optimization/optimized_module_loader.py index 19803415..f502e839 100644 --- a/src/optimization/optimized_module_loader.py +++ b/src/optimization/optimized_module_loader.py @@ -10,7 +10,10 @@ from datetime import datetime import threading import dspy -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="optimized-module-loader") class OptimizedModuleLoader: diff --git a/src/optimization/optimizers/generator_optimizer.py b/src/optimization/optimizers/generator_optimizer.py index d141ba8a..54a65eb4 100644 --- a/src/optimization/optimizers/generator_optimizer.py +++ b/src/optimization/optimizers/generator_optimizer.py @@ -12,13 +12,15 @@ sys.path.append(str(Path(__file__).parent.parent.parent)) import dspy -from loguru import logger - +from src.loki_logger import LokiLogger from optimization.metrics.generator_metrics import ( GeneratorMetric, calculate_generator_stats, ) +# Initialize Loki logger +logger = LokiLogger(service_name="generator-optimizer") + class ResponseGeneratorSignature(dspy.Signature): """ diff --git a/src/optimization/optimizers/guardrails_optimizer.py 
b/src/optimization/optimizers/guardrails_optimizer.py index 02d9e9a1..4fa38d8e 100644 --- a/src/optimization/optimizers/guardrails_optimizer.py +++ b/src/optimization/optimizers/guardrails_optimizer.py @@ -13,13 +13,16 @@ sys.path.append(str(Path(__file__).parent.parent.parent)) import dspy -from loguru import logger +from src.loki_logger import LokiLogger from optimization.metrics.guardrails_metrics import ( safety_weighted_accuracy, calculate_guardrails_stats, ) +# Initialize Loki logger +logger = LokiLogger(service_name="guardrails-optimizer") + class GuardrailsChecker(dspy.Signature): """ diff --git a/src/optimization/optimizers/refiner_optimizer.py b/src/optimization/optimizers/refiner_optimizer.py index 526ab9dc..f545a3c3 100644 --- a/src/optimization/optimizers/refiner_optimizer.py +++ b/src/optimization/optimizers/refiner_optimizer.py @@ -12,13 +12,15 @@ sys.path.append(str(Path(__file__).parent.parent.parent)) import dspy -from loguru import logger - +from src.loki_logger import LokiLogger from optimization.metrics.refiner_metrics import ( RefinerMetric, calculate_refiner_stats, ) +# Initialize Loki logger +logger = LokiLogger(service_name="refiner-optimizer") + class PromptRefinerSignature(dspy.Signature): """ diff --git a/src/prompt_refine_manager/prompt_refiner.py b/src/prompt_refine_manager/prompt_refiner.py index 5cbe30ec..5b857992 100644 --- a/src/prompt_refine_manager/prompt_refiner.py +++ b/src/prompt_refine_manager/prompt_refiner.py @@ -2,7 +2,7 @@ from typing import Any, Sequence, Optional, Dict, Union, cast, List import contextlib -import logging +from src.loki_logger import LokiLogger import dspy from pydantic import BaseModel, Field @@ -10,7 +10,8 @@ from src.utils.cost_utils import get_lm_usage_since from src.optimization.optimized_module_loader import get_module_loader -LOGGER = logging.getLogger(__name__) +# Initialize Loki logger for prompt refiner +LOGGER = LokiLogger(service_name="prompt-refiner") class ConversationHistory(BaseModel): 
diff --git a/src/response_generator/response_generate.py b/src/response_generator/response_generate.py index b6720726..ca065aee 100644 --- a/src/response_generator/response_generate.py +++ b/src/response_generator/response_generate.py @@ -2,7 +2,7 @@ from typing import List, Dict, Any, Tuple, AsyncIterator, Optional import re import dspy -import logging +from src.loki_logger import LokiLogger import asyncio import dspy.streaming from dspy.streaming import StreamListener @@ -17,11 +17,8 @@ from src.optimization.optimized_module_loader import get_module_loader from src.vector_indexer.constants import ResponseGenerationConstants -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) +# Initialize Loki logger for response generator +logger = LokiLogger(service_name="response-generator") def _get_current_model_name() -> str: diff --git a/src/tool_classifier/agentic_loop.py b/src/tool_classifier/agentic_loop.py index 16eb95ce..2f136ad2 100644 --- a/src/tool_classifier/agentic_loop.py +++ b/src/tool_classifier/agentic_loop.py @@ -1,10 +1,10 @@ """Standalone agentic loop for multi-turn parameter collection.""" import asyncio +import time from typing import Any, Dict, List, Optional -from loguru import logger - +from src.loki_logger import LokiLogger from src.utils.api_tool_session_store import APIToolSessionStore from tool_classifier.constants import ( CONTINUATION_QUESTION, @@ -17,6 +17,8 @@ from tool_classifier.models import AgenticLoopResult from tool_classifier.param_extractor import ParamExtractionModule +logger = LokiLogger(service_name="api-tool-calling") + _CONTINUATION_QUESTIONS: dict[str, str] = { "en": CONTINUATION_QUESTION, "et": CONTINUATION_QUESTION_ET, @@ -141,6 +143,10 @@ async def run_turn( """ updated_turn_count = turn_count + 1 + logger.info( + f"AgenticLoop: loop turn started | event_type=loop_turn_started chat_id={chat_id} 
turn_count={turn_count} max_turns={max_turns} awaiting_continuation={awaiting_continuation}" + ) + # Seed inherited params from L2 follow-up detection — turn 0 only. # seeded_params take lower priority than anything already in collected_params # (i.e. values explicitly set by the session take precedence). @@ -153,18 +159,14 @@ async def run_turn( wants_to_continue = self._detect_continuation_response(user_message) if wants_to_continue: logger.debug( - "AgenticLoop: user chose to continue on turn {} for chat_id={}", - turn_count, - chat_id, + f"AgenticLoop: user chose to continue on turn {turn_count} for chat_id={chat_id}" ) # Reset the flag so normal extraction takes over from here. awaiting_continuation = False else: logger.info( - "AgenticLoop: user chose to exit on turn {} for chat_id={}, " - "falling back to RAG", - turn_count, - chat_id, + f"AgenticLoop: user chose to exit on turn {turn_count} for chat_id={chat_id}, " + "falling back to RAG" ) return AgenticLoopResult( status=AgenticLoopStatus.MAX_TURNS_REACHED, @@ -176,9 +178,7 @@ async def run_turn( # Step 1 — Turn limit guard (no session save — caller deletes) if turn_count >= max_turns: logger.warning( - "AgenticLoop: max_turns={} reached for chat_id={}, abandoning", - max_turns, - chat_id, + f"AgenticLoop: max_turns={max_turns} reached for chat_id={chat_id}, abandoning" ) return AgenticLoopResult( status=AgenticLoopStatus.MAX_TURNS_REACHED, @@ -188,6 +188,7 @@ async def run_turn( ) # Step 2 — Extract params from the current user message + _t0 = time.time() try: extraction = await asyncio.to_thread( self._param_extractor, @@ -198,12 +199,14 @@ async def run_turn( session_language, turn_count, ) + _duration_ms = round((time.time() - _t0) * 1000, 1) + logger.debug( + f"AgenticLoop: param extraction complete | event_type=param_extraction_complete chat_id={chat_id} turn_count={turn_count} extracted_count={len(extraction['extracted_params'])} duration_ms={_duration_ms}" + ) except Exception as exc: + 
_duration_ms = round((time.time() - _t0) * 1000, 1) logger.error( - "AgenticLoop: param extraction failed on turn {} for chat_id={}: {}", - turn_count, - chat_id, - exc, + f"AgenticLoop: param extraction failed on turn {turn_count} for chat_id={chat_id}: {exc}" ) # If a continuation decision was already consumed this turn, persist the # updated flag so the next user message is not misread as another @@ -239,11 +242,13 @@ async def run_turn( } all_collected = required_param_names.issubset(merged_params.keys()) + logger.debug( + f"AgenticLoop: params merged | event_type=params_merged chat_id={chat_id} turn_count={turn_count} required_count={len(required_param_names)} collected_count={len(merged_params)} missing_count={len(required_param_names - merged_params.keys())}" + ) + if all_collected: - logger.debug( - "AgenticLoop: all required params collected on turn {} for chat_id={}", - turn_count, - chat_id, + logger.info( + f"AgenticLoop: loop completed | event_type=loop_completed chat_id={chat_id} turn_count={turn_count} status=completed collected_count={len(merged_params)} duration_ms={_duration_ms}" ) await self._save_session( chat_id, merged_params, updated_turn_count, awaiting_continuation=False @@ -257,18 +262,13 @@ async def run_turn( # Step 5 — Still missing params logger.debug( - "AgenticLoop: turn {} for chat_id={} — still missing: {}", - turn_count, - chat_id, - extraction["missing_required"], + f"AgenticLoop: loop needs input | event_type=loop_needs_input chat_id={chat_id} turn_count={turn_count} missing_params={extraction['missing_required']} status=needs_input" ) # At exactly the continuation threshold, ask whether to keep going. 
if updated_turn_count == continuation_turn: logger.info( - "AgenticLoop: continuation threshold reached on turn {} for chat_id={}", - turn_count, - chat_id, + f"AgenticLoop: continuation threshold reached | event_type=continuation_threshold_reached chat_id={chat_id} turn_count={turn_count} continuation_turn={continuation_turn} missing_count={len(extraction['missing_required'])}" ) effective_continuation_lang = continuation_language or session_language continuation_q = _CONTINUATION_QUESTIONS.get( @@ -323,6 +323,10 @@ async def stream_run_turn( """ updated_turn_count = turn_count + 1 + logger.info( + f"AgenticLoop: loop turn started | event_type=loop_turn_started chat_id={chat_id} turn_count={turn_count} max_turns={max_turns} awaiting_continuation={awaiting_continuation}" + ) + # Seed inherited params from L2 follow-up detection — turn 0 only. if turn_count == 0 and seeded_params: collected_params = {**seeded_params, **collected_params} @@ -332,18 +336,14 @@ async def stream_run_turn( if awaiting_continuation: wants_to_continue = self._detect_continuation_response(user_message) if wants_to_continue: - logger.debug( - "AgenticLoop: user chose to continue on turn {} for chat_id={}", - turn_count, - chat_id, + logger.info( + f"AgenticLoop: continuation user accepted | event_type=continuation_user_accepted chat_id={chat_id} turn_count={turn_count}" ) awaiting_continuation = False else: logger.info( - "AgenticLoop: user chose to exit on turn {} for chat_id={}, " - "falling back to RAG", - turn_count, - chat_id, + f"AgenticLoop: user chose to exit on turn {turn_count} for chat_id={chat_id}, " + "falling back to RAG" ) return ( AgenticLoopResult( @@ -358,9 +358,7 @@ async def stream_run_turn( # Step 1 — Turn limit guard if turn_count >= max_turns: logger.warning( - "AgenticLoop: max_turns={} reached for chat_id={}, abandoning", - max_turns, - chat_id, + f"AgenticLoop: max_turns={max_turns} reached for chat_id={chat_id}, abandoning" ) return ( AgenticLoopResult( @@ -373,6 
+371,7 @@ async def stream_run_turn( ) # Step 2 — Stream-extract params from the current user message + _t0 = time.time() try: question_tokens, extraction = await self._param_extractor.stream_forward( user_message=user_message, @@ -382,12 +381,14 @@ async def stream_run_turn( session_language=session_language, turn_count=turn_count, ) + _duration_ms = round((time.time() - _t0) * 1000, 1) + logger.debug( + f"AgenticLoop: param extraction complete | event_type=param_extraction_complete chat_id={chat_id} turn_count={turn_count} extracted_count={len(extraction['extracted_params'])} duration_ms={_duration_ms}" + ) except Exception as exc: + _duration_ms = round((time.time() - _t0) * 1000, 1) logger.error( - "AgenticLoop: stream param extraction failed on turn {} for chat_id={}: {}", - turn_count, - chat_id, - exc, + f"AgenticLoop: stream param extraction failed on turn {turn_count} for chat_id={chat_id}: {exc}" ) if awaiting_continuation != original_awaiting_continuation: await self._save_session( @@ -420,11 +421,13 @@ async def stream_run_turn( } all_collected = required_param_names.issubset(merged_params.keys()) + logger.debug( + f"AgenticLoop: params merged | event_type=params_merged chat_id={chat_id} turn_count={turn_count} required_count={len(required_param_names)} collected_count={len(merged_params)} missing_count={len(required_param_names - merged_params.keys())}" + ) + if all_collected: logger.debug( - "AgenticLoop: all required params collected on turn {} for chat_id={}", - turn_count, - chat_id, + f"AgenticLoop: all required params collected on turn {turn_count} for chat_id={chat_id}" ) await self._save_session( chat_id, merged_params, updated_turn_count, awaiting_continuation=False @@ -441,17 +444,12 @@ async def stream_run_turn( # Step 5 — Still missing params logger.debug( - "AgenticLoop: turn {} for chat_id={} — still missing: {}", - turn_count, - chat_id, - extraction["missing_required"], + f"AgenticLoop: turn {turn_count} for chat_id={chat_id} — still 
missing: {extraction['missing_required']}" ) if updated_turn_count == continuation_turn: logger.info( - "AgenticLoop: continuation threshold reached on turn {} for chat_id={}", - turn_count, - chat_id, + f"AgenticLoop: continuation threshold reached on turn {turn_count} for chat_id={chat_id}" ) effective_continuation_lang = continuation_language or session_language continuation_q = _CONTINUATION_QUESTIONS.get( @@ -504,8 +502,7 @@ async def _save_session( try: if self._session_store is None: logger.debug( - "AgenticLoop: session store unavailable — skipping save for chat_id={}", - chat_id, + f"AgenticLoop: session store unavailable — skipping save for chat_id={chat_id}" ) return await self._session_store.update( @@ -516,9 +513,7 @@ async def _save_session( ) except Exception as exc: logger.error( - "AgenticLoop: failed to save session for chat_id={}: {}", - chat_id, - exc, + f"AgenticLoop: failed to save session for chat_id={chat_id}: {exc}" ) def _detect_continuation_response(self, user_message: str) -> bool: diff --git a/src/tool_classifier/api_caller.py b/src/tool_classifier/api_caller.py index cbf38a4b..3ee4cccb 100644 --- a/src/tool_classifier/api_caller.py +++ b/src/tool_classifier/api_caller.py @@ -6,7 +6,8 @@ from typing import Any import httpx -from loguru import logger +from src.loki_logger import LokiLogger +from src.utils.error_utils import generate_error_id from llm_orchestrator_config.llm_ochestrator_constants import get_localized_message from tool_classifier.constants import ( @@ -23,7 +24,8 @@ SERVICE_UNAVAILABLE_MESSAGES, ) from tool_classifier.models import APICallResult -from src.utils.error_utils import generate_error_id, log_error_with_context + +logger = LokiLogger(service_name="api-tool-calling") @dataclass @@ -86,7 +88,9 @@ def can_execute(self, url: str) -> bool: if time.time() - breaker.last_failure_time >= self._cooldown_seconds: breaker.state = CB_STATE_HALF_OPEN breaker.probe_in_flight = True - logger.info(f"[CircuitBreaker] {url!r} → 
HALF_OPEN (probe allowed)") + logger.info( + f"CircuitBreaker: circuit half-open | event_type=circuit_breaker_half_open url={url}" + ) return True return False # HALF_OPEN: allow exactly one probe request through; gate subsequent @@ -101,8 +105,7 @@ def record_success(self, url: str) -> None: breaker = self._get_state(url) if breaker.state != CB_STATE_CLOSED: logger.info( - f"[CircuitBreaker] {url!r} → CLOSED " - f"(recovered after {breaker.failure_count} failure(s))" + f"CircuitBreaker: circuit recovered | event_type=circuit_breaker_recovered url={url} failure_count={breaker.failure_count}" ) breaker.state = CB_STATE_CLOSED breaker.failure_count = 0 @@ -120,8 +123,7 @@ def record_failure(self, url: str) -> None: if breaker.failure_count >= self._failure_threshold: if breaker.state != CB_STATE_OPEN: logger.warning( - f"[CircuitBreaker] {url!r} → OPEN " - f"after {breaker.failure_count} failure(s)" + f"CircuitBreaker: circuit opened | event_type=circuit_breaker_opened url={url} failure_count={breaker.failure_count} threshold={self._failure_threshold}" ) breaker.state = CB_STATE_OPEN @@ -192,7 +194,7 @@ async def call( if not self._circuit_breaker.can_execute(url): logger.warning( - f"[APICaller] Circuit breaker OPEN for {url!r} — rejecting call" + f"APICaller: api call rejected | event_type=api_call_rejected_circuit_open url={url} method={method_upper}" ) return APICallResult( success=False, @@ -202,6 +204,10 @@ async def call( ) effective_timeout = timeout if timeout is not None else self._default_timeout + logger.debug( + f"APICaller: api call started | event_type=api_call_started url={url} method={method_upper} timeout={effective_timeout}" + ) + _t0 = time.time() try: async with httpx.AsyncClient( timeout=effective_timeout, follow_redirects=True @@ -210,17 +216,19 @@ async def call( response = await client.post(url, json=params) else: response = await client.get(url, params=params) - return self._handle_response(response, url, language) + result = 
self._handle_response(response, url, language) + _duration_ms = round((time.time() - _t0) * 1000, 1) + if result.success: + logger.info( + f"APICaller: api call success | event_type=api_call_success url={url} method={method_upper} status_code={result.status_code} duration_ms={_duration_ms}" + ) + return result except httpx.TimeoutException as exc: - error_id = generate_error_id() - log_error_with_context( - logger, - error_id, - "api_call_timeout", - None, - exc, - {"url": url, "method": method_upper}, + _duration_ms = round((time.time() - _t0) * 1000, 1) + _error_id = generate_error_id() + logger.error( + f"APICaller: api call timeout | event_type=api_call_timeout url={url} method={method_upper} error_id={_error_id} duration_ms={_duration_ms} exc={exc!r}" ) self._circuit_breaker.record_failure(url) return APICallResult( @@ -231,14 +239,10 @@ async def call( ) except httpx.RequestError as exc: - error_id = generate_error_id() - log_error_with_context( - logger, - error_id, - "api_call_network_error", - None, - exc, - {"url": url, "method": method_upper}, + _duration_ms = round((time.time() - _t0) * 1000, 1) + _error_id = generate_error_id() + logger.error( + f"APICaller: api call network error | event_type=api_call_network_error url={url} method={method_upper} error_id={_error_id} duration_ms={_duration_ms} exc={exc!r}" ) self._circuit_breaker.record_failure(url) return APICallResult( @@ -271,8 +275,7 @@ def _handle_response( # Not a server fault — do NOT trip the circuit breaker. 
location = response.headers.get("location", "") logger.warning( - f"[APICaller] Unresolved redirect {status_code} from {url!r} " - f"→ {location!r}" + f"APICaller: api call redirect | event_type=api_call_redirect url={url} status_code={status_code} location={location!r}" ) base_msg = get_localized_message(REDIRECT_NOT_FOLLOWED_MESSAGES, language) error_msg = base_msg.format( @@ -293,7 +296,7 @@ def _handle_response( error_body = self._parse_response_body(response) raw_msg = error_body if isinstance(error_body, str) else str(error_body) logger.warning( - f"[APICaller] 4xx response {status_code} from {url!r}: {raw_msg[:200]}" + f"APICaller: api call client error | event_type=api_call_client_error url={url} status_code={status_code} error_preview={raw_msg[:200]!r}" ) return APICallResult( success=False, @@ -303,9 +306,8 @@ def _handle_response( ) # 5xx — server is misbehaving; trip the circuit breaker. - error_id = generate_error_id() logger.error( - f"[{error_id}] [APICaller] Server error {status_code} from {url!r}" + f"APICaller: api call server error | event_type=api_call_server_error url={url} status_code={status_code} error_id={generate_error_id()}" ) self._circuit_breaker.record_failure(url) return APICallResult( diff --git a/src/tool_classifier/api_response_formatter.py b/src/tool_classifier/api_response_formatter.py index 923f685e..8e7d9f15 100644 --- a/src/tool_classifier/api_response_formatter.py +++ b/src/tool_classifier/api_response_formatter.py @@ -12,11 +12,12 @@ safe_observation_context, update_observation_safe, ) -from loguru import logger - +from src.loki_logger import LokiLogger from llm_orchestrator_config.llm_ochestrator_constants import get_localized_message from src.utils.cost_utils import get_lm_usage_since +logger = LokiLogger(service_name="api-tool-calling") + _MAX_ITEMS: int = 500 _MAX_RESPONSE_BYTES: int = 50_000 diff --git a/src/tool_classifier/api_semantic_searcher.py b/src/tool_classifier/api_semantic_searcher.py index e1c63e79..efe3ee84 
100644 --- a/src/tool_classifier/api_semantic_searcher.py +++ b/src/tool_classifier/api_semantic_searcher.py @@ -8,7 +8,7 @@ import dspy import httpx from src.utils.observation_utils import safe_observation_context -from loguru import logger +from src.loki_logger import LokiLogger from tool_classifier.constants import ( API_TOOL_COLLECTION, @@ -38,6 +38,9 @@ def _get_current_model_name() -> str: return "unknown" +logger = LokiLogger(service_name="api-tool-calling") + + class EmbeddingServiceProtocol(Protocol): """Protocol for any service that can generate text embeddings.""" diff --git a/src/tool_classifier/classifier.py b/src/tool_classifier/classifier.py index fa1096ce..3ce4b37e 100644 --- a/src/tool_classifier/classifier.py +++ b/src/tool_classifier/classifier.py @@ -12,9 +12,10 @@ TYPE_CHECKING, ) import httpx + import asyncio -from loguru import logger +from src.loki_logger import LokiLogger from llm_orchestrator_config.llm_manager import LLMManager from models.request_models import ( ConversationItem, @@ -62,6 +63,9 @@ from llm_orchestrator_config.feature_flags import FeatureFlags from utils.atc_cache_store import ATCCacheStore +# Initialize Loki logger +logger = LokiLogger(service_name="tool-classifier") + if TYPE_CHECKING: from llm_orchestration_service import LLMOrchestrationService @@ -909,7 +913,7 @@ async def _try_parallel_api_tool_classification( None to signal the caller to fall back to the single-endpoint path. Args: - sub_queries: Focused sub-queries from IntentDecomposer (2–3 items). + sub_queries: Focused sub-queries from IntentDecomposer (2-3 items). environment: LLM environment from the original request. connection_id: Connection ID from the original request. original_matched: The gate search result (used as fallback reference). 
diff --git a/src/tool_classifier/context_analyzer.py b/src/tool_classifier/context_analyzer.py index 36078c8b..d4016180 100644 --- a/src/tool_classifier/context_analyzer.py +++ b/src/tool_classifier/context_analyzer.py @@ -12,12 +12,14 @@ safe_observation_context, update_observation_safe, ) -from loguru import logger +from src.loki_logger import LokiLogger from pydantic import BaseModel, Field from src.utils.cost_utils import get_lm_usage_since from tool_classifier.greeting_constants import get_greeting_response +logger = LokiLogger(service_name="api-tool-calling") + def _get_current_model_name() -> str: """Best-effort model name lookup from current DSPy LM.""" diff --git a/src/tool_classifier/follow_up_detector.py b/src/tool_classifier/follow_up_detector.py index 73e16289..cbd8a403 100644 --- a/src/tool_classifier/follow_up_detector.py +++ b/src/tool_classifier/follow_up_detector.py @@ -1,13 +1,17 @@ """Follow-up detection using DSPy — classifies whether a user query is a follow-up to a previous ATC API call.""" import json +import time from typing import Any, Dict, List, TypedDict import dspy -from loguru import logger +from src.loki_logger import LokiLogger +from src.utils.error_utils import generate_error_id from .param_extractor import strip_format_hints +logger = LokiLogger(service_name="api-tool-calling") + _VALID_FOLLOW_UP_TYPES = {"param_update", "response_question", "new_intent"} @@ -36,7 +40,7 @@ def _validate_updated_params( for key, value in updated_params.items(): if key not in schema_lookup: logger.warning( - f"Parameter '{key}' from LLM is not in schema; dropping it to prevent injection" + f"FollowUpDetectorModule: unexpected param dropped | event_type=unexpected_param_dropped param_name={key!r}" ) else: validated[key] = value @@ -161,6 +165,7 @@ def forward( params_schema_json = json.dumps(sanitized_schema, ensure_ascii=False) result = None + _t0 = time.time() try: result = self.detector( user_query=user_query, @@ -168,12 +173,13 @@ def forward( 
previous_params=previous_params_json, params_schema=params_schema_json, ) + _duration_ms = round((time.time() - _t0) * 1000, 1) # Parse and validate follow_up_type follow_up_type = result.follow_up_type.strip().strip("'\"") if follow_up_type not in _VALID_FOLLOW_UP_TYPES: logger.warning( - f"Invalid follow_up_type value '{follow_up_type}'; defaulting to 'new_intent'" + f"FollowUpDetectorModule: invalid follow_up_type | event_type=follow_up_type_invalid received={follow_up_type!r}" ) follow_up_type = "new_intent" @@ -187,12 +193,13 @@ def forward( updated_params = json.loads(sanitized_params) if not isinstance(updated_params, dict): logger.warning( - f"updated_params is not a dict (got {type(updated_params).__name__}); defaulting to {{}}" + f"FollowUpDetectorModule: updated_params not a dict | event_type=updated_params_not_dict received_type={type(updated_params).__name__}" ) updated_params = {} - except json.JSONDecodeError as e: + except json.JSONDecodeError as exc: + _err_id = generate_error_id() logger.error( - f"Failed to parse updated_params JSON for param_update: {e}" + f"FollowUpDetectorModule: JSON parse error in updated_params | event_type=updated_params_json_error error_id={_err_id} error={exc}" ) updated_params = {} @@ -204,11 +211,18 @@ def forward( # Validate against schema to drop unexpected/injected parameters updated_params = _validate_updated_params(updated_params, params_schema) + logger.debug( + f"FollowUpDetectorModule: detection complete | event_type=follow_up_detection_complete follow_up_type={follow_up_type} updated_params_count={len(updated_params)} duration_ms={_duration_ms}" + ) + return FollowUpDetectionResult( follow_up_type=follow_up_type, updated_params=updated_params, ) - except Exception as e: - logger.error(f"Follow-up detection forward failed: {e}", exc_info=True) + except Exception as exc: + _err_id = generate_error_id() + logger.error( + f"FollowUpDetectorModule: detection failed | event_type=follow_up_detection_failed 
error_id={_err_id} error={exc}" + ) return _safe_fallback diff --git a/src/tool_classifier/intent_decomposer.py b/src/tool_classifier/intent_decomposer.py index 5bf53efe..408b17d1 100644 --- a/src/tool_classifier/intent_decomposer.py +++ b/src/tool_classifier/intent_decomposer.py @@ -6,12 +6,14 @@ from typing import Any, cast import dspy -from loguru import logger +from src.loki_logger import LokiLogger from src.utils.cost_utils import get_lm_usage_since from src.utils.observation_utils import safe_observation_context from tool_classifier.constants import MULTI_API_MAX_ENDPOINTS +logger = LokiLogger(service_name="api-tool-calling") + def _get_current_model_name() -> str: """Best-effort model name lookup from current DSPy LM.""" diff --git a/src/tool_classifier/intent_detector.py b/src/tool_classifier/intent_detector.py index 5580d086..52f2af9c 100644 --- a/src/tool_classifier/intent_detector.py +++ b/src/tool_classifier/intent_detector.py @@ -5,12 +5,16 @@ import dspy from langfuse import observe -from loguru import logger +from src.loki_logger import LokiLogger from src.utils.cost_utils import get_lm_usage_since from src.utils.observation_utils import update_observation_safe +# Initialize Loki logger +logger = LokiLogger(service_name="intent-detector") + + def _get_current_model_name() -> str: """Best-effort model name lookup from current DSPy LM.""" try: diff --git a/src/tool_classifier/multi_agentic_loop.py b/src/tool_classifier/multi_agentic_loop.py index 6332039c..4ce8ae32 100644 --- a/src/tool_classifier/multi_agentic_loop.py +++ b/src/tool_classifier/multi_agentic_loop.py @@ -2,9 +2,11 @@ import asyncio import json +import time from typing import Any, Dict, List, Optional, Tuple -from loguru import logger +from src.loki_logger import LokiLogger +from src.utils.error_utils import generate_error_id from models.session_models import EndpointSessionState from utils.api_tool_session_store import APIToolSessionStore @@ -21,6 +23,8 @@ from tool_classifier.models 
import AgenticLoopResult from tool_classifier.param_extractor import ParamExtractionModule, strip_format_hints +logger = LokiLogger(service_name="api-tool-calling") + _CONTINUATION_QUESTIONS: dict[str, str] = { "en": CONTINUATION_QUESTION, "et": CONTINUATION_QUESTION_ET, @@ -154,8 +158,7 @@ async def run_turn( """ if not endpoint_states: logger.debug( - "MultiEndpointAgenticLoop: no endpoints provided for chat_id={} — returning COMPLETED", - chat_id, + f"MultiEndpointAgenticLoop: no endpoints provided for chat_id={chat_id} — returning COMPLETED" ) return AgenticLoopResult( status=AgenticLoopStatus.COMPLETED, @@ -168,22 +171,25 @@ async def run_turn( max_turns, continuation_turn = self._compute_turn_limits(num_endpoints) updated_turn_count = turn_count + 1 + logger.info( + f"MultiEndpointAgenticLoop: multi loop turn started | event_type=multi_loop_turn_started" + f" chat_id={chat_id} turn_count={turn_count} endpoint_count={num_endpoints}" + f" incomplete_count={sum(1 for s in endpoint_states if not s.completed)}" + ) + # Step 0 — Continuation decision if awaiting_continuation: wants_to_continue = detect_continuation_response(user_message) if wants_to_continue: - logger.debug( - "MultiEndpointAgenticLoop: user chose to continue on turn {} for chat_id={}", - turn_count, - chat_id, + logger.info( + f"MultiEndpointAgenticLoop: continuation user accepted | event_type=continuation_user_accepted" + f" chat_id={chat_id} turn_count={turn_count}" ) awaiting_continuation = False else: logger.info( - "MultiEndpointAgenticLoop: user chose to exit on turn {} for chat_id={}, " - "falling back to RAG", - turn_count, - chat_id, + f"MultiEndpointAgenticLoop: continuation user declined | event_type=continuation_user_declined" + f" chat_id={chat_id} turn_count={turn_count} status=max_turns_reached" ) return AgenticLoopResult( status=AgenticLoopStatus.MAX_TURNS_REACHED, @@ -195,9 +201,8 @@ async def run_turn( # Step 1 — Turn limit guard (no session save — caller deletes) if turn_count 
>= max_turns: logger.warning( - "MultiEndpointAgenticLoop: max_turns={} reached for chat_id={}, abandoning", - max_turns, - chat_id, + f"MultiEndpointAgenticLoop: turn limit reached | event_type=turn_limit_reached" + f" chat_id={chat_id} turn_count={turn_count} max_turns={max_turns}" ) return AgenticLoopResult( status=AgenticLoopStatus.MAX_TURNS_REACHED, @@ -211,6 +216,13 @@ async def run_turn( endpoint_states ) + logger.debug( + f"MultiEndpointAgenticLoop: schema merged | event_type=schema_merged" + f" chat_id={chat_id} turn_count={turn_count}" + f" total_param_count={sum(len(s.endpoint.get('params', [])) for s in endpoint_states if not s.completed)}" + f" deduplicated_count={len(merged_schema)} namespaced_count={len(namespace_map)}" + ) + # Step 3 — Namespaced already_collected for the LLM extractor merged_already_collected = self._build_namespaced_already_collected( endpoint_states, namespace_map @@ -220,6 +232,7 @@ async def run_turn( intent_groups = self._build_intent_groups( endpoint_states, merged_already_collected, namespace_map ) + _t0 = time.time() try: extraction = await asyncio.to_thread( self._param_extractor, @@ -231,12 +244,18 @@ async def run_turn( turn_count, intent_groups, ) + _duration_ms = round((time.time() - _t0) * 1000, 1) + logger.debug( + f"MultiEndpointAgenticLoop: param extraction complete | event_type=param_extraction_complete" + f" chat_id={chat_id} turn_count={turn_count}" + f" extracted_count={len(extraction['extracted_params'])} duration_ms={_duration_ms}" + ) except Exception as exc: + _duration_ms = round((time.time() - _t0) * 1000, 1) logger.error( - "MultiEndpointAgenticLoop: param extraction failed on turn {} for chat_id={}: {}", - turn_count, - chat_id, - exc, + f"MultiEndpointAgenticLoop: param extraction failed | event_type=param_extraction_failed" + f" chat_id={chat_id} turn_count={turn_count}" + f" error_id={generate_error_id()} duration_ms={_duration_ms} exc={exc}" ) await self._save_session( chat_id, @@ -282,10 +301,10 @@ 
async def run_turn( merged_after = self._merged_collected(endpoint_states) if all_done: - logger.debug( - "MultiEndpointAgenticLoop: all endpoints completed on turn {} for chat_id={}", - turn_count, - chat_id, + logger.info( + f"MultiEndpointAgenticLoop: all endpoints completed | event_type=multi_loop_all_endpoints_completed" + f" chat_id={chat_id} turn_count={turn_count} endpoint_count={num_endpoints}" + f" status=completed duration_ms={_duration_ms}" ) await self._save_session( chat_id, @@ -302,18 +321,17 @@ async def run_turn( # Step 7 — Still missing params logger.debug( - "MultiEndpointAgenticLoop: turn {} for chat_id={} — still missing: {}", - turn_count, - chat_id, - extraction["missing_required"], + f"MultiEndpointAgenticLoop: loop needs input | event_type=loop_needs_input" + f" chat_id={chat_id} turn_count={turn_count}" + f" missing_params={extraction['missing_required']} status=needs_input" ) # At exactly the continuation threshold, ask whether to keep going. if updated_turn_count == continuation_turn: logger.info( - "MultiEndpointAgenticLoop: continuation threshold reached on turn {} for chat_id={}", - turn_count, - chat_id, + f"MultiEndpointAgenticLoop: continuation threshold reached | event_type=continuation_threshold_reached" + f" chat_id={chat_id} turn_count={turn_count} continuation_turn={continuation_turn}" + f" missing_count={len(extraction['missing_required'])}" ) effective_lang = continuation_language or session_language continuation_q = _CONTINUATION_QUESTIONS.get( @@ -379,8 +397,7 @@ async def stream_run_turn( """ if not endpoint_states: logger.debug( - "MultiEndpointAgenticLoop: no endpoints provided for chat_id={} — returning COMPLETED", - chat_id, + f"MultiEndpointAgenticLoop: no endpoints provided for chat_id={chat_id} — returning COMPLETED" ) return ( AgenticLoopResult( @@ -396,22 +413,25 @@ async def stream_run_turn( max_turns, continuation_turn = self._compute_turn_limits(num_endpoints) updated_turn_count = turn_count + 1 + logger.info( 
+ f"MultiEndpointAgenticLoop: multi loop turn started | event_type=multi_loop_turn_started" + f" chat_id={chat_id} turn_count={turn_count} endpoint_count={num_endpoints}" + f" incomplete_count={sum(1 for s in endpoint_states if not s.completed)}" + ) + # Step 0 — Continuation decision if awaiting_continuation: wants_to_continue = detect_continuation_response(user_message) if wants_to_continue: - logger.debug( - "MultiEndpointAgenticLoop: user chose to continue on turn {} for chat_id={}", - turn_count, - chat_id, + logger.info( + f"MultiEndpointAgenticLoop: continuation user accepted | event_type=continuation_user_accepted" + f" chat_id={chat_id} turn_count={turn_count}" ) awaiting_continuation = False else: logger.info( - "MultiEndpointAgenticLoop: user chose to exit on turn {} for chat_id={}, " - "falling back to RAG", - turn_count, - chat_id, + f"MultiEndpointAgenticLoop: continuation user declined | event_type=continuation_user_declined" + f" chat_id={chat_id} turn_count={turn_count} status=max_turns_reached" ) return ( AgenticLoopResult( @@ -426,9 +446,8 @@ async def stream_run_turn( # Step 1 — Turn limit guard if turn_count >= max_turns: logger.warning( - "MultiEndpointAgenticLoop: max_turns={} reached for chat_id={}, abandoning", - max_turns, - chat_id, + f"MultiEndpointAgenticLoop: turn limit reached | event_type=turn_limit_reached" + f" chat_id={chat_id} turn_count={turn_count} max_turns={max_turns}" ) return ( AgenticLoopResult( @@ -454,6 +473,7 @@ async def stream_run_turn( intent_groups = self._build_intent_groups( endpoint_states, merged_already_collected, namespace_map ) + _t0 = time.time() try: question_tokens, extraction = await self._param_extractor.stream_forward( user_message=user_message, @@ -464,12 +484,18 @@ async def stream_run_turn( turn_count=turn_count, intent_groups=intent_groups, ) + _duration_ms = round((time.time() - _t0) * 1000, 1) + logger.debug( + f"MultiEndpointAgenticLoop: param extraction complete | 
event_type=param_extraction_complete" + f" chat_id={chat_id} turn_count={turn_count}" + f" extracted_count={len(extraction['extracted_params'])} duration_ms={_duration_ms}" + ) except Exception as exc: + _duration_ms = round((time.time() - _t0) * 1000, 1) logger.error( - "MultiEndpointAgenticLoop: stream param extraction failed on turn {} for chat_id={}: {}", - turn_count, - chat_id, - exc, + f"MultiEndpointAgenticLoop: param extraction failed | event_type=param_extraction_failed" + f" chat_id={chat_id} turn_count={turn_count}" + f" error_id={generate_error_id()} duration_ms={_duration_ms} exc={exc}" ) await self._save_session( chat_id, @@ -517,10 +543,10 @@ async def stream_run_turn( merged_after = self._merged_collected(endpoint_states) if all_done: - logger.debug( - "MultiEndpointAgenticLoop: all endpoints completed on turn {} for chat_id={}", - turn_count, - chat_id, + logger.info( + f"MultiEndpointAgenticLoop: all endpoints completed | event_type=multi_loop_all_endpoints_completed" + f" chat_id={chat_id} turn_count={turn_count} endpoint_count={num_endpoints}" + f" status=completed duration_ms={_duration_ms}" ) await self._save_session( chat_id, @@ -540,17 +566,16 @@ async def stream_run_turn( # Step 7 — Still missing params logger.debug( - "MultiEndpointAgenticLoop: turn {} for chat_id={} — still missing: {}", - turn_count, - chat_id, - extraction["missing_required"], + f"MultiEndpointAgenticLoop: loop needs input | event_type=loop_needs_input" + f" chat_id={chat_id} turn_count={turn_count}" + f" missing_params={extraction['missing_required']} status=needs_input" ) if updated_turn_count == continuation_turn: logger.info( - "MultiEndpointAgenticLoop: continuation threshold reached on turn {} for chat_id={}", - turn_count, - chat_id, + f"MultiEndpointAgenticLoop: continuation threshold reached | event_type=continuation_threshold_reached" + f" chat_id={chat_id} turn_count={turn_count} continuation_turn={continuation_turn}" + f" 
missing_count={len(extraction['missing_required'])}" ) effective_lang = continuation_language or session_language continuation_q = _CONTINUATION_QUESTIONS.get( @@ -664,9 +689,7 @@ async def _regenerate_question_after_enforcement( return question except Exception as exc: logger.warning( - "MultiEndpointAgenticLoop: failed to regenerate clarifying question " - "after enforcement: {}", - exc, + f"MultiEndpointAgenticLoop: failed to regenerate clarifying question after enforcement | exc={exc}" ) return "" @@ -878,11 +901,8 @@ def _build_merged_schema( # Warn on type conflicts if param_type != seen_types[name]: logger.warning( - "MultiEndpointAgenticLoop: param '{}' has conflicting types " - "across endpoints ({} vs {}). Using first occurrence's type.", - name, - seen_types[name], - param_type, + f"MultiEndpointAgenticLoop: param '{name}' has conflicting types " + f"across endpoints ({seen_types[name]} vs {param_type}). Using first occurrence's type." ) # Promote to required if required in any owner @@ -1094,12 +1114,10 @@ def _distribute_params( if value == existing: continue logger.warning( - "MultiEndpointAgenticLoop: overwriting '{}' on completed endpoint '{}' " - "(old={!r}, new={!r})", - original_name, - state.endpoint.get("name", ""), - existing, - value, + f"MultiEndpointAgenticLoop: overwriting completed endpoint param" + f" | event_type=endpoint_param_overwrite" + f" endpoint_name={state.endpoint.get('name', '')}" + f" param_name={original_name} old_value={repr(existing)} new_value={repr(value)}" ) state.collected_params[original_name] = value else: @@ -1112,12 +1130,10 @@ def _distribute_params( if value == existing: continue logger.warning( - "MultiEndpointAgenticLoop: overwriting '{}' on completed endpoint '{}' " - "(old={!r}, new={!r})", - param_name, - state.endpoint.get("name", ""), - existing, - value, + f"MultiEndpointAgenticLoop: overwriting completed endpoint param" + f" | event_type=endpoint_param_overwrite" + f" 
endpoint_name={state.endpoint.get('name', '')}" + f" param_name={param_name} old_value={repr(existing)} new_value={repr(value)}" ) state.collected_params[param_name] = value @@ -1134,8 +1150,8 @@ def _distribute_params( if required_names.issubset(state.collected_params.keys()): state.completed = True logger.debug( - "MultiEndpointAgenticLoop: endpoint '{}' completed", - state.endpoint.get("name", ""), + f"MultiEndpointAgenticLoop: endpoint completed | event_type=endpoint_completed" + f" endpoint_name={state.endpoint.get('name', '')}" ) def _enforce_sequential_parallel_completion( @@ -1220,11 +1236,10 @@ def _enforce_sequential_parallel_completion( and later_values_normalized.isdisjoint(first_values_normalized) ): logger.debug( - "MultiEndpointAgenticLoop: endpoint '{}' completed with values " - "distinct from '{}' — keeping both (user provided separate params " - "for each intent)", - state.endpoint.get("name", ""), - first_state.endpoint.get("name", ""), + f"MultiEndpointAgenticLoop: parallel endpoints completed with distinct values — keeping both" + f" | event_type=enforcement_parallel_completion" + f" endpoint_name={state.endpoint.get('name', '')}" + f" first_endpoint_name={first_state.endpoint.get('name', '')}" ) continue @@ -1232,9 +1247,9 @@ def _enforce_sequential_parallel_completion( # to multiple endpoints. Clear this endpoint so the loop re-asks. 
if later_values_normalized == first_values_normalized: logger.debug( - "MultiEndpointAgenticLoop: clearing params from endpoint '{}' — " - "multiple endpoints completed simultaneously with identical values", - state.endpoint.get("name", ""), + f"MultiEndpointAgenticLoop: clearing duplicate params from endpoint" + f" | event_type=enforcement_sequential_conflict" + f" endpoint_name={state.endpoint.get('name', '')}" ) for param in params_schema: if isinstance(param, dict) and param.get("required", False): @@ -1321,9 +1336,9 @@ def _enforce_sequential_conflicting_params( continue if later_values == first_values: logger.debug( - "MultiEndpointAgenticLoop: clearing duplicate conflicting params " - "from endpoint '{}' — user gave a single value, not separate ones", - state.endpoint.get("name", ""), + f"MultiEndpointAgenticLoop: clearing duplicate conflicting params from endpoint" + f" | event_type=enforcement_sequential_conflict" + f" endpoint_name={state.endpoint.get('name', '')}" ) for name in shared_names: state.collected_params.pop(name, None) @@ -1389,8 +1404,7 @@ async def _save_session( try: if self._session_store is None: logger.debug( - "MultiEndpointAgenticLoop: session store unavailable — skipping save for chat_id={}", - chat_id, + f"MultiEndpointAgenticLoop: session store unavailable — skipping save for chat_id={chat_id}" ) return await self._session_store.update( @@ -1401,7 +1415,6 @@ async def _save_session( ) except Exception as exc: logger.error( - "MultiEndpointAgenticLoop: failed to save session for chat_id={}: {}", - chat_id, - exc, + f"MultiEndpointAgenticLoop: failed to save session | event_type=session_save_failed" + f" chat_id={chat_id} turn_count={turn_count} error_id={generate_error_id()} exc={exc}" ) diff --git a/src/tool_classifier/multi_api_caller.py b/src/tool_classifier/multi_api_caller.py index 34829302..6674f334 100644 --- a/src/tool_classifier/multi_api_caller.py +++ b/src/tool_classifier/multi_api_caller.py @@ -1,9 +1,11 @@ 
"""MultiAPICaller — concurrent batch execution of multiple API endpoints.""" import asyncio +import time from typing import Any -from loguru import logger +from src.loki_logger import LokiLogger +from src.utils.error_utils import generate_error_id from tool_classifier.api_caller import APICaller from tool_classifier.constants import ( @@ -11,9 +13,10 @@ MULTI_API_PARTIAL_FAILURE_MESSAGES, ) from tool_classifier.models import APICallResult, MultiAPICallResult -from src.utils.error_utils import generate_error_id, log_error_with_context from llm_orchestrator_config.llm_ochestrator_constants import get_localized_message +logger = LokiLogger(service_name="api-tool-calling") + class MultiAPICaller: """ @@ -71,6 +74,12 @@ async def call_all( if not endpoints: return MultiAPICallResult(results=[], endpoints=[]) + logger.info( + f"MultiAPICaller: batch started | event_type=multi_api_batch_started" + f" endpoint_count={len(endpoints)} batch_timeout={self._batch_timeout}" + ) + _t0 = time.time() + tasks: list[asyncio.Task[APICallResult]] = [ asyncio.create_task( self._api_caller.call( @@ -95,8 +104,8 @@ async def call_all( except asyncio.TimeoutError: pending = [t for t in tasks if not t.done()] logger.warning( - f"[MultiAPICaller] Batch timeout ({self._batch_timeout}s) — " - f"cancelling {len(pending)} pending task(s)" + f"MultiAPICaller: batch timeout | event_type=multi_api_batch_timeout" + f" pending_count={len(pending)} batch_timeout={self._batch_timeout}" ) for task in pending: task.cancel() @@ -128,10 +137,19 @@ async def call_all( had_failure = any(not r.success for r in results) if had_failure: logger.warning( - "[MultiAPICaller] Partial failure: " - f"{sum(not r.success for r in results)}/{len(results)} call(s) failed" + f"MultiAPICaller: partial failure | event_type=multi_api_partial_failure" + f" failed_count={sum(not r.success for r in results)} total_count={len(results)}" ) + _duration_ms = round((time.time() - _t0) * 1000, 1) + logger.info( + f"MultiAPICaller: 
batch completed | event_type=multi_api_batch_completed" + f" endpoint_count={len(endpoints)}" + f" success_count={sum(r.success for r in results)}" + f" failed_count={sum(not r.success for r in results)}" + f" duration_ms={_duration_ms}" + ) + return MultiAPICallResult(results=results, endpoints=endpoints) # ------------------------------------------------------------------ @@ -155,16 +173,11 @@ def _coerce_result( # Assume any non-exception value is already an APICallResult. return item # type: ignore[return-value] error_id = generate_error_id() - exc: Exception = ( - item if isinstance(item, Exception) else RuntimeError(str(item)) - ) - log_error_with_context( - logger, - error_id, - "multi_api_task_exception", - None, - exc, - {}, + exc_type = type(item).__name__ + exc_msg = str(item) + logger.error( + f"MultiAPICaller: task exception | event_type=multi_api_task_exception" + f" error_id={error_id} exc_type={exc_type} exc_msg={exc_msg!r}" ) return APICallResult( success=False, diff --git a/src/tool_classifier/multi_response_formatter.py b/src/tool_classifier/multi_response_formatter.py index 270cd98a..9ebcdef1 100644 --- a/src/tool_classifier/multi_response_formatter.py +++ b/src/tool_classifier/multi_response_formatter.py @@ -6,8 +6,7 @@ import dspy.streaming from dspy.streaming import StreamListener from langfuse import observe -from loguru import logger - +from src.loki_logger import LokiLogger from llm_orchestrator_config.llm_ochestrator_constants import get_localized_message from src.utils.cost_utils import get_lm_usage_since from src.utils.observation_utils import ( @@ -34,6 +33,8 @@ def _get_current_model_name() -> str: return "unknown" +logger = LokiLogger(service_name="api-tool-calling") + _MAX_TOTAL_RESPONSE_BYTES: int = 100_000 _MULTI_FORMATTER_ERROR_MESSAGES: Dict[str, str] = { diff --git a/src/tool_classifier/param_extractor.py b/src/tool_classifier/param_extractor.py index 67ed173d..a3c0cbe6 100644 --- a/src/tool_classifier/param_extractor.py +++ 
b/src/tool_classifier/param_extractor.py @@ -3,6 +3,7 @@ import asyncio import json import re +import time from datetime import datetime, timezone from typing import Any, Dict, List, Optional, TypedDict @@ -11,7 +12,8 @@ from dspy.streaming import StreamListener from langfuse import observe from src.utils.observation_utils import update_observation_safe -from loguru import logger +from src.loki_logger import LokiLogger +from src.utils.error_utils import generate_error_id from src.utils.cost_utils import get_lm_usage_since @@ -20,6 +22,8 @@ _MAX_HISTORY_TURNS = 5 +logger = LokiLogger(service_name="api-tool-calling") + # Regex patterns to strip format hints from parameter descriptions before # they are fed to the question-generation prompt. This prevents the LLM # from including format instructions (e.g. "YYYY-MM-DD") in its questions. @@ -294,6 +298,7 @@ def forward( ) result = None + _t0 = time.time() try: result = self.extractor( user_message=user_message, @@ -305,6 +310,12 @@ def forward( custom_instructions=self._custom_instructions, intent_groups=intent_groups_json, ) + _duration_ms = round((time.time() - _t0) * 1000, 1) + logger.debug( + f"ParamExtractionModule: LLM extraction complete" + f" | event_type=param_extraction_llm_complete" + f" turn_count={turn_count} duration_ms={_duration_ms}" + ) parsed = self._parse_prediction(result, params_schema, already_collected) usage = get_lm_usage_since(history_length_before) update_observation_safe( @@ -324,16 +335,17 @@ def forward( }, ) return parsed - except json.JSONDecodeError as e: - logger.error(f"Failed to parse param extraction JSON: {e}") - if result: - logger.error( - f"Raw extracted_params: {getattr(result, 'extracted_params', None)}" - ) - logger.error( - f"Raw missing_required: {getattr(result, 'missing_required', None)}" - ) + _raw_ep = getattr(result, "extracted_params", None) + _raw_mr = getattr(result, "missing_required", None) + logger.error( + f"ParamExtractionModule: JSON parse error in forward" + 
f" | event_type=param_extraction_json_error" + f" error_id={generate_error_id()}" + f" exc_type={type(e).__name__} exc_msg={e!r}" + f" raw_extracted_params={_raw_ep!r}" + f" raw_missing_required={_raw_mr!r}" + ) fallback = self._safe_defaults(params_schema, already_collected) usage = get_lm_usage_since(history_length_before) update_observation_safe( @@ -355,7 +367,12 @@ def forward( return fallback except Exception as e: - logger.exception(f"Param extraction forward failed: {e}") + logger.error( + f"ParamExtractionModule: forward failed" + f" | event_type=param_extraction_failed" + f" error_id={generate_error_id()}" + f" exc_type={type(e).__name__} exc_msg={e!r}" + ) fallback = self._safe_defaults(params_schema, already_collected) usage = get_lm_usage_since(history_length_before) update_observation_safe( @@ -483,8 +500,8 @@ async def stream_forward( if prediction is None: logger.warning( - "ParamExtractionModule.stream_forward: no Prediction received — " - "falling back to blocking forward()" + "ParamExtractionModule: no Prediction received" + " | event_type=stream_no_prediction_received" ) result = await asyncio.to_thread( self.forward, @@ -512,7 +529,8 @@ async def stream_forward( if tokens: logger.debug( - f"ParamExtractionModule.stream_forward: streamed {len(tokens)} tokens" + f"ParamExtractionModule: stream tokens collected" + f" | event_type=stream_tokens_collected token_count={len(tokens)}" ) # Join all streamed token chunks into the complete assembled question for langfuse logging. 
@@ -543,7 +561,10 @@ async def stream_forward( except json.JSONDecodeError as e: logger.error( - f"ParamExtractionModule.stream_forward failed to parse JSON: {e}" + f"ParamExtractionModule: JSON parse error in stream_forward" + f" | event_type=stream_json_error" + f" error_id={generate_error_id()}" + f" exc_type={type(e).__name__} exc_msg={e!r}" ) fallback = self._safe_defaults(params_schema, already_collected) usage = get_lm_usage_since(history_length_before) @@ -566,7 +587,12 @@ async def stream_forward( return [], fallback except Exception as e: - logger.exception(f"ParamExtractionModule.stream_forward failed: {e}") + logger.error( + f"ParamExtractionModule: stream_forward failed" + f" | event_type=stream_extraction_failed" + f" error_id={generate_error_id()}" + f" exc_type={type(e).__name__} exc_msg={e!r}" + ) fallback = self._safe_defaults(params_schema, already_collected) usage = get_lm_usage_since(history_length_before) update_observation_safe( @@ -591,9 +617,11 @@ async def stream_forward( if output_stream is not None: try: await output_stream.aclose() - except Exception as cleanup_error: + except Exception as e: logger.debug( - f"Error during param extraction stream cleanup: {cleanup_error}" + f"ParamExtractionModule: stream cleanup error" + f" | event_type=stream_cleanup_error" + f" exc_type={type(e).__name__} exc_msg={e!r}" ) # ------------------------------------------------------------------ @@ -668,7 +696,10 @@ def _validate_param_type( return False, value # Unknown type — accept as string to avoid silent data loss - logger.warning(f"Unknown param type '{param_type}'; accepting as string") + logger.warning( + f"ParamExtractionModule: unknown param type" + f" | event_type=param_type_unknown param_type={param_type!r}" + ) return True, str_value def _format_conversation_history( @@ -766,8 +797,10 @@ def _parse_prediction( validated_params[param_name] = coerced else: logger.warning( - f"Extracted value for '{param_name}' failed type validation " - 
f"(expected {param_type}, got {raw_value!r})" + f"ParamExtractionModule: param value type mismatch" + f" | event_type=param_value_type_mismatch" + f" param_name={param_name!r} expected_type={param_type!r}" + f" raw_value={raw_value!r}" ) type_invalid_params.append(param_name) @@ -800,8 +833,7 @@ def _parse_prediction( # LLM incorrectly returned "none" despite missing params — reset to empty # string so callers receive a reliable signal that a follow-up is needed. logger.warning( - "LLM returned clarifying_question='none' but required params are " - f"still missing: {missing_required}. Resetting to empty string." + f"ParamExtractionModule: LLM returned 'none' despite missing params: {missing_required}" ) clarifying_question = "" diff --git a/src/tool_classifier/workflows/api_tool_workflow.py b/src/tool_classifier/workflows/api_tool_workflow.py index a9ba5511..dfb03bab 100644 --- a/src/tool_classifier/workflows/api_tool_workflow.py +++ b/src/tool_classifier/workflows/api_tool_workflow.py @@ -16,7 +16,7 @@ cast, ) -from loguru import logger +from src.loki_logger import LokiLogger from llm_orchestrator_config.feature_flags import FeatureFlags from models.request_models import ( @@ -39,6 +39,8 @@ from tool_classifier.multi_api_caller import MultiAPICaller from tool_classifier.multi_response_formatter import MultiResponseFormatterModule +logger = LokiLogger(service_name="api-tool-calling") + if TYPE_CHECKING: from guardrails.nemo_rails_adapter import NeMoRailsAdapter diff --git a/src/tool_classifier/workflows/context_workflow.py b/src/tool_classifier/workflows/context_workflow.py index 109d9142..8ffddd5c 100644 --- a/src/tool_classifier/workflows/context_workflow.py +++ b/src/tool_classifier/workflows/context_workflow.py @@ -4,7 +4,7 @@ import time import dspy from langfuse import observe -from loguru import logger +from src.loki_logger import LokiLogger from src.models.request_models import OrchestrationRequest, OrchestrationResponse from tool_classifier.base_workflow 
import BaseWorkflow @@ -20,6 +20,9 @@ OUTPUT_GUARDRAIL_VIOLATION_MESSAGE, ) +# Initialize Loki logger +logger = LokiLogger(service_name="context-workflow") + class ContextWorkflowExecutor(BaseWorkflow): """ diff --git a/src/tool_classifier/workflows/ood_workflow.py b/src/tool_classifier/workflows/ood_workflow.py index ed923879..dcf1c866 100644 --- a/src/tool_classifier/workflows/ood_workflow.py +++ b/src/tool_classifier/workflows/ood_workflow.py @@ -1,11 +1,14 @@ """OOD workflow executor - Layer 4: Out-of-domain fallback.""" from typing import Any, AsyncIterator, Dict, Optional -from loguru import logger +from src.loki_logger import LokiLogger from models.request_models import OrchestrationRequest, OrchestrationResponse from tool_classifier.base_workflow import BaseWorkflow +# Initialize Loki logger +logger = LokiLogger(service_name="ood-workflow") + class OODWorkflowExecutor(BaseWorkflow): """ diff --git a/src/tool_classifier/workflows/rag_workflow.py b/src/tool_classifier/workflows/rag_workflow.py index 9b3c588f..d84e5ae2 100644 --- a/src/tool_classifier/workflows/rag_workflow.py +++ b/src/tool_classifier/workflows/rag_workflow.py @@ -1,7 +1,7 @@ """RAG workflow executor - Layer 3: Knowledge base retrieval.""" from typing import Any, AsyncIterator, Dict, Optional, Union, cast, TYPE_CHECKING -from loguru import logger +from src.loki_logger import LokiLogger from models.request_models import ( OrchestrationRequest, @@ -14,6 +14,9 @@ if TYPE_CHECKING: from llm_orchestration_service import LLMOrchestrationService +# Initialize Loki logger +logger = LokiLogger(service_name="rag-workflow") + class RAGWorkflowExecutor(BaseWorkflow): """ diff --git a/src/tool_classifier/workflows/service_workflow.py b/src/tool_classifier/workflows/service_workflow.py index 5bf703ba..4b2c4572 100644 --- a/src/tool_classifier/workflows/service_workflow.py +++ b/src/tool_classifier/workflows/service_workflow.py @@ -6,7 +6,7 @@ import dspy import httpx from langfuse import observe -from 
loguru import logger +from src.loki_logger import LokiLogger from llm_orchestrator_config.llm_manager import LLMManager from src.guardrails.nemo_rails_adapter import NeMoRailsAdapter @@ -40,6 +40,9 @@ from tool_classifier.intent_detector import IntentDetectionModule import time +# Initialize Loki logger +logger = LokiLogger(service_name="service-workflow") + class LLMServiceProtocol(Protocol): """Protocol defining interface for LLM service embedding operations.""" diff --git a/src/utils/api_tool_session_store.py b/src/utils/api_tool_session_store.py index d6c4b01c..807e6de8 100644 --- a/src/utils/api_tool_session_store.py +++ b/src/utils/api_tool_session_store.py @@ -3,12 +3,14 @@ from typing import Any, Optional from fastapi import HTTPException, Request, status -from loguru import logger +from src.loki_logger import LokiLogger from redis import WatchError from models.session_models import APIToolSession from src.utils.redis_client import get_redis_client +logger = LokiLogger(service_name="api-tool-session-store") + _SESSION_KEY_PREFIX = "session:" _SESSION_TTL_SECONDS = 1800 # 30 minutes, sliding _UPDATE_MAX_RETRIES = 3 @@ -36,9 +38,7 @@ async def get(self, chat_id: str) -> Optional[APIToolSession]: """ client = get_redis_client() if client is None: - logger.warning( - "[SessionStore] Redis unavailable - get({}) skipped", chat_id - ) + logger.warning(f"[SessionStore] Redis unavailable - get({chat_id}) skipped") return None try: @@ -47,7 +47,7 @@ async def get(self, chat_id: str) -> Optional[APIToolSession]: return None return APIToolSession.model_validate_json(raw) except Exception as exc: - logger.error("[SessionStore] get({}) failed: {}", chat_id, exc) + logger.error(f"[SessionStore] get({chat_id}) failed: {exc}") return None async def save(self, session: APIToolSession) -> None: @@ -59,7 +59,7 @@ async def save(self, session: APIToolSession) -> None: client = get_redis_client() if client is None: logger.warning( - "[SessionStore] Redis unavailable - save({}) 
skipped", session.chat_id + f"[SessionStore] Redis unavailable - save({session.chat_id}) skipped" ) return @@ -69,9 +69,9 @@ async def save(self, session: APIToolSession) -> None: session.model_dump_json(), ex=_SESSION_TTL_SECONDS, ) - logger.debug("[SessionStore] Session saved for chat_id={}", session.chat_id) + logger.debug(f"[SessionStore] Session saved for chat_id={session.chat_id}") except Exception as exc: - logger.error("[SessionStore] save({}) failed: {}", session.chat_id, exc) + logger.error(f"[SessionStore] save({session.chat_id}) failed: {exc}") async def update(self, chat_id: str, **fields: Any) -> Optional[APIToolSession]: """Atomically update a session using optimistic locking (WATCH/MULTI/EXEC). @@ -97,7 +97,7 @@ async def update(self, chat_id: str, **fields: Any) -> Optional[APIToolSession]: client = get_redis_client() if client is None: logger.warning( - "[SessionStore] Redis unavailable - update({}) skipped", chat_id + f"[SessionStore] Redis unavailable - update({chat_id}) skipped" ) return None @@ -112,8 +112,7 @@ async def update(self, chat_id: str, **fields: Any) -> Optional[APIToolSession]: if raw is None: await pipe.unwatch() logger.warning( - "[SessionStore] update({}) - session not found, skipping", - chat_id, + f"[SessionStore] update({chat_id}) - session not found, skipping" ) return None @@ -125,27 +124,22 @@ async def update(self, chat_id: str, **fields: Any) -> Optional[APIToolSession]: await pipe.execute() logger.debug( - "[SessionStore] Session updated for chat_id={}", chat_id + f"[SessionStore] Session updated for chat_id={chat_id}" ) return updated except WatchError: logger.debug( - "[SessionStore] update({}) - concurrent modification detected, " - "retrying (attempt {}/{})", - chat_id, - attempt + 1, - _UPDATE_MAX_RETRIES, + f"[SessionStore] update({chat_id}) - concurrent modification detected, " + f"retrying (attempt {attempt + 1}/{_UPDATE_MAX_RETRIES})" ) continue except Exception as exc: - logger.error("[SessionStore] 
update({}) failed: {}", chat_id, exc) + logger.error(f"[SessionStore] update({chat_id}) failed: {exc}") return None logger.error( - "[SessionStore] update({}) - exhausted {} retries due to concurrent writes", - chat_id, - _UPDATE_MAX_RETRIES, + f"[SessionStore] update({chat_id}) - exhausted {_UPDATE_MAX_RETRIES} retries due to concurrent writes" ) return None @@ -158,15 +152,15 @@ async def delete(self, chat_id: str) -> None: client = get_redis_client() if client is None: logger.warning( - "[SessionStore] Redis unavailable - delete({}) skipped", chat_id + f"[SessionStore] Redis unavailable - delete({chat_id}) skipped" ) return try: await client.delete(_key(chat_id)) - logger.debug("[SessionStore] Session deleted for chat_id={}", chat_id) + logger.debug(f"[SessionStore] Session deleted for chat_id={chat_id}") except Exception as exc: - logger.error("[SessionStore] delete({}) failed: {}", chat_id, exc) + logger.error(f"[SessionStore] delete({chat_id}) failed: {exc}") async def exists(self, chat_id: str) -> bool: """Check whether a session exists for the given chat_id. 
@@ -181,7 +175,7 @@ async def exists(self, chat_id: str) -> bool: try: return bool(await client.exists(_key(chat_id))) except Exception as exc: - logger.error("[SessionStore] exists({}) failed: {}", chat_id, exc) + logger.error(f"[SessionStore] exists({chat_id}) failed: {exc}") return False @@ -197,8 +191,7 @@ def require_session_store(request: Request) -> APIToolSessionStore: ) if store is None: logger.error( - "[SessionStore] Session store unavailable — returning 503 for {}", - request.url.path, + f"[SessionStore] Session store unavailable — returning 503 for {request.url.path}" ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, diff --git a/src/utils/atc_cache_store.py b/src/utils/atc_cache_store.py index 822648ee..9e203709 100644 --- a/src/utils/atc_cache_store.py +++ b/src/utils/atc_cache_store.py @@ -4,7 +4,7 @@ import json from typing import Any -from loguru import logger +from src.loki_logger import LokiLogger from models.session_models import LastCallContext from tool_classifier.constants import ( @@ -15,6 +15,8 @@ ) from src.utils.redis_client import get_redis_client +logger = LokiLogger(service_name="atc-cache-store") + class ATCCacheStore: """Two-tier response cache for the ATC workflow. 
@@ -94,10 +96,7 @@ async def get_l1( return json.loads(raw) except Exception as exc: logger.warning( - "[ATCCache] get_l1 failed: chat_id={} api_name={!r} error={}", - chat_id, - api_name, - exc, + f"[ATCCache] get_l1 failed: chat_id={chat_id} api_name={api_name!r} error={exc}" ) return None @@ -128,17 +127,11 @@ async def set_l1( ex=ttl, ) logger.debug( - "[ATCCache] L1 set: chat_id={} api_name={!r} ttl={}s", - chat_id, - api_name, - ttl, + f"[ATCCache] L1 set: chat_id={chat_id} api_name={api_name!r} ttl={ttl}s" ) except Exception as exc: logger.warning( - "[ATCCache] set_l1 failed: chat_id={} api_name={!r} error={}", - chat_id, - api_name, - exc, + f"[ATCCache] set_l1 failed: chat_id={chat_id} api_name={api_name!r} error={exc}" ) # ------------------------------------------------------------------ @@ -157,9 +150,7 @@ async def get_l2(self, chat_id: str) -> list[LastCallContext] | None: data: list[dict[str, Any]] = json.loads(raw) return [LastCallContext.model_validate(item) for item in data] except Exception as exc: - logger.warning( - "[ATCCache] get_l2 failed: chat_id={} error={}", chat_id, exc - ) + logger.warning(f"[ATCCache] get_l2 failed: chat_id={chat_id} error={exc}") return None async def set_l2(self, chat_id: str, contexts: list[LastCallContext]) -> None: @@ -182,14 +173,10 @@ async def set_l2(self, chat_id: str, contexts: list[LastCallContext]) -> None: ex=ATC_LAST_CALL_TTL_SECONDS, ) logger.debug( - "[ATCCache] L2 set: chat_id={} entries={}", - chat_id, - len(contexts), + f"[ATCCache] L2 set: chat_id={chat_id} entries={len(contexts)}" ) except Exception as exc: - logger.warning( - "[ATCCache] set_l2 failed: chat_id={} error={}", chat_id, exc - ) + logger.warning(f"[ATCCache] set_l2 failed: chat_id={chat_id} error={exc}") async def invalidate_l2(self, chat_id: str) -> None: """Delete the L2 last-call context key for a chat session. 
@@ -203,8 +190,8 @@ async def invalidate_l2(self, chat_id: str) -> None: return try: await client.delete(self._l2_key(chat_id)) - logger.debug("[ATCCache] L2 invalidated: chat_id={}", chat_id) + logger.debug(f"[ATCCache] L2 invalidated: chat_id={chat_id}") except Exception as exc: logger.warning( - "[ATCCache] invalidate_l2 failed: chat_id={} error={}", chat_id, exc + f"[ATCCache] invalidate_l2 failed: chat_id={chat_id} error={exc}" ) diff --git a/src/utils/budget_tracker.py b/src/utils/budget_tracker.py index f82000a1..58d356b2 100644 --- a/src/utils/budget_tracker.py +++ b/src/utils/budget_tracker.py @@ -1,11 +1,14 @@ """Budget tracking utility for LLM connection usage.""" from typing import Optional, Dict, Any, cast, List -from loguru import logger +from src.loki_logger import LokiLogger import requests from ..llm_orchestrator_config.llm_ochestrator_constants import RAG_SEARCH_RESQL +# Initialize Loki logger +logger = LokiLogger(service_name="budget-tracker") + class BudgetTracker: """Handles budget updates for LLM connections using vault_uuid.""" diff --git a/src/utils/connection_id_fetcher.py b/src/utils/connection_id_fetcher.py index 8cbbb717..ac17f6cc 100644 --- a/src/utils/connection_id_fetcher.py +++ b/src/utils/connection_id_fetcher.py @@ -8,12 +8,15 @@ import asyncio import threading from typing import Optional, Dict, Any -from loguru import logger +from src.loki_logger import LokiLogger import requests import aiohttp from src.llm_orchestrator_config.llm_ochestrator_constants import RAG_SEARCH_RESQL +# Initialize Loki logger +logger = LokiLogger(service_name="connection-id-fetcher") + class ConnectionIdFetcher: """ diff --git a/src/utils/cost_utils.py b/src/utils/cost_utils.py index a3725de5..b4c1a0e0 100644 --- a/src/utils/cost_utils.py +++ b/src/utils/cost_utils.py @@ -1,10 +1,11 @@ """Cost calculation utilities for LLM usage tracking.""" from typing import Dict, Any, List, Tuple -import logging +from src.loki_logger import LokiLogger import dspy 
-logger = logging.getLogger(__name__) +# Initialize Loki logger for cost tracking +logger = LokiLogger(service_name="cost-utils") def _to_float(value: str | int | float | bytes | bytearray | None) -> float: diff --git a/src/utils/decrypt_vault_secrets.py b/src/utils/decrypt_vault_secrets.py index 6a47507f..e0246230 100644 --- a/src/utils/decrypt_vault_secrets.py +++ b/src/utils/decrypt_vault_secrets.py @@ -12,15 +12,15 @@ from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.hazmat.backends import default_backend -from loguru import logger - -# Configure logger to write ONLY to stderr (stdout is reserved for decrypted value) -logger.remove() # Remove default handler -logger.add( - sys.stderr, - format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name} - {message}", - level="INFO", -) + +try: + # In cron-manager: loki_logger.py is mounted into /app/src/vector_indexer/ + from loki_logger import LokiLogger +except ModuleNotFoundError: + # In the main llm-service: import via the full src package path. 
+ from src.loki_logger import LokiLogger + +logger = LokiLogger(service_name="vault-secrets-decryptor") def base64url_to_bytes(base64url_string: str) -> bytes: diff --git a/src/utils/input_sanitizer.py b/src/utils/input_sanitizer.py index b0bd146f..c0c1bae9 100644 --- a/src/utils/input_sanitizer.py +++ b/src/utils/input_sanitizer.py @@ -3,7 +3,10 @@ import re import html from typing import Optional, List, Dict, Any -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="input-sanitizer") class InputSanitizer: diff --git a/src/utils/language_detector.py b/src/utils/language_detector.py index db79988a..92755836 100644 --- a/src/utils/language_detector.py +++ b/src/utils/language_detector.py @@ -5,7 +5,10 @@ import re from typing import Literal -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="language-detector") LanguageCode = Literal["et", "ru", "en"] diff --git a/src/utils/production_store.py b/src/utils/production_store.py index f4b48856..742803b4 100644 --- a/src/utils/production_store.py +++ b/src/utils/production_store.py @@ -8,13 +8,17 @@ from typing import Dict, List, Any, Optional from datetime import datetime import json -from loguru import logger +from src.loki_logger import LokiLogger import requests import aiohttp + from src.llm_orchestrator_config.llm_ochestrator_constants import ( RAG_SEARCH_RUUTER_PUBLIC, ) +# Initialize Loki logger +logger = LokiLogger(service_name="production-store") + class ProductionInferenceStore: """ diff --git a/src/utils/prompt_config_loader.py b/src/utils/prompt_config_loader.py index bd977657..f4b00053 100644 --- a/src/utils/prompt_config_loader.py +++ b/src/utils/prompt_config_loader.py @@ -7,7 +7,10 @@ import time import threading from enum import Enum -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = 
LokiLogger(service_name="prompt-config-loader") class PromptConfigLoadError(Exception): diff --git a/src/utils/rate_limiter.py b/src/utils/rate_limiter.py index 074a0f6f..b875715d 100644 --- a/src/utils/rate_limiter.py +++ b/src/utils/rate_limiter.py @@ -4,12 +4,14 @@ from collections import defaultdict, deque from typing import Dict, Deque, Optional, Any from threading import Lock - -from loguru import logger from pydantic import BaseModel, Field, ConfigDict +from src.loki_logger import LokiLogger from src.llm_orchestrator_config.stream_config import StreamConfig +# Initialize Loki logger +logger = LokiLogger(service_name="rate-limiter") + class RateLimitResult(BaseModel): """Result of rate limit check.""" diff --git a/src/utils/redis_client.py b/src/utils/redis_client.py index 960a9752..ffeca9f8 100644 --- a/src/utils/redis_client.py +++ b/src/utils/redis_client.py @@ -4,7 +4,9 @@ from typing import Any, Optional import redis.asyncio as aioredis -from loguru import logger +from src.loki_logger import LokiLogger + +logger = LokiLogger(service_name="redis_client") _redis_client: Optional[aioredis.Redis] = None # type: ignore[type-arg] @@ -69,7 +71,7 @@ async def init_redis_client() -> aioredis.Redis: # Verify connectivity await _redis_client.ping() logger.info( - "Redis session store connected (db={})", os.getenv("REDIS_SESSION_DB", "1") + f"Redis session store connected (db={os.getenv('REDIS_SESSION_DB', '1')})" ) return _redis_client diff --git a/src/utils/stream_manager.py b/src/utils/stream_manager.py index e12296ea..8486797c 100644 --- a/src/utils/stream_manager.py +++ b/src/utils/stream_manager.py @@ -4,13 +4,16 @@ from datetime import datetime from contextlib import asynccontextmanager import asyncio -from loguru import logger +from src.loki_logger import LokiLogger from pydantic import BaseModel, Field, ConfigDict from src.llm_orchestrator_config.stream_config import StreamConfig from src.llm_orchestrator_config.exceptions import StreamError from 
src.utils.error_utils import generate_error_id +# Initialize Loki logger +logger = LokiLogger(service_name="stream-manager") + class StreamContext(BaseModel): """Context for tracking a single stream's lifecycle.""" diff --git a/src/utils/time_tracker.py b/src/utils/time_tracker.py index 619030d7..5f9a4816 100644 --- a/src/utils/time_tracker.py +++ b/src/utils/time_tracker.py @@ -1,7 +1,10 @@ """Simple time tracking for orchestration service steps.""" from typing import Dict, Optional -from loguru import logger +from src.loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="time-tracker") def log_step_timings( diff --git a/src/vector_indexer/api_client.py b/src/vector_indexer/api_client.py index fa4f2d29..0f43e824 100644 --- a/src/vector_indexer/api_client.py +++ b/src/vector_indexer/api_client.py @@ -3,11 +3,14 @@ import asyncio from typing import List, Dict, Any, Optional, Union import httpx -from loguru import logger from typing_extensions import Self +from loki_logger import LokiLogger from vector_indexer.config.config_loader import VectorIndexerConfig +# Initialize Loki logger +logger = LokiLogger(service_name="api-client") + class LLMOrchestrationAPIClient: """Client for calling LLM Orchestration Service API endpoints.""" diff --git a/src/vector_indexer/config/config_loader.py b/src/vector_indexer/config/config_loader.py index 24af5d76..be107577 100644 --- a/src/vector_indexer/config/config_loader.py +++ b/src/vector_indexer/config/config_loader.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Optional, List, Dict, Any from pydantic import BaseModel, Field, field_validator, model_validator -from loguru import logger +from loki_logger import LokiLogger from vector_indexer.constants import ( DocumentConstants, @@ -13,6 +13,9 @@ ProcessingConstants, ) +# Initialize Loki logger +logger = LokiLogger(service_name="config-loader") + class ChunkingConfig(BaseModel): """Configuration for document chunking 
operations""" diff --git a/src/vector_indexer/contextual_processor.py b/src/vector_indexer/contextual_processor.py index 6b21d326..9b69641c 100644 --- a/src/vector_indexer/contextual_processor.py +++ b/src/vector_indexer/contextual_processor.py @@ -3,7 +3,7 @@ import asyncio import tiktoken from typing import List, Dict, Any, Optional -from loguru import logger +from loki_logger import LokiLogger from vector_indexer.config.config_loader import VectorIndexerConfig from vector_indexer.models import ProcessingDocument, BaseChunk, ContextualChunk @@ -11,6 +11,9 @@ from vector_indexer.error_logger import ErrorLogger from vector_indexer.constants import ChunkingConstants, ProcessingConstants +# Initialize Loki logger +logger = LokiLogger(service_name="contextual-processor") + class ContextualProcessor: """Processes documents into contextual chunks using Anthropic methodology.""" diff --git a/src/vector_indexer/dataset_download.py b/src/vector_indexer/dataset_download.py index ebd95901..c3a3741f 100644 --- a/src/vector_indexer/dataset_download.py +++ b/src/vector_indexer/dataset_download.py @@ -4,7 +4,10 @@ import tempfile from pathlib import Path import requests -from loguru import logger +from loki_logger import LokiLogger + +# Initialize Loki logger +logger = LokiLogger(service_name="dataset-download") def download_and_extract_dataset(signed_url: str) -> tuple[str, int]: diff --git a/src/vector_indexer/diff_identifier/diff_detector.py b/src/vector_indexer/diff_identifier/diff_detector.py index 46edd3dd..51f77b72 100644 --- a/src/vector_indexer/diff_identifier/diff_detector.py +++ b/src/vector_indexer/diff_identifier/diff_detector.py @@ -3,7 +3,7 @@ import os from pathlib import Path from typing import List, Optional, Dict, Any -from loguru import logger +from loki_logger import LokiLogger import hashlib from diff_identifier.diff_models import DiffConfig, DiffError, DiffResult @@ -12,6 +12,9 @@ load_dotenv(".env") +# Initialize Loki logger +logger = 
LokiLogger(service_name="diff-detector") + class DiffDetector: """Main orchestrator for diff identification.""" @@ -144,13 +147,14 @@ async def mark_files_processed( logger.info(f"Marking {len(processed_file_paths)} files as processed...") - # Log chunks_info received + # Log chunks_info summary only (avoid massive logs) if chunks_info: - logger.info(f"RECEIVED CHUNKS INFO: {len(chunks_info)} documents") - for doc_hash, info in chunks_info.items(): - logger.info( - f" {doc_hash[:12]}... -> {info.get('chunk_count', 0)} chunks" - ) + total_chunks = sum( + info.get("chunk_count", 0) for info in chunks_info.values() + ) + logger.info( + f"RECEIVED CHUNKS INFO: {len(chunks_info)} documents, {total_chunks} total chunks" + ) else: logger.warning("No chunks_info provided to mark_files_processed") diff --git a/src/vector_indexer/diff_identifier/s3_ferry_client.py b/src/vector_indexer/diff_identifier/s3_ferry_client.py index bebb7464..f423bdd4 100644 --- a/src/vector_indexer/diff_identifier/s3_ferry_client.py +++ b/src/vector_indexer/diff_identifier/s3_ferry_client.py @@ -5,12 +5,16 @@ import time from typing import Any, Callable, Dict, Optional import requests -from loguru import logger +from loki_logger import LokiLogger + from typing_extensions import Self from diff_identifier.diff_models import DiffConfig, DiffError from constants import get_s3_ferry_payload +# Initialize Loki logger +logger = LokiLogger(service_name="s3-ferry-client") + class S3Ferry: """Client for interacting with S3Ferry service.""" diff --git a/src/vector_indexer/diff_identifier/version_manager.py b/src/vector_indexer/diff_identifier/version_manager.py index d7df5a83..a0592093 100644 --- a/src/vector_indexer/diff_identifier/version_manager.py +++ b/src/vector_indexer/diff_identifier/version_manager.py @@ -5,7 +5,8 @@ from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Set, Any -from loguru import logger +from loki_logger import LokiLogger + from 
typing_extensions import Self from diff_identifier.diff_models import ( @@ -16,6 +17,9 @@ ) from diff_identifier.s3_ferry_client import S3FerryClient +# Initialize Loki logger +logger = LokiLogger(service_name="version-manager") + class VersionManager: """Manages DVC operations and version tracking.""" diff --git a/src/vector_indexer/document_loader.py b/src/vector_indexer/document_loader.py index 9e03b290..6c572834 100644 --- a/src/vector_indexer/document_loader.py +++ b/src/vector_indexer/document_loader.py @@ -6,12 +6,15 @@ from typing import List from urllib.parse import urlparse -from loguru import logger +from loki_logger import LokiLogger from vector_indexer.config.config_loader import VectorIndexerConfig from vector_indexer.models import DocumentInfo, ProcessingDocument from vector_indexer.constants import DocumentConstants +# Initialize Loki logger +logger = LokiLogger(service_name="document-loader") + class DocumentLoadError(Exception): """Custom exception for document loading failures.""" diff --git a/src/vector_indexer/error_logger.py b/src/vector_indexer/error_logger.py index c62de79c..4a1db2aa 100644 --- a/src/vector_indexer/error_logger.py +++ b/src/vector_indexer/error_logger.py @@ -1,13 +1,15 @@ """Enhanced error logging for vector indexer.""" import json -import sys from pathlib import Path -from loguru import logger +from loki_logger import LokiLogger from vector_indexer.config.config_loader import VectorIndexerConfig from vector_indexer.models import ProcessingError, ProcessingStats +# Initialize Loki logger +logger = LokiLogger(service_name="error-logger") + class ErrorLogger: """Enhanced error logging with file-based failure tracking.""" @@ -27,24 +29,10 @@ def _ensure_log_directories(self) -> None: Path(log_file).parent.mkdir(parents=True, exist_ok=True) def _setup_logging(self) -> None: - """Setup loguru logging with file output.""" - logger.remove() # Remove default handler - - # Console logging - logger.add( - sys.stdout, - 
class LokiLogger:
    """Loguru-compatible logger that echoes records to stderr and ships them to Loki.

    One instance is shared per ``(loki_url, service_name)`` pair. Every record
    is written to the console synchronously and then handed to a daemon worker
    thread through a bounded queue, so a slow or unreachable Loki never blocks
    the caller; once the queue is full, further records are dropped from the
    Loki stream (they are still echoed to the console).

    The console echo goes to **stderr** (loguru's default sink), which keeps
    stdout clean for scripts that emit their real result there.
    """

    # Cache of already-created loggers, keyed by "<loki_url>:<service_name>".
    _instances: dict[str, "LokiLogger"] = {}

    def __new__(
        cls, loki_url: str = "http://loki:3100", service_name: str = "default"
    ) -> "LokiLogger":
        """Return the cached logger for this URL/service pair, creating it once."""
        key = f"{loki_url}:{service_name}"
        if key not in cls._instances:
            cls._instances[key] = super().__new__(cls)
        return cls._instances[key]

    def __init__(
        self, loki_url: str = "http://loki:3100", service_name: str = "default"
    ) -> None:
        """
        Initialize LokiLogger

        Args:
            loki_url: Base URL of the Loki service.
            service_name: Name of the service, used as the ``service`` label.
        """
        # __init__ runs on every LokiLogger(...) call, including cache hits from
        # __new__; only the first call may create the session, queue and worker.
        if hasattr(self, "_initialized"):
            return
        self._initialized = True
        self.loki_url = loki_url
        self.service_name = service_name
        self.session = requests.Session()
        # Timeout (seconds) applied to every push request
        self.timeout = 5

        # Bounded queue so memory cannot grow without limit while Loki is down
        self.log_queue: Queue[tuple[str, str]] = Queue(maxsize=10_000)

        # Daemon thread: never keeps the interpreter alive at shutdown
        self.worker_thread = Thread(target=self._process_logs, daemon=True)
        self.worker_thread.start()

    def _process_logs(self) -> None:
        """Background worker: drain the queue forever, pushing each record to Loki."""
        while True:
            level, message = self.log_queue.get()
            try:
                self._send_to_loki_sync(level, message)
            except Exception:
                # Deliberate best-effort: a logging failure must never kill the worker
                pass
            finally:
                # Always balance get() so Queue.join() cannot hang on a failed send
                self.log_queue.task_done()

    def _send_to_loki_sync(self, level: str, message: str) -> None:
        """Push one record to the Loki HTTP API (called from the worker thread)."""
        try:
            # Loki expects the timestamp as a string of nanoseconds since the epoch
            timestamp_ns = str(time.time_ns())

            labels = {
                "service": self.service_name,
                "level": level,
            }

            log_entry = {
                "level": level,
                "message": message,
                "service": self.service_name,
            }

            payload = {
                "streams": [
                    {
                        "stream": labels,
                        "values": [[timestamp_ns, json.dumps(log_entry)]],
                    }
                ]
            }

            self.session.post(
                f"{self.loki_url}/loki/api/v1/push",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,
            )

        except Exception:
            # Silently ignore logging errors to not affect main application
            pass

    @staticmethod
    def _render(
        message: object, args: tuple[object, ...], kwargs: dict[str, object]
    ) -> str:
        """Return *message* as text, applying loguru-style ``{}`` formatting if args are given.

        Without positional args the text is returned untouched, so messages that
        merely contain braces (JSON, reprs) are never re-interpreted. A template
        that does not match its arguments falls back to appending their reprs
        instead of raising: a logger must not break its caller.
        """
        text = message if isinstance(message, str) else str(message)
        if not args:
            return text
        try:
            return text.format(*args, **kwargs)
        except (IndexError, KeyError, ValueError, AttributeError, TypeError):
            extras = " ".join(map(repr, args))
            return f"{text} {extras}"

    def _log(self, level: str, message: object) -> None:
        """Echo the record to stderr and queue it for the Loki worker (non-blocking)."""
        # Function-scope import: the module's import block does not provide `sys`.
        import sys

        # json.dumps in the worker only accepts serialisable values; coerce here so
        # exception objects or dicts passed as the message are not silently lost.
        text = message if isinstance(message, str) else str(message)

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] {level: <8} | {text}", file=sys.stderr)  # noqa: T201

        try:
            self.log_queue.put_nowait((level, text))
        except Full:
            # Queue full (Loki may be slow/unreachable) - drop log to avoid blocking
            pass

    def info(self, message: str, *args: object, **kwargs: object) -> None:
        """Log info message. Positional args fill ``{}`` placeholders; other kwargs are ignored."""
        self._log("INFO", self._render(message, args, kwargs))

    def error(self, message: str, *args: object, **kwargs: object) -> None:
        """Log error message. Positional args fill ``{}`` placeholders; other kwargs are ignored."""
        self._log("ERROR", self._render(message, args, kwargs))

    def warning(self, message: str, *args: object, **kwargs: object) -> None:
        """Log warning message. Positional args fill ``{}`` placeholders; other kwargs are ignored."""
        self._log("WARNING", self._render(message, args, kwargs))

    def debug(self, message: str, *args: object, **kwargs: object) -> None:
        """Log debug message. Positional args fill ``{}`` placeholders; other kwargs are ignored."""
        self._log("DEBUG", self._render(message, args, kwargs))

    def success(self, message: str, *args: object, **kwargs: object) -> None:
        """Log success message (loguru compatibility)."""
        self._log("SUCCESS", self._render(message, args, kwargs))

    def critical(self, message: str, *args: object, **kwargs: object) -> None:
        """Log critical message. Positional args fill ``{}`` placeholders; other kwargs are ignored."""
        self._log("CRITICAL", self._render(message, args, kwargs))

    def exception(self, message: str, *args: object, **kwargs: object) -> None:
        """Log *message* followed by the traceback of the exception being handled.

        Outside an ``except`` block there is no active exception and only the
        message is logged. The level label stays ``EXCEPTION``.
        """
        # Function-scope imports: the module's import block does not provide these.
        import sys
        import traceback

        text = self._render(message, args, kwargs)
        if sys.exc_info()[0] is not None:
            text = f"{text}\n{traceback.format_exc().rstrip()}"
        self._log("EXCEPTION", text)

    def add(self, *args: object, **kwargs: object) -> None:
        """
        No-op method for loguru compatibility.

        LokiLogger sends logs to Loki/console only, not to files, so sink
        registrations written for loguru are accepted and ignored.
        """

    def remove(self, *args: object, **kwargs: object) -> None:
        """No-op method for loguru compatibility."""

    def bind(self, **kwargs: object) -> "LokiLogger":
        """No-op method for loguru compatibility. Returns self for chaining."""
        return self

    def opt(self, **kwargs: object) -> "LokiLogger":
        """No-op method for loguru compatibility. Returns self for chaining."""
        return self
-> {info['chunk_count']} chunks" - ) # Calculate final statistics self.stats.end_time = datetime.now() @@ -664,22 +663,8 @@ async def main() -> int: parser.add_argument("--signed-url", help="Signed URL for dataset download") args = parser.parse_args() - # Configure logging - logger.remove() # Remove default handler - logger.add( - sys.stdout, - format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", - level="INFO", - ) - - # Add file logging - logger.add( - "vector_indexer.log", - format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", - level="DEBUG", - rotation="10 MB", - retention="7 days", - ) + # LokiLogger handles all logging (console + Loki service) + # No additional configuration needed indexer = None try: diff --git a/src/vector_indexer/qdrant_manager.py b/src/vector_indexer/qdrant_manager.py index 08664652..d465bcd2 100644 --- a/src/vector_indexer/qdrant_manager.py +++ b/src/vector_indexer/qdrant_manager.py @@ -1,7 +1,7 @@ """Qdrant vector database manager for storing contextual chunks.""" from typing import List, Dict, Any, Optional -from loguru import logger +from loki_logger import LokiLogger import httpx import uuid from typing_extensions import Self @@ -9,6 +9,8 @@ from vector_indexer.config.config_loader import VectorIndexerConfig from vector_indexer.models import ContextualChunk +logger = LokiLogger(service_name="qdrant-manager") + class QdrantOperationError(Exception): """Custom exception for Qdrant operations.""" @@ -171,22 +173,10 @@ async def _store_chunks_in_collection( upsert_payload = {"points": batch} - # DEBUG: Log the actual HTTP request payload being sent to Qdrant - logger.info("=== QDRANT HTTP REQUEST PAYLOAD DEBUG ===") - logger.info( - f"URL: {self.qdrant_url}/collections/{collection_name}/points" + # Log batch summary only (avoid massive logs) + logger.debug( + f"Storing batch {i // batch_size + 1}: {len(batch)} points to {collection_name}" ) - logger.info("Method: 
PUT") - logger.info(f"Batch size: {len(batch)} points") - for idx, point in enumerate(batch): - logger.info(f"Point {idx + 1}:") - logger.info(f" ID: {point['id']} (type: {type(point['id'])})") - logger.info( - f" Vector length: {len(point['vector'])} (type: {type(point['vector'])})" - ) - logger.info(f" Vector sample: {point['vector'][:3]}...") - logger.info(f" Payload keys: {list(point['payload'].keys())}") - logger.info("=== END QDRANT REQUEST DEBUG ===") response = await self.client.put( f"{self.qdrant_url}/collections/{collection_name}/points", diff --git a/tests/api_tool_eval/test-endpoints.json b/tests/api_tool_eval/test-endpoints.json index a7d86b09..d5c729e1 100644 --- a/tests/api_tool_eval/test-endpoints.json +++ b/tests/api_tool_eval/test-endpoints.json @@ -31,8 +31,8 @@ "url": "https://dashboard.elering.ee/api/nps/price", "method": "GET", "params": [ - { "name": "start", "type": "datetime", "required": true, "description": "Alguskuup\u00e4ev ja -aeg UTC formaadis (YYYY-MM-DDTHH:MM:SSZ). N\u00e4ide: 2025-01-01T00:00:00Z. Kui kasutaja annab ainult kuup\u00e4eva, kasuta alguseks T00:00:00Z." }, - { "name": "end", "type": "datetime", "required": true, "description": "L\u00f5ppkuup\u00e4ev ja -aeg UTC formaadis (YYYY-MM-DDTHH:MM:SSZ). N\u00e4ide: 2025-01-31T23:59:59Z. Kui kasutaja annab ainult kuup\u00e4eva, kasuta l\u00f5puks T23:59:59Z." } + { "name": "start", "type": "datetime", "required": true, "description": "Alguskuupäev ja -aeg UTC formaadis (YYYY-MM-DDTHH:MM:SSZ). Näide: 2025-01-01T00:00:00Z. Kui kasutaja annab ainult kuupäeva, kasuta alguseks T00:00:00Z. Ei tohi olla tulevane kuupäev — kui kasutaja sisestab tulevase kuupäeva, keeldu ja palu kehtiv mineviku- või tänane kuupäev." }, + { "name": "end", "type": "datetime", "required": true, "description": "Lõppkuupäev ja -aeg UTC formaadis (YYYY-MM-DDTHH:MM:SSZ). Näide: 2025-01-31T23:59:59Z. Kui kasutaja annab ainult kuupäeva, kasuta lõpuks T23:59:59Z. 
Ei tohi olla tulevane kuupäev — kui kasutaja sisestab tulevase kuupäeva, keeldu ja palu kehtiv mineviku- või tänane kuupäev." } ] }, { @@ -54,8 +54,8 @@ "url": "https://api.riigikogu.ee/api/votings", "method": "GET", "params": [ - { "name": "startDate", "type": "date", "required": true, "description": "Alguskuupäev (YYYY-MM-DD)" }, - { "name": "endDate", "type": "date", "required": true, "description": "Lõppkuupäev (YYYY-MM-DD)" }, + { "name": "startDate", "type": "date", "required": true, "description": "Alguskuupäev (YYYY-MM-DD). Ei tohi olla tulevane kuupäev — kui kasutaja sisestab tulevase kuupäeva, keeldu ja palu kehtiv mineviku- või tänane kuupäev." }, + { "name": "endDate", "type": "date", "required": true, "description": "Lõppkuupäev (YYYY-MM-DD). Ei tohi olla tulevane kuupäev — kui kasutaja sisestab tulevase kuupäeva, keeldu ja palu kehtiv mineviku- või tänane kuupäev." }, { "name": "lang", "type": "string", "required": false, "description": "Vastuse keel (ET, EN, RU). Täidetakse automaatselt kasutaja keele põhjal." } ] }, @@ -66,8 +66,8 @@ "url": "https://api.riigikogu.ee/api/statistics/participations/plenary", "method": "GET", "params": [ - { "name": "startDate", "type": "date", "required": true, "description": "Alguskuupäev (YYYY-MM-DD)" }, - { "name": "endDate", "type": "date", "required": true, "description": "Lõppkuupäev (YYYY-MM-DD)" }, + { "name": "startDate", "type": "date", "required": true, "description": "Alguskuupäev (YYYY-MM-DD). Ei tohi olla tulevane kuupäev — kui kasutaja sisestab tulevase kuupäeva, keeldu ja palu kehtiv mineviku- või tänane kuupäev." }, + { "name": "endDate", "type": "date", "required": true, "description": "Lõppkuupäev (YYYY-MM-DD). Ei tohi olla tulevane kuupäev — kui kasutaja sisestab tulevase kuupäeva, keeldu ja palu kehtiv mineviku- või tänane kuupäev." }, { "name": "lang", "type": "string", "required": false, "description": "Vastuse keel (ET, EN, RU). Täidetakse automaatselt kasutaja keele põhjal." } ] },