Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DSL/CronManager/script/api_tool_indexer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ echo "[PACKAGES] Installing required packages..."
"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "httpx>=0.27.0" || exit 1
"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "pydantic>=2.11.7" || exit 1
"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "qdrant-client>=1.15.1" || exit 1
"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "loguru>=0.7.3" || exit 1
"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "requests>=2.32.5" || exit 1

echo "[PACKAGES] All packages installed successfully"

Expand Down
5 changes: 4 additions & 1 deletion docker-compose-ec2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ services:
- ./src/tool_classifier:/app/src/tool_classifier
- ./src/intent_data_enrichment:/app/src/intent_data_enrichment
- ./src/api_tool_indexer:/app/src/api_tool_indexer
- ./src/__init__.py:/app/src/__init__.py:ro
- ./src/loki_logger.py:/app/src/loki_logger.py:ro
- ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only)
- cron_data:/app/data
- shared-volume:/app/shared # Access to shared resources for cross-container coordination
Expand All @@ -192,7 +194,7 @@ services:
- ./.env:/app/.env:ro
environment:
- server.port=9010
- PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer
- PYTHONPATH=/app:/app/src:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer
- VAULT_AGENT_URL=http://vault-agent-cron:8203
ports:
- 9010:8080
Expand Down Expand Up @@ -645,6 +647,7 @@ services:
- ./src/llm_config_module/config:/app/src/llm_config_module/config:ro
- ./src/optimization/optimized_modules:/app/src/optimization/optimized_modules
- llm_orchestration_logs:/app/logs
- ./grafana-configs/loki_logger.py:/app/src/loki_logger.py:ro
networks:
- bykstack
depends_on:
Expand Down
5 changes: 4 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ services:
- ./src/tool_classifier:/app/src/tool_classifier
- ./src/intent_data_enrichment:/app/src/intent_data_enrichment
- ./src/api_tool_indexer:/app/src/api_tool_indexer
- ./src/__init__.py:/app/src/__init__.py:ro
- ./src/loki_logger.py:/app/src/loki_logger.py:ro
- ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only)
- cron_data:/app/data
- shared-volume:/app/shared # Access to shared resources for cross-container coordination
Expand All @@ -192,7 +194,7 @@ services:
- ./.env:/app/.env:ro
environment:
- server.port=9010
- PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer
- PYTHONPATH=/app:/app/src:/app/src/vector_indexer:/app/src/intent_data_enrichment:/app/src/api_tool_indexer
- vaultAgentUrl=http://vault-agent-cron:8203
ports:
- 9010:8080
Expand Down Expand Up @@ -594,6 +596,7 @@ services:
- ./src/optimization/optimized_modules:/app/src/optimization/optimized_modules
- llm_orchestration_logs:/app/logs
- ./tests:/app/tests # mount tests directory (excluded from image via .dockerignore)
- ./grafana-configs/loki_logger.py:/app/src/loki_logger.py:ro
networks:
- bykstack
depends_on:
Expand Down
64 changes: 0 additions & 64 deletions generate_presigned_url.py

This file was deleted.

4 changes: 2 additions & 2 deletions grafana-configs/grafana-dashboard-deployment.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"id": null,
"title": "RAG Module Orchestrator",
"tags": ["deployment", "models", "triton"],
"tags": ["rag-module", "loki-logs", "llm-orchestration", "vector-search"],
"timezone": "browser",
"refresh": "30s",
"time": {
Expand All @@ -15,7 +15,7 @@
"type": "query",
"label": "Service Name",
"refresh": 1,
"query": "label_values(service)",
"query": "label_values({}, service)",
"datasource": {
"type": "loki",
"uid": "loki-datasource"
Expand Down
117 changes: 98 additions & 19 deletions grafana-configs/loki_logger.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
#!/usr/bin/env python3
"""
Loki Logger for Global Classifier
Loki Logger for RAG Module
Sends logs directly to Loki API for centralized logging
"""

import json
import socket
import time
from datetime import datetime
from threading import Thread
from queue import Full, Queue

import requests


class LokiLogger:
"""Simple logger that sends logs directly to Loki API"""
"""Simple logger that sends logs directly to Loki API with async background thread"""

_instances: dict[str, "LokiLogger"] = {}

def __new__(
cls, loki_url: str = "http://loki:3100", service_name: str = "default"
) -> "LokiLogger":
key = f"{loki_url}:{service_name}"
if key not in cls._instances:
cls._instances[key] = super().__new__(cls)
return cls._instances[key]

def __init__(
self, loki_url: str = "http://loki:3100", service_name: str = "default"
Expand All @@ -25,15 +36,40 @@ def __init__(
loki_url: URL for Loki service (default: container URL in bykstack network)
service_name: Name of the service for labeling logs
"""
if hasattr(self, "_initialized"):
return
self._initialized = True
self.loki_url = loki_url
self.service_name = service_name
self.hostname = socket.gethostname()
self.session = requests.Session()
# Set default timeout for all requests
self.timeout = 5

def _send_to_loki(self, level: str, message: str) -> None:
"""Send log entry directly to Loki API"""
# Queue for async log processing (bounded to avoid unbounded memory growth under load)
self.log_queue: Queue[tuple[str, str]] = Queue(maxsize=10_000)

# Start background worker thread
self.worker_thread = Thread(target=self._process_logs, daemon=True)
self.worker_thread.start()

def _process_logs(self) -> None:
"""Background worker that processes log queue"""
while True:
try:
# Get log entry from queue (blocking)
level, message = self.log_queue.get()

# Send to Loki
self._send_to_loki_sync(level, message)

# Mark task as done
self.log_queue.task_done()
except Exception:
# Silently ignore errors in background thread
pass

def _send_to_loki_sync(self, level: str, message: str) -> None:
"""Send log entry directly to Loki API (called from background thread)"""
try:
# Create timestamp in nanoseconds (Loki requirement)
timestamp_ns = str(int(time.time() * 1_000_000_000))
Expand All @@ -42,15 +78,12 @@ def _send_to_loki(self, level: str, message: str) -> None:
labels = {
"service": self.service_name,
"level": level,
"hostname": self.hostname,
}

# Create log entry
log_entry = {
"timestamp": datetime.now().isoformat(),
"level": level,
"message": message,
"hostname": self.hostname,
"service": self.service_name,
}

Expand All @@ -64,7 +97,7 @@ def _send_to_loki(self, level: str, message: str) -> None:
]
}

# Send to Loki (non-blocking, fire-and-forget)
# Send to Loki
self.session.post(
f"{self.loki_url}/loki/api/v1/push",
json=payload,
Expand All @@ -76,18 +109,64 @@ def _send_to_loki(self, level: str, message: str) -> None:
# Silently ignore logging errors to not affect main application
pass

# Also print to console for immediate feedback
def _log(self, level: str, message: str) -> None:
"""Queue log entry for async processing (non-blocking)"""
# Print to console immediately for real-time feedback
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {level: <8} | {message}") # noqa: T201

def info(self, message: str) -> None:
self._send_to_loki("INFO", message)
# Queue for async Loki sending (non-blocking)
try:
self.log_queue.put_nowait((level, message))
except Full:
# Queue full (Loki may be slow/unreachable) - drop log to avoid blocking
pass

def info(self, message: str, **kwargs: object) -> None:
"""Log info message. Extra kwargs (extra, exc_info) are ignored for compatibility."""
self._log("INFO", message)

def error(self, message: str, **kwargs: object) -> None:
"""Log error message. Extra kwargs (extra, exc_info) are ignored for compatibility."""
self._log("ERROR", message)

def warning(self, message: str, **kwargs: object) -> None:
"""Log warning message. Extra kwargs (extra, exc_info) are ignored for compatibility."""
self._log("WARNING", message)

def debug(self, message: str, **kwargs: object) -> None:
"""Log debug message. Extra kwargs (extra, exc_info) are ignored for compatibility."""
self._log("DEBUG", message)

def success(self, message: str, **kwargs: object) -> None:
"""Log success message (loguru compatibility). Extra kwargs ignored."""
self._log("SUCCESS", message)

def critical(self, message: str, **kwargs: object) -> None:
"""Log critical message. Extra kwargs (extra, exc_info) are ignored for compatibility."""
self._log("CRITICAL", message)

def exception(self, message: str, **kwargs: object) -> None:
"""Log exception message. Extra kwargs (extra, exc_info) are ignored for compatibility."""
self._log("EXCEPTION", message)

def add(self, *args: object, **kwargs: object) -> None:
"""
No-op method for loguru compatibility.

LokiLogger sends logs to Loki/console only, not to files.
This method exists for backward compatibility with loguru code.
"""
pass # Silently ignore - logs go to Loki instead of files

def error(self, message: str) -> None:
self._send_to_loki("ERROR", message)
def remove(self, *args: object, **kwargs: object) -> None:
"""No-op method for loguru compatibility."""
pass # Silently ignore

def warning(self, message: str) -> None:
self._send_to_loki("WARNING", message)
def bind(self, **kwargs: object) -> "LokiLogger":
"""No-op method for loguru compatibility. Returns self for chaining."""
return self # Allow method chaining

def debug(self, message: str) -> None:
self._send_to_loki("DEBUG", message)
def opt(self, **kwargs: object) -> "LokiLogger":
"""No-op method for loguru compatibility. Returns self for chaining."""
return self # Allow method chaining
18 changes: 8 additions & 10 deletions src/api_tool_indexer/main_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import asyncio
import argparse
from typing import List
from loguru import logger

from src.loki_logger import LokiLogger

from api_tool_indexer.constants import ApiToolIndexerConstants
from api_tool_indexer.models import EndpointData, EnrichedEndpoint, IndexingResult
Expand All @@ -37,8 +38,7 @@
# Reuse LLMAPIClient from intent_data_enrichment.
from intent_data_enrichment.api_client import LLMAPIClient

# Reuse sparse encoder from tool_classifier (shared BM25 implementation).
sys.path.insert(0, "/app/src")
logger = LokiLogger(service_name="api-tool-calling")
try:
from tool_classifier.sparse_encoder import compute_sparse_vector
except ImportError:
Expand Down Expand Up @@ -139,9 +139,8 @@ async def _generate_context_for_endpoint(
)

logger.debug(
"Generated context prompt for endpoint '{}': {} chars",
endpoint_data.endpoint_id,
len(context_prompt),
f"Generated context prompt for endpoint '{endpoint_data.endpoint_id}': "
f"{len(context_prompt)} chars"
)

# context_type="api_tool" makes context_manager use API_TOOL_CONTEXT_PROMPT,
Expand Down Expand Up @@ -175,11 +174,10 @@ async def _generate_context_for_endpoint(

context = result.get("context", "").strip()

logger.debug(
"context preview: {}{}",
context[:200].replace("\n", "\\n"),
"..." if len(context) > 200 else "",
_preview = context[:200].replace("\n", "\\n") + (
"..." if len(context) > 200 else ""
)
logger.debug(f"context preview: {_preview}")

if not context:
raise ValueError("Empty context returned from API")
Expand Down
5 changes: 4 additions & 1 deletion src/api_tool_indexer/qdrant_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import uuid
from typing import Any, Dict, List, Optional
from loguru import logger

from src.loki_logger import LokiLogger
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance,
Expand All @@ -22,6 +23,8 @@
from api_tool_indexer.constants import ApiToolIndexerConstants
from api_tool_indexer.models import EnrichedEndpoint

logger = LokiLogger(service_name="api-tool-calling")

# Error messages
_CLIENT_NOT_INITIALIZED = "Qdrant client not initialized"

Expand Down
Loading
Loading