Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions responses_api_models/inference_provider/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
For training workloads that require token IDs, use vllm_model instead.
"""

import json
from asyncio import Semaphore
from time import time
from typing import Any, Dict
from uuid import uuid4

from fastapi import Request
from aiohttp.client_exceptions import ClientResponseError
from fastapi import HTTPException, Request
from pydantic import Field

from nemo_gym.base_responses_api_model import (
Expand Down Expand Up @@ -59,6 +61,7 @@ class InferenceProviderConfig(BaseResponsesAPIModelConfig):

class InferenceProvider(SimpleResponsesAPIModel):
config: InferenceProviderConfig
_RETRYABLE_PROVIDER_STATUSES = {429, 500, 502, 503, 504, 520}

def model_post_init(self, context):
self._client = NeMoGymAsyncOpenAI(
Expand Down Expand Up @@ -145,9 +148,14 @@ async def chat_completions(
if isinstance(content, str):
_, remaining_content = self._converter._extract_reasoning_from_content(content)
message_dict["content"] = remaining_content

async with self._semaphore:
chat_completion_dict = await self._client.create_chat_completion(**body_dict)
try:
chat_completion_dict = await self._client.create_chat_completion(**body_dict)
except ClientResponseError as e:
normalized_payload = self._build_provider_error_payload(e)
raise HTTPException(
status_code=normalized_payload["provider_status"], detail=normalized_payload
) from e

choice_dict = chat_completion_dict["choices"][0]
if self.config.uses_reasoning_parser:
Expand All @@ -163,6 +171,76 @@ async def chat_completions(

return NeMoGymChatCompletion.model_validate(chat_completion_dict)

def _build_provider_error_payload(self, error: ClientResponseError) -> Dict[str, Any]:
    """Normalize an upstream client error into a structured payload dict.

    Args:
        error: The ``ClientResponseError`` raised by the provider request.

    Returns:
        A dict with the keys ``provider_status``, ``retryable``,
        ``provider_context``, ``model``, ``category`` and ``message``.
    """
    # A falsy status (``None`` or ``0``) is reported as a generic 500.
    status_code = error.status or 500
    detail_text = self._extract_provider_error_message(error)
    error_category = self._classify_provider_error(status_code, detail_text)

    payload: Dict[str, Any] = dict(
        provider_status=status_code,
        retryable=status_code in self._RETRYABLE_PROVIDER_STATUSES,
        provider_context={"base_url": self.config.base_url},
        model=self.config.model,
        category=error_category,
        message=detail_text,
    )
    return payload

def _classify_provider_error(self, status: int, message: str) -> str:
    """Map a provider failure onto a coarse error category.

    Decisive status codes (401/403 and 429) are resolved before any message
    inspection, so incidental wording in the body -- for example a 429 that
    mentions the caller's "API key" -- cannot relabel a rate limit as an
    authentication failure. For every other status the message heuristics
    run first, then the remaining status-based rules.

    Args:
        status: HTTP status code reported for the failed provider call.
        message: Human-readable error text; matched case-insensitively.

    Returns:
        One of ``"authentication"``, ``"model_not_found"``, ``"rate_limit"``,
        ``"request_error"``, ``"transient_upstream_failure"`` or
        ``"provider_error"``.
    """
    # Statuses whose meaning does not depend on the message wording.
    if status in {401, 403}:
        return "authentication"
    if status == 429:
        return "rate_limit"

    message_lower = message.lower()
    if "api key" in message_lower or "auth" in message_lower:
        return "authentication"
    # 404 stays behind the authentication heuristic: it may also come from
    # a missing route, so the message is allowed to refine it.
    if status == 404 or ("model" in message_lower and "not found" in message_lower):
        return "model_not_found"
    if "rate limit" in message_lower:
        return "rate_limit"
    if status in {400, 422}:
        return "request_error"
    if status in self._RETRYABLE_PROVIDER_STATUSES:
        return "transient_upstream_failure"
    return "provider_error"

def _extract_provider_error_message(self, error: ClientResponseError) -> str:
    """Return a short, human-readable message describing a provider error.

    The response body is preferred when the error carries one; otherwise the
    exception's own string form is used. The chosen text is passed through
    ``_extract_error_message_from_response`` and then ``_concise``.

    Args:
        error: The ``ClientResponseError`` raised by the provider request.

    Returns:
        The condensed message, or ``"Provider request failed"`` when no
        usable text could be obtained.
    """
    # NOTE(review): ``response_content`` is read defensively via getattr;
    # it is presumably attached by the client wrapper and is not guaranteed
    # to be present on the exception -- confirm against the client code.
    response_content = getattr(error, "response_content", None)
    if isinstance(response_content, (bytes, bytearray)):
        response_text = response_content.decode("utf-8", errors="replace").strip()
    elif response_content:
        response_text = str(response_content).strip()
    else:
        response_text = ""

    if not response_text:
        # A missing or empty body must not hide the exception's own
        # description; previously the ``b""`` default made this fallback
        # unreachable and the generic message was returned instead.
        response_text = str(error).strip()

    if response_text:
        parsed_message = self._extract_error_message_from_response(response_text)
        if parsed_message:
            concise_message = self._concise(parsed_message)
            if concise_message:
                return concise_message
    return "Provider request failed"

@staticmethod
def _extract_error_message_from_response(response_text: str) -> str:
try:
payload = json.loads(response_text)
except json.JSONDecodeError:
return response_text

if isinstance(payload, dict):
if isinstance(payload.get("error"), dict):
nested_error = payload["error"]
if nested_error.get("message"):
return str(nested_error["message"])

if payload.get("message"):
return str(payload["message"])

if payload.get("detail"):
return str(payload["detail"])

return response_text

@staticmethod
def _concise(message: str) -> str:
compact = " ".join(message.strip().split())
return compact if len(compact) <= 200 else compact[:197] + "..."


# Script entry point: only runs when this file is executed directly, not on import.
# NOTE(review): run_webserver is not defined in the visible part of this file --
# presumably inherited from SimpleResponsesAPIModel; confirm.
if __name__ == "__main__":
    InferenceProvider.run_webserver()
Expand Down
Loading