mkkotcherla/microagent-guide

Building Micro Agents as Production-Grade Microservices

A Complete Engineering Guide

Table of Contents

  1. Introduction & Motivation
  2. Core Architecture Principles
  3. Agent Service Design
  4. The AgentRunner Loop
  5. Inter-Agent Communication
  6. Tool Registry Service
  7. Memory Architecture
  8. Context Window Management
  9. Orchestrator & Supervisor Pattern
  10. Security & Authorization
  11. Observability: Traces, Logs, Metrics
  12. Deployment on Kubernetes
  13. Scaling Strategies
  14. Fault Tolerance & Retry Strategies
  15. Testing Agent Microservices
  16. CI/CD Pipeline for Agent Services
  17. Cost Management & Token Budgeting
  18. Production Readiness Checklist
  19. Reference Architecture Diagram
  20. Further Reading

1. Introduction & Motivation

Why monolithic agent systems fail in production

A single-process agent that handles reasoning, tool calls, memory retrieval, and output generation works well in prototypes. In production it breaks in predictable ways:

  • Latency coupling — one slow tool call blocks the entire inference loop
  • Unscalable compute — you cannot scale the summarization workload independently from the search workload
  • Blast radius — a single LLM API timeout or memory corruption takes the whole system down
  • Zero deployment granularity — updating one tool integration requires redeploying everything
  • No isolation for billing — impossible to attribute compute cost to individual agent functions

The microservice solution

Each autonomous capability becomes an independently deployable, independently scalable service with:

  • Its own API surface (HTTP/gRPC)
  • Its own health checks and readiness probes
  • Its own memory scope (no shared in-process state)
  • Its own tool bindings (resolved at runtime from a Tool Registry)
  • Its own observability (distributed traces, metrics, structured logs)

What is a Micro Agent?

A micro agent is a bounded autonomous service that:

  1. Accepts a task (prompt + context + session ID) via an API call
  2. Runs a plan → act → observe loop using an LLM backend
  3. Invokes tools via a centralized Tool Registry
  4. Stores and retrieves conversation state from an external memory store
  5. Returns a typed result or emits an event to downstream consumers

Key insight: A micro agent is not a "smart function" — it is a service with its own API contract, memory scope, failure modes, and SLA. Design it accordingly.


2. Core Architecture Principles

2.1 Single Responsibility

Each agent owns exactly one reasoning domain. Examples:

| Agent Name | Responsibility |
| --- | --- |
| agent-search | Web and knowledge base retrieval |
| agent-summarize | Long-form content compression |
| agent-code | Code generation, review, execution |
| agent-classify | Intent detection and routing |
| agent-email | Email drafting and sending |
| agent-data | SQL generation and data analysis |

2.2 Stateless Reasoning, Stateful Memory

The LLM inference step must be stateless. Memory lives in external stores:

┌──────────────┐     reads/writes     ┌──────────────────┐
│  AgentRunner │ ──────────────────► │   Redis / Postgres│
│  (stateless) │                      │   / Vector DB     │
└──────────────┘                      └──────────────────┘

No conversation history should ever live in in-process RAM between requests.
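
As a rough illustration of this rule, the sketch below keeps all history in an external store and rebuilds it on every request. `InMemoryStore` and `handle_request` are hypothetical names, the dict-backed store only stands in for Redis, and the echoed reply is a placeholder for the real LLM call.

```python
import json
from typing import Dict, Optional


class InMemoryStore:
    """Stand-in for Redis: the only place history lives between requests."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


def handle_request(store: InMemoryStore, session_id: str, prompt: str) -> int:
    """Load history, run one (fake) reasoning step, persist, and return the history length."""
    key = f"session:{session_id}:messages"
    raw = store.get(key)
    history = json.loads(raw) if raw else []

    history.append({"role": "user", "content": prompt})
    # Placeholder for the LLM call: the handler itself holds no state.
    history.append({"role": "assistant", "content": f"echo: {prompt}"})

    store.set(key, json.dumps(history))
    return len(history)
```

Because nothing survives in the handler between calls, any replica can serve the next request for the same session.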

2.3 Schema-First Tool Contracts

Every tool must have a JSON Schema definition published to a shared Tool Registry before any agent can invoke it. No ad-hoc function signatures. This enables:

  • Runtime input validation before LLM output reaches backend services
  • Auto-generated documentation
  • Tool versioning with backwards compatibility checks
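
One way to approach the backwards compatibility check is a conservative diff of two JSON Schema versions. The function below is a minimal sketch, not a full JSON Schema comparison: `breaking_changes` is a hypothetical helper that only looks at top-level `properties`, `required`, and `type`.

```python
from typing import Any, Dict, List


def breaking_changes(old: Dict[str, Any], new: Dict[str, Any]) -> List[str]:
    """List reasons why callers written against `old` may break under `new`."""
    problems: List[str] = []
    old_props = old.get("properties", {})
    new_props = new.get("properties", {})
    old_required = set(old.get("required", []))
    new_required = set(new.get("required", []))

    # A newly required parameter breaks every caller that omits it.
    for name in sorted(new_required - old_required):
        problems.append(f"new required parameter: {name}")

    for name in sorted(old_props):
        if name not in new_props:
            # Treated as breaking: callers may still send the removed parameter.
            problems.append(f"removed parameter: {name}")
        elif old_props[name].get("type") != new_props[name].get("type"):
            problems.append(f"type changed: {name}")

    return problems
```

A registry could run a check like this at registration time and reject a minor version bump whose result is non-empty.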

2.4 Idempotent Actions

Any tool call that modifies external state (send email, write to DB, trigger webhook) must be idempotent. Strategies:

  • Use idempotency keys at the HTTP layer (pass Idempotency-Key header)
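  • Use message deduplication at the queue level (for example, Kafka's idempotent producer and transactions; these only cover writes inside Kafka, so a consumer that triggers external side effects still needs its own deduplication)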
  • Design tool handlers to be safe to retry: check-then-act patterns
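
The strategies above can be sketched with a derived idempotency key and a replay store. `idempotency_key` and `IdempotentExecutor` are hypothetical names, and the in-memory dict is an assumption that stands in for a shared store; a production version would need an atomic set-if-absent operation so that concurrent retries cannot both run the handler.

```python
import hashlib
import json
from typing import Any, Callable, Dict


def idempotency_key(task_id: str, tool_name: str, params: Dict[str, Any]) -> str:
    """Same task + tool + params always yields the same key, whatever the dict ordering."""
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(f"{task_id}|{tool_name}|{canonical}".encode()).hexdigest()
    return digest[:32]


class IdempotentExecutor:
    """Runs a side-effecting handler at most once per key and replays the stored result."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def execute(self, key: str, handler: Callable[[], Any]) -> Any:
        if key in self._results:
            return self._results[key]  # retry: replay instead of re-running
        result = handler()
        self._results[key] = result
        return result
```

The same key can also be passed as the `Idempotency-Key` header so that the downstream service deduplicates as well.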

2.5 Async by Default

Long-running agent tasks (multi-step research, code generation + execution) must use async task queues — not synchronous HTTP with long timeouts.

Client ──► POST /tasks ──► Kafka/BullMQ ──► AgentWorker
Client ──► GET /tasks/{id} ──► Redis (status polling)
       ◄── WebSocket/SSE push (optional)
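
The submit-then-poll flow can be sketched with the standard library alone. In this hypothetical `InMemoryTaskQueue`, an `asyncio.Queue` stands in for Kafka/BullMQ and a dict stands in for the Redis status store; `submit` plays the role of POST /tasks and `get` the role of GET /tasks/{id}.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Dict


class InMemoryTaskQueue:
    def __init__(self, handler: Callable[[str], Awaitable[str]]) -> None:
        self._handler = handler
        self._queue: asyncio.Queue = asyncio.Queue()
        self._status: Dict[str, Dict[str, Any]] = {}

    async def submit(self, prompt: str) -> str:
        """Enqueue the task and return immediately with its ID."""
        task_id = str(uuid.uuid4())
        self._status[task_id] = {"status": "pending", "output": None}
        await self._queue.put((task_id, prompt))
        return task_id

    def get(self, task_id: str) -> Dict[str, Any]:
        """Status lookup used for polling."""
        return self._status[task_id]

    async def drain(self) -> None:
        await self._queue.join()

    async def worker(self) -> None:
        while True:
            task_id, prompt = await self._queue.get()
            self._status[task_id]["status"] = "running"
            try:
                output = await self._handler(prompt)
                self._status[task_id].update(status="completed", output=output)
            except Exception as exc:
                self._status[task_id].update(status="failed", output=str(exc))
            finally:
                self._queue.task_done()


async def demo() -> Dict[str, Any]:
    async def fake_agent(prompt: str) -> str:
        await asyncio.sleep(0)  # placeholder for a long-running agent task
        return prompt.upper()

    tasks = InMemoryTaskQueue(fake_agent)
    worker = asyncio.create_task(tasks.worker())
    task_id = await tasks.submit("summarize this")
    await tasks.drain()
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return tasks.get(task_id)
```

The client never holds a connection open for the duration of the task; it only needs the task ID to poll.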

2.6 Explicit Context Boundaries

Each agent invocation carries a bounded context packet — never grow unbounded message histories. A ContextManager service compresses/summarizes history before injection.


3. Agent Service Design

Project Layout

Each agent is a containerized FastAPI or gRPC service with this canonical structure:

agent-search/
├── agent/
│   ├── core.py           # AgentRunner: plan → act → observe loop
│   ├── prompts.py        # System prompt + few-shot templates
│   ├── memory.py         # ContextManager: load/compress/save
│   ├── tools.py          # Tool bindings (calls Tool Registry)
│   └── schemas.py        # Pydantic models for all I/O
├── api/
│   ├── routes.py         # POST /run, GET /status/{task_id}
│   ├── middleware.py     # Auth, rate limiting, request tracing
│   └── deps.py           # Dependency injection: DB, Redis, LLM client
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── Dockerfile
├── pyproject.toml
└── k8s/
    ├── deployment.yaml
    ├── service.yaml
    ├── hpa.yaml
    └── configmap.yaml

API Contract

Every agent exposes these HTTP endpoints at minimum:

POST   /run                  Submit a task (sync, short tasks only)
POST   /tasks                Submit a task (async, returns task_id)
GET    /tasks/{task_id}      Poll task status and result
GET    /health               Liveness probe
GET    /ready                Readiness probe (checks LLM + memory store)
GET    /metrics              Prometheus metrics endpoint

Pydantic Schemas

# agent/schemas.py
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from enum import Enum

class TaskStatus(str, Enum):
    PENDING   = "pending"
    RUNNING   = "running"
    COMPLETED = "completed"
    FAILED    = "failed"
    CANCELLED = "cancelled"

class AgentTask(BaseModel):
    id:         str
    session_id: str
    prompt:     str
    metadata:   Dict[str, Any] = Field(default_factory=dict)
    max_steps:  int = Field(default=10, ge=1, le=25)
    token_budget: int = Field(default=8192, ge=512, le=32768)

class AgentResult(BaseModel):
    task_id:    str
    status:     TaskStatus
    output:     Optional[str] = None
    steps_used: int = 0
    tokens_used: int = 0
    tool_calls:  int = 0
    error:       Optional[str] = None
    duration_ms: int = 0

4. The AgentRunner Loop

Full Implementation

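# agent/core.py
import asyncio
import time
from opentelemetry import trace
from tenacity import retry, stop_after_attempt, wait_exponential_jitter

from agent.schemas import AgentTask, AgentResult, TaskStatus
from agent.memory import ContextManager
# AgentConfig, LLMClient, ToolRegistryClient, AgentMetrics, TokenBudgetExceeded,
# build_messages, and tool_result_messages are project-local helpers that this
# guide does not define; import them from wherever your codebase keeps them.

tracer = trace.get_tracer(__name__)
# The step limit comes from task.max_steps (validated to 1-25 in AgentTask).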

class AgentRunner:
    def __init__(self, agent_id: str, config: AgentConfig):
        self.agent_id = agent_id
        self.llm      = LLMClient(model=config.model, timeout=30)
        self.memory   = ContextManager(agent_id, max_tokens=config.context_limit)
        self.tools    = ToolRegistryClient(config.tool_registry_url)
        self.metrics  = AgentMetrics(agent_id)

    async def run(self, task: AgentTask) -> AgentResult:
        start = time.monotonic()

        with tracer.start_as_current_span("agent.run") as span:
            span.set_attribute("agent.id",      self.agent_id)
            span.set_attribute("agent.task_id", task.id)
            span.set_attribute("agent.session", task.session_id)

            try:
                result = await self._run_loop(task, span)
            except TokenBudgetExceeded as e:
                # Budget overrun is a soft failure: return whatever was produced
                result = AgentResult(
                    task_id=task.id,
                    status=TaskStatus.COMPLETED,
                    output=e.partial_output,
                    tokens_used=e.tokens_used,
                    error="token_budget_exceeded"
                )
            except Exception as e:
                span.record_exception(e)
                result = AgentResult(
                    task_id=task.id,
                    status=TaskStatus.FAILED,
                    error=str(e)
                )
            finally:
                result.duration_ms = int((time.monotonic() - start) * 1000)
                self.metrics.record(result)

            return result

    async def _run_loop(self, task: AgentTask, span) -> AgentResult:
        # Load available tools from registry
        tool_schemas = await self.tools.fetch(agent_id=self.agent_id)

        # Load and compress conversation history
        context = await self.memory.load(task.session_id)
        messages = build_messages(context, task.prompt)

        total_tokens = 0
        tool_call_count = 0

        for step in range(task.max_steps):
            span.set_attribute("agent.current_step", step)

            with tracer.start_as_current_span("agent.llm_call") as llm_span:
                response = await self._complete_with_retry(messages, tool_schemas)
                llm_span.set_attribute("llm.prompt_tokens",     response.usage.prompt_tokens)
                llm_span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)

            total_tokens += response.usage.total_tokens

            if total_tokens > task.token_budget:
                raise TokenBudgetExceeded(
                    partial_output=response.content,
                    tokens_used=total_tokens
                )

            if response.finish_reason == "stop":
                await self.memory.save(task.session_id, messages + [response.message])
                return AgentResult(
                    task_id=task.id,
                    status=TaskStatus.COMPLETED,
                    output=response.content,
                    steps_used=step + 1,
                    tokens_used=total_tokens,
                    tool_calls=tool_call_count
                )

            if response.tool_calls:
                tool_call_count += len(response.tool_calls)
                results = await self._execute_tools(response.tool_calls)
                messages.append(response.message)
                messages.extend(tool_result_messages(results))

        # Hit max steps: return the best available output and flag it in `error`
        return AgentResult(
            task_id=task.id,
            status=TaskStatus.COMPLETED,
            output=response.content,
            steps_used=task.max_steps,
            tokens_used=total_tokens,
            tool_calls=tool_call_count,
            error="max_steps_reached"
        )

    @retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(max=15))
    async def _complete_with_retry(self, messages, tools):
        return await self.llm.complete(messages=messages, tools=tools)

    async def _execute_tools(self, tool_calls):
        tasks = [self.tools.invoke(tc) for tc in tool_calls]
        return await asyncio.gather(*tasks, return_exceptions=True)
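
The loop above relies on a few helpers that the listing does not define. The following is one plausible shape for them, written as a sketch under assumptions: the constructor arguments of `TokenBudgetExceeded` are inferred from how it is raised and caught above, and the message dicts use a generic `role`/`content` layout rather than any specific provider's format.

```python
import json
from typing import Any, Dict, List, Optional


class TokenBudgetExceeded(Exception):
    """Raised when a task spends more tokens than its budget allows."""

    def __init__(self, tokens_used: int, partial_output: Optional[str] = None) -> None:
        super().__init__(f"token budget exceeded after {tokens_used} tokens")
        self.tokens_used = tokens_used
        self.partial_output = partial_output


def build_messages(context: List[Dict[str, Any]], prompt: str) -> List[Dict[str, Any]]:
    """Append the new user prompt to the loaded context without mutating it."""
    return [*context, {"role": "user", "content": prompt}]


def tool_result_messages(results: List[Any]) -> List[Dict[str, Any]]:
    """Turn tool outputs into messages; failed calls become error payloads.

    asyncio.gather(..., return_exceptions=True) returns exceptions in place of
    results, so each item may be either a value or an Exception instance.
    """
    messages = []
    for result in results:
        if isinstance(result, Exception):
            content = json.dumps({"error": f"{type(result).__name__}: {result}"})
        else:
            content = json.dumps(result, default=str)
        messages.append({"role": "tool", "content": content})
    return messages
```

Chat APIs that support tool calling usually also expect each tool message to reference the call it answers; that field is omitted here because the listing does not show the shape of a tool call.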

5. Inter-Agent Communication

Pattern Selection Matrix

| Pattern | Use Case | Latency | Durability | Complexity |
| --- | --- | --- | --- | --- |
| Sync HTTP/gRPC | Short sub-tasks (<3s), latency-critical | Low | None | Low |
| Async Queue (Kafka) | Long-running pipelines, multi-step workflows | Medium | High | Medium |
| Event Bus (Redis Streams) | Fan-out: one event triggers many agents | Low | Medium | Low |
| GraphQL Subscriptions | Real-time streaming results to clients | Low | Low | Medium |

gRPC Service Definition

For synchronous sub-agent calls, gRPC provides strong typing, bidirectional streaming, and efficient binary serialization.

// proto/agent_service.proto
syntax = "proto3";
package agents.v1;

service AgentService {
  rpc RunTask      (TaskRequest)  returns (TaskResponse);
  rpc StreamSteps  (TaskRequest)  returns (stream StepEvent);
  rpc Health       (HealthRequest) returns (HealthResponse);
}

message TaskRequest {
  string task_id    = 1;
  string session_id = 2;
  string prompt     = 3;
  map<string, string> metadata = 4;
  int32  max_steps  = 5;
  int32  token_budget = 6;
}

message TaskResponse {
  string task_id    = 1;
  string status     = 2;
  string output     = 3;
  int32  steps_used = 4;
  int32  tokens_used = 5;
  string error      = 6;
}

message StepEvent {
  int32  step_number = 1;
  string type        = 2;  // "llm_call" | "tool_call" | "tool_result"
  string content     = 3;
}

Kafka Event Schema

For async pipeline handoffs between agents, use Avro or JSON schemas registered in a Schema Registry.

{
  "schema": {
    "type": "record",
    "name": "AgentTaskEvent",
    "namespace": "com.myco.agents.v1",
    "fields": [
      {"name": "task_id",       "type": "string"},
      {"name": "source_agent",  "type": "string"},
      {"name": "target_agent",  "type": "string"},
      {"name": "session_id",    "type": "string"},
      {"name": "prompt",        "type": "string"},
      {"name": "context",       "type": {"type": "map", "values": "string"}},
      {"name": "created_at",    "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
  }
}

Kafka Producer (in Orchestrator)

# In orchestrator when dispatching to agent-search
import json
import time

from aiokafka import AIOKafkaProducer

async def dispatch_to_agent(target_agent: str, task: AgentTask):
    # Starting a producer per call keeps the example short; a long-lived
    # producer shared across dispatches avoids reconnecting every time.
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BROKERS)
    await producer.start()
    try:
        event = {
            "task_id":      task.id,
            "source_agent": "orchestrator",
            "target_agent": target_agent,
            "session_id":   task.session_id,
            "prompt":       task.prompt,
            "context":      {},  # required by the AgentTaskEvent schema; empty when there is nothing to pass
            "created_at":   int(time.time() * 1000)
        }
        await producer.send_and_wait(
            topic=f"agent.tasks.{target_agent}",
            value=json.dumps(event).encode(),
            key=task.session_id.encode(),   # partition by session
            headers=[("trace-id", get_current_trace_id().encode())]
        )
    finally:
        await producer.stop()

6. Tool Registry Service

Architecture

The Tool Registry is a centralized FastAPI service that stores, validates, and serves tool definitions. It acts as a typed API gateway for all agent→tool traffic.

┌─────────────┐   GET /tools        ┌──────────────────┐
│ AgentRunner │ ──────────────────► │  Tool Registry   │
│             │                     │                  │
│             │   POST /invoke      │  - Tool schemas  │
│             │ ──────────────────► │  - Auth configs  │
└─────────────┘                     │  - Rate limits   │
                                    │  - Health status │
                                    └──────────────────┘
                                             │
                              validates + routes
                                             │
                            ┌────────────────┼────────────────┐
                            ▼                ▼                ▼
                      ┌──────────┐    ┌──────────┐    ┌──────────┐
                      │  Search  │    │  Email   │    │   SQL    │
                      │  Tool    │    │  Tool    │    │  Tool    │
                      └──────────┘    └──────────┘    └──────────┘

Tool Registration Schema

# Tool self-registers on startup
from typing import Any, Dict

from pydantic import BaseModel

class ToolDefinition(BaseModel):
    name:        str
    version:     str
    description: str
    parameters:  Dict[str, Any]   # JSON Schema
    returns:     Dict[str, Any]   # JSON Schema
    endpoint:    str              # where registry routes calls
    health_url:  str
    auth_type:   str              # "api_key" | "oauth2" | "none"
    rate_limit:  int              # calls per minute per agent
    timeout_ms:  int = 10000

# Registration call at tool service startup
@app.on_event("startup")
async def register_tool():
    registry = ToolRegistryClient(TOOL_REGISTRY_URL)
    await registry.register(ToolDefinition(
        name="web_search",
        version="2.1.0",
        description="Search the web and return ranked results",
        parameters={
            "type": "object",
            "properties": {
                "query":       {"type": "string", "maxLength": 500},
                "num_results": {"type": "integer", "minimum": 1, "maximum": 20}
            },
            "required": ["query"]
        },
        returns={
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "url":     {"type": "string"},
                    "title":   {"type": "string"},
                    "snippet": {"type": "string"}
                }
            }
        },
        endpoint=f"{SERVICE_URL}/invoke",
        health_url=f"{SERVICE_URL}/health",
        auth_type="api_key",
        rate_limit=60,
        timeout_ms=8000
    ))

Registry Validation Layer

# Tool Registry validates before forwarding
from uuid import uuid4

import httpx
import jsonschema

async def invoke_tool(agent_id: str, tool_name: str, params: dict):
    tool = await db.get_tool(tool_name)

    if not tool:
        raise ToolNotFoundError(tool_name)

    # Validate against JSON Schema
    jsonschema.validate(params, tool.parameters)  # raises on invalid input

    # Check rate limit
    if not await rate_limiter.check(agent_id, tool_name, tool.rate_limit):
        raise RateLimitExceeded(f"{tool_name} limit: {tool.rate_limit}/min")

    # Forward to tool service with timeout
    async with httpx.AsyncClient(timeout=tool.timeout_ms / 1000) as client:
        response = await client.post(
            tool.endpoint,
            json={"params": params},
            headers={"X-Agent-Id": agent_id, "X-Request-Id": str(uuid4())}
        )
        response.raise_for_status()
        return response.json()
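
The `rate_limiter` used above is not defined in this guide. As one possible shape for it, the sketch below implements a sliding-window limiter keyed by agent and tool. The class name, the 60-second window, and the injectable clock are assumptions, and the in-process dict would have to be replaced by a shared store (such as Redis) once the registry runs more than one replica.

```python
import asyncio
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple


class SlidingWindowRateLimiter:
    def __init__(self, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._window = window_seconds
        self._clock = clock
        self._calls: Dict[Tuple[str, str], Deque[float]] = defaultdict(deque)

    async def check(self, agent_id: str, tool_name: str, limit: int) -> bool:
        """Return True and record the call if the agent is under `limit` for this window."""
        now = self._clock()
        calls = self._calls[(agent_id, tool_name)]
        # Drop timestamps that have aged out of the window.
        while calls and now - calls[0] >= self._window:
            calls.popleft()
        if len(calls) >= limit:
            return False
        calls.append(now)
        return True
```

The method is declared `async` only so that it matches the `await rate_limiter.check(...)` call in the listing; a store-backed version would do real I/O there.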

7. Memory Architecture

Memory Tier Selection

| Tier | Store | Use Case | TTL |
| --- | --- | --- | --- |
| Working memory | In-request dict | Current tool call results | Request lifetime |
| Short-term memory | Redis | Active session context (last N turns) | 24 hours |
| Long-term memory | Postgres | User preferences, past decisions | Permanent |
| Semantic memory | Vector DB (Qdrant/Pinecone) | Relevant past context retrieved by embedding | Permanent |
| Episodic memory | Postgres + embedding index | Full session summaries | Permanent |

ContextManager Implementation

# agent/memory.py
import json
import uuid
from typing import List

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct
from redis.asyncio import Redis

class ContextManager:
    def __init__(self, agent_id: str, max_tokens: int = 4096):
        self.agent_id   = agent_id
        self.max_tokens = max_tokens
        self.redis      = Redis.from_url(REDIS_URL)
        # The async client is required because every Qdrant call below is awaited
        self.qdrant     = AsyncQdrantClient(url=QDRANT_URL)
        self.embedder   = EmbeddingClient()

    async def load(self, session_id: str) -> List[dict]:
        # 1. Load recent turns from Redis
        raw = await self.redis.get(f"session:{session_id}:messages")
        messages = json.loads(raw) if raw else []

        # 2. Retrieve semantically relevant past context
        #    (the None default avoids StopIteration when no user turn is stored)
        last_user_msg = next(
            (m for m in reversed(messages) if m["role"] == "user"), None
        )
        if last_user_msg:
            embedding = await self.embedder.embed(last_user_msg["content"])
            # Note: newer qdrant-client releases deprecate search() in favor of query_points()
            relevant  = await self.qdrant.search(
                collection_name=f"agent_{self.agent_id}_memory",
                query_vector=embedding,
                limit=3
            )
            # Prepend as system context
            for hit in relevant:
                messages.insert(0, {
                    "role":    "system",
                    "content": f"[Past context] {hit.payload['summary']}"
                })

        # 3. Compress if over token limit
        return await self._compress_if_needed(messages)

    async def save(self, session_id: str, messages: List[dict]):
        # Save the last 20 messages to Redis
        recent = messages[-20:]
        await self.redis.setex(
            f"session:{session_id}:messages",
            86400,  # 24h TTL
            json.dumps(recent)
        )

        # If session is long, generate and store a summary in vector DB
        if len(messages) > 30:
            summary = await self._summarize(messages)
            embedding = await self.embedder.embed(summary)
            await self.qdrant.upsert(
                collection_name=f"agent_{self.agent_id}_memory",
                points=[PointStruct(
                    # Qdrant point IDs must be unsigned integers or UUIDs, so
                    # derive a deterministic UUID from the session ID
                    id=str(uuid.uuid5(uuid.NAMESPACE_URL, session_id)),
                    vector=embedding,
                    payload={"summary": summary, "session_id": session_id}
                )]
            )

    async def _compress_if_needed(self, messages: List[dict]) -> List[dict]:
        token_count = estimate_tokens(messages)
        if token_count <= self.max_tokens:
            return messages

        # Keep system messages + the last 12 non-system messages (about 6 exchanges).
        # Filtering first avoids duplicating system messages that fall in the tail.
        system_msgs = [m for m in messages if m["role"] == "system"]
        other_msgs  = [m for m in messages if m["role"] != "system"]
        return system_msgs + other_msgs[-12:]

8. Context Window Management

Token Estimation

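import json

import tiktoken

def estimate_tokens(messages: list, model: str = "gpt-4o") -> int:
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown model name: fall back to a general-purpose encoding
        enc = tiktoken.get_encoding("cl100k_base")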
    total = 0
    for msg in messages:
        total += 4  # per-message overhead
        total += len(enc.encode(msg.get("content", "") or ""))
        if "tool_calls" in msg:
            for tc in msg["tool_calls"]:
                total += len(enc.encode(json.dumps(tc)))
    return total

class TokenBudget:
    def __init__(self, total: int, model: str):
        self.total     = total
        self.model     = model
        self.used      = 0
        self.reserved  = 1024  # always reserve for output

    @property
    def available_for_input(self):
        return self.total - self.reserved - self.used

    def consume(self, tokens: int):
        self.used += tokens
        if self.used > self.total - self.reserved:
            raise TokenBudgetExceeded(tokens_used=self.used)
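
When the input side of the budget is exceeded, one simple policy is to drop the oldest non-system messages until the history fits. The sketch below assumes that policy; `trim_to_budget` and `rough_token_count` are hypothetical names, and the 4-characters-per-token heuristic is only a stand-in so that the example runs without `tiktoken`. Pass the tokenizer-backed estimator as `count` for real use.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


def rough_token_count(messages: List[Message]) -> int:
    # Assumption: ~4 characters per token plus 4 tokens of per-message overhead.
    return sum(4 + len(m.get("content") or "") // 4 for m in messages)


def trim_to_budget(
    messages: List[Message],
    max_tokens: int,
    count: Callable[[List[Message]], int] = rough_token_count,
) -> List[Message]:
    """Drop the oldest non-system messages until the history fits the budget.

    System messages are always kept and moved to the front; the most recent
    non-system message is never dropped.
    """
    system = [m for m in messages if m["role"] == "system"]
    turns = [m for m in messages if m["role"] != "system"]
    while len(turns) > 1 and count(system + turns) > max_tokens:
        turns.pop(0)
    return system + turns
```

Dropping whole messages is lossy; summarizing the dropped prefix before discarding it preserves more context at the cost of an extra LLM call.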

9. Orchestrator & Supervisor Pattern

Orchestrator: Task Decomposition

The Orchestrator is itself an agent microservice, but its role is planning and coordination rather than execution.

# orchestrator/core.py
class OrchestratorAgent:
    async def execute(self, user_request: str, session_id: str) -> str:
        # Step 1: Decompose into a DAG of sub-tasks
        plan = await self.planner.decompose(user_request)
        # Returns: [{"id": "t1", "agent": "search",    "task": "...", "deps": []},
        #           {"id": "t2", "agent": "summarize",  "task": "...", "deps": ["t1"]},
        #           {"id": "t3", "agent": "email",      "task": "...", "deps": ["t2"]}]

        # Step 2: Execute in topological order, parallel where possible
        results = {}
        for wave in topological_waves(plan):
            # All tasks in a wave have their deps satisfied
            wave_results = await asyncio.gather(*[
                self.supervisor.dispatch(step, results)
                for step in wave
            ])
            for step, result in zip(wave, wave_results):
                results[step["id"]] = result

        # Step 3: Synthesize final output
        return await self.synthesizer.merge(results, user_request)

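def topological_waves(plan: list) -> list:
    """Return plan steps grouped into parallel execution waves."""
    completed = set()
    waves = []
    remaining = list(plan)
    while remaining:
        wave = [s for s in remaining if all(d in completed for d in s["deps"])]
        if not wave:
            # A cycle or a dependency on an unknown step would otherwise loop forever
            stuck = [s["id"] for s in remaining]
            raise ValueError(f"Unresolvable dependencies in plan steps: {stuck}")
        waves.append(wave)
        completed.update(s["id"] for s in wave)
        remaining = [s for s in remaining if s["id"] not in completed]
    return waves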
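
As an alternative to the hand-rolled loop, the same wave grouping can be obtained from the standard library's `graphlib` (Python 3.9+), which also detects cycles. This is a sketch of that swap, not the guide's implementation; `waves_with_graphlib` is a hypothetical name and returns step IDs rather than full step dicts.

```python
from graphlib import CycleError, TopologicalSorter
from typing import List


def waves_with_graphlib(plan: List[dict]) -> List[List[str]]:
    """Group step IDs into waves whose dependencies are all satisfied."""
    sorter = TopologicalSorter({step["id"]: step["deps"] for step in plan})
    sorter.prepare()  # raises CycleError if the plan contains a dependency cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for deterministic output
        waves.append(ready)
        sorter.done(*ready)
    return waves


diamond = [
    {"id": "t1", "agent": "search",    "task": "...", "deps": []},
    {"id": "t2", "agent": "summarize", "task": "...", "deps": ["t1"]},
    {"id": "t3", "agent": "classify",  "task": "...", "deps": ["t1"]},
    {"id": "t4", "agent": "email",     "task": "...", "deps": ["t2", "t3"]},
]
print(waves_with_graphlib(diamond))  # [['t1'], ['t2', 't3'], ['t4']]
```

One difference to be aware of: `graphlib` treats a dependency on an ID that is not in the plan as an extra node with no prerequisites, so validate dependency IDs separately if the planner output is untrusted.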

Supervisor: Retry & Escalation

import asyncio

class Supervisor:
    def __init__(self, agent_clients: dict):
        self.agent_clients = agent_clients

    async def dispatch(self, step: dict, context: dict) -> StepResult:
        task_prompt = self._inject_context(step["task"], context, step["deps"])

        for attempt in range(3):
            last_attempt = attempt == 2
            try:
                return await asyncio.wait_for(
                    self.agent_clients[step["agent"]].run(task_prompt),
                    timeout=60.0
                )
            except asyncio.TimeoutError:
                if last_attempt:
                    raise SupervisorEscalation(step, "timeout_after_3_attempts")
            except AgentError as e:
                # Escalate on the final attempt too, so dispatch never falls
                # through the loop and silently returns None
                if e.is_unrecoverable or last_attempt:
                    raise SupervisorEscalation(step, str(e))
            await asyncio.sleep(2 ** attempt)  # back off 1s, then 2s

    def _inject_context(self, task: str, results: dict, dep_ids: list) -> str:
        context_parts = [results[dep_id].output for dep_id in dep_ids if dep_id in results]
        if context_parts:
            return f"Context from previous steps:\n{chr(10).join(context_parts)}\n\nTask: {task}"
        return task
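
The fixed `2 ** attempt` delay means that many supervisors retrying at once will hit a recovering agent in lockstep. A common refinement is to add jitter. The helper below is a sketch of the "full jitter" variant; `backoff_delay` and `retry_async` are hypothetical names, and the base, cap, and injectable `sleep` are assumptions made so the example is easy to test.

```python
import asyncio
import random
from typing import Awaitable, Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Callable[[], float] = random.random) -> float:
    """Full jitter: a random delay in [0, min(cap, base * 2**attempt)]."""
    return rng() * min(cap, base * (2 ** attempt))


async def retry_async(
    op: Callable[[], Awaitable[T]],
    attempts: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call `op` up to `attempts` times, sleeping with jittered backoff between failures."""
    for attempt in range(attempts):
        try:
            return await op()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await sleep(backoff_delay(attempt))
    raise RuntimeError("attempts must be at least 1")


def demo() -> Tuple[str, int, List[float]]:
    """Run a flaky operation that succeeds on its third call, without really sleeping."""
    calls = {"n": 0}
    delays: List[float] = []

    async def flaky() -> str:
        calls["n"] += 1
        if calls["n"] < 3:
            raise TimeoutError("agent too slow")
        return "ok"

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)

    result = asyncio.run(retry_async(flaky, sleep=fake_sleep))
    return result, calls["n"], delays
```

In the Supervisor, the final `raise` would be translated into a `SupervisorEscalation` rather than propagated as-is.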

10. Security & Authorization

Agent Identity & JWT Verification

Each agent service must verify that incoming requests come from authorized callers. Use short-lived JWTs signed by an internal auth service.

# api/middleware.py
from fastapi import Request, HTTPException
from jose import jwt, JWTError

ALLOWED_CALLERS = {"orchestrator", "supervisor", "api-gateway"}

async def verify_agent_token(request: Request):
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if not token:
        raise HTTPException(status_code=401, detail="Missing auth token")
    try:
        payload = jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"])
        caller  = payload.get("sub")
        if caller not in ALLOWED_CALLERS:
            raise HTTPException(status_code=403, detail=f"Caller {caller} not authorized")
        request.state.caller = caller
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

Secrets Management

Never store API keys in environment literals or ConfigMaps. Use Kubernetes Secrets mounted as environment variables, or preferably HashiCorp Vault with the Vault Agent Sidecar.

# k8s/deployment.yaml (secrets section)
env:
  - name: OPENAI_API_KEY
    valueFrom:
      secretKeyRef:
        name: agent-secrets
        key: openai-api-key
  - name: TOOL_REGISTRY_TOKEN
    valueFrom:
      secretKeyRef:
        name: agent-secrets
        key: tool-registry-token

Tool Call Authorization

The Tool Registry enforces agent-level RBAC: which agents can invoke which tools.

# Tool Registry ACL check
TOOL_ACL = {
    "agent-search":    ["web_search", "vector_search", "knowledge_base"],
    "agent-email":     ["send_email", "get_email_thread"],
    "agent-code":      ["code_exec", "git_read", "package_search"],
    "agent-data":      ["sql_query", "csv_read", "chart_generate"],
}

async def check_tool_acl(agent_id: str, tool_name: str):
    allowed_tools = TOOL_ACL.get(agent_id, [])
    if tool_name not in allowed_tools:
        raise PermissionError(f"{agent_id} is not authorized to call {tool_name}")

11. Observability: Traces, Logs, Metrics

Distributed Tracing Setup (OpenTelemetry)

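# observability/tracing.py
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.redis  import RedisInstrumentor
from opentelemetry.instrumentation.httpx  import HTTPXClientInstrumentor

# Assumption: the collector address comes from the env var set in the Deployment manifest
OTEL_ENDPOINT = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")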

def setup_tracing(service_name: str):
    provider = TracerProvider(
        resource=Resource(attributes={SERVICE_NAME: service_name})
    )
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=OTEL_ENDPOINT))
    )
    trace.set_tracer_provider(provider)

    # Auto-instrument frameworks
    FastAPIInstrumentor().instrument()
    RedisInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()

Standard Span Attributes for Agent Calls

Always set these attributes on every agent and LLM span:

# In AgentRunner._run_loop:
span.set_attribute("agent.id",               self.agent_id)
span.set_attribute("agent.task_id",          task.id)
span.set_attribute("agent.session_id",       task.session_id)
span.set_attribute("agent.step",             step)
span.set_attribute("llm.model",              config.model)
span.set_attribute("llm.prompt_tokens",      response.usage.prompt_tokens)
span.set_attribute("llm.completion_tokens",  response.usage.completion_tokens)
span.set_attribute("llm.finish_reason",      response.finish_reason)

# In Tool Registry on invoke:
span.set_attribute("tool.name",              tool_name)
span.set_attribute("tool.version",           tool.version)
span.set_attribute("tool.caller_agent",      agent_id)
span.set_attribute("tool.latency_ms",        latency_ms)

Prometheus Metrics

# observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge

agent_tasks_total = Counter(
    "agent_tasks_total",
    "Total tasks processed",
    ["agent_id", "status"]
)

agent_task_duration = Histogram(
    "agent_task_duration_seconds",
    "Task end-to-end latency",
    ["agent_id"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60, 120]
)

agent_llm_tokens = Counter(
    "agent_llm_tokens_total",
    "LLM tokens consumed",
    ["agent_id", "token_type"]  # token_type: prompt | completion
)

agent_tool_calls = Counter(
    "agent_tool_calls_total",
    "Tool invocations",
    ["agent_id", "tool_name", "status"]
)

agent_steps_per_task = Histogram(
    "agent_steps_per_task",
    "Number of steps per task (runaway guard)",
    ["agent_id"],
    buckets=[1, 2, 3, 5, 8, 10, 15, 20, 25]
)

orchestrator_queue_depth = Gauge(
    "orchestrator_queue_depth",
    "Pending tasks in orchestrator queue"
)

Alert Rules

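# alerting/rules.yaml
groups:
  - name: agent-alerts
    rules:
      - alert: AgentHighErrorRate
        # Ratio of failed to all tasks per agent; a bare rate() would be failures per second
        expr: |
          sum by (agent_id) (rate(agent_tasks_total{status="failed"}[5m]))
            / sum by (agent_id) (rate(agent_tasks_total[5m])) > 0.05
        for: 2m
        annotations:
          summary: "{{ $labels.agent_id }} failure rate above 5%"

      - alert: AgentRunawayTask
        # histogram_quantile needs the rate of the _bucket series, grouped by le
        expr: histogram_quantile(0.99, sum by (le, agent_id) (rate(agent_steps_per_task_bucket[5m]))) > 15
        for: 5m
        annotations:
          summary: "Agent tasks exceeding 15 steps: possible runaway loop"

      - alert: LLMTokenCostSpike
        # Threshold is in tokens per second for each agent_id/token_type series
        expr: rate(agent_llm_tokens_total[10m]) > 50000
        for: 5m
        annotations:
          summary: "Token consumption rate spike: check for loops"

      - alert: AgentLatencyHigh
        expr: histogram_quantile(0.99, sum by (le, agent_id) (rate(agent_task_duration_seconds_bucket[5m]))) > 10
        for: 5m
        annotations:
          summary: "p99 task latency above 10s"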

Structured Logging

# Never log raw prompts or PII. Log task IDs and outcome codes.
import structlog

log = structlog.get_logger()

log.info("agent.task.completed",
    task_id=task.id,
    session_id=task.session_id,   # hashed in prod
    agent_id=self.agent_id,
    steps=result.steps_used,
    tokens=result.tokens_used,
    duration_ms=result.duration_ms,
    tool_calls=result.tool_calls,
    status=result.status,
    trace_id=get_current_trace_id()
)
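
The "hashed in prod" note on `session_id` can be implemented with a keyed hash, so that log lines for the same session still correlate while the raw ID cannot be recovered or guessed from the logs. This is a minimal sketch; `hash_session_id`, the 16-character truncation, and the way the secret is supplied are assumptions.

```python
import hashlib
import hmac


def hash_session_id(session_id: str, secret: bytes) -> str:
    """Return a stable, non-reversible token for a session ID (HMAC-SHA256, truncated)."""
    digest = hmac.new(secret, session_id.encode(), hashlib.sha256).hexdigest()
    return digest[:16]
```

The secret would come from the same secret store as the API keys; rotating it breaks correlation across the rotation boundary, which may or may not be desirable.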

12. Deployment on Kubernetes

Deployment Manifest

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-search
  labels:
    app: agent-search
    version: v1.4.2
    team: ai-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: agent-search
  template:
    metadata:
      labels:
        app: agent-search
        version: v1.4.2
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path:   "/metrics"
        prometheus.io/port:   "8080"
    spec:
      serviceAccountName: agent-search
      containers:
        - name: agent
          image: registry.myco.io/agent-search@sha256:<digest>   # Always pin by digest
          ports:
            - containerPort: 8080    # HTTP API
              name: http
            - containerPort: 50051   # gRPC
              name: grpc
          env:
            - name: AGENT_ID
              value: "agent-search"
            - name: TOOL_REGISTRY_URL
              valueFrom: {configMapKeyRef: {name: agent-config, key: tool-registry-url}}
            - name: REDIS_URL
              valueFrom: {secretKeyRef: {name: agent-secrets, key: redis-url}}
            - name: OPENAI_API_KEY
              valueFrom: {secretKeyRef: {name: agent-secrets, key: openai-api-key}}
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              valueFrom: {configMapKeyRef: {name: observability-config, key: otel-endpoint}}
          resources:
            requests:
              cpu:    "500m"
              memory: "512Mi"
            limits:
              cpu:    "2"
              memory: "2Gi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds:       15
            failureThreshold:    3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds:       10
            failureThreshold:    2
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 5"]  # drain connections before shutdown
      topologySpreadConstraints:
        - maxSkew:           1
          topologyKey:       kubernetes.io/hostname
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels: {app: agent-search}

Horizontal Pod Autoscaler (Custom Metrics)

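Scale on queue depth, not just CPU. The manifest below combines Kafka consumer lag (an external metric, which requires a metrics adapter in the cluster) with CPU utilization; a p99 task latency metric can be added the same way once it is exposed to the autoscaler: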

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-search-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-search
  minReplicas: 2
  maxReplicas: 20
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300   # be conservative scaling down
  metrics:
    - type: External
      external:
        metric:
          name: kafka_consumer_group_lag
          selector:
            matchLabels:
              topic: agent.tasks.search
        target:
          type:         AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type:               Utilization
          averageUtilization: 70
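
To reason about how the two metrics above interact, it helps to reproduce the autoscaler's core arithmetic. The Kubernetes documentation gives it as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue), skipped when the ratio is within a tolerance (0.1 by default), with the largest result across metrics winning. The sketch below applies that formula to the targets in this manifest; the function names are hypothetical, and it deliberately ignores the `behavior` policies and stabilization windows.

```python
import math


def desired_replicas(current: int, metric_value: float, target: float,
                     tolerance: float = 0.1) -> int:
    """Core HPA formula: ceil(current * metric / target), with a dead band."""
    ratio = metric_value / target
    if abs(ratio - 1.0) <= tolerance:
        return current  # close enough to target: no change
    return math.ceil(current * ratio)


def hpa_recommendation(current: int, lag_total: float, cpu_utilization: float,
                       min_replicas: int = 2, max_replicas: int = 20) -> int:
    """Combine the two metrics from the manifest and clamp to min/max."""
    lag_per_pod = lag_total / current           # AverageValue target: per-pod lag
    by_lag = desired_replicas(current, lag_per_pod, target=100)
    by_cpu = desired_replicas(current, cpu_utilization, target=70)
    return max(min_replicas, min(max_replicas, max(by_lag, by_cpu)))


# 2 replicas, 900 messages of total lag, CPU at 50%:
# lag asks for ceil(2 * 450 / 100) = 9, CPU asks for 2, so lag wins.
recommended = hpa_recommendation(current=2, lag_total=900, cpu_utilization=50)
```

In a live cluster the `scaleUp` policy above (at most 4 pods per 60 seconds) would spread that jump over several periods rather than applying it at once.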

PodDisruptionBudget

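Ensure at least one replica stays available during voluntary disruptions such as node drains and cluster upgrades (Deployment rolling updates are governed by the rollout strategy's maxUnavailable and maxSurge, not by the PDB):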

# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: agent-search-pdb
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: agent-search

13. Scaling Strategies

Per-Agent Scaling Logic

| Scaling Signal | Metric | Scale Up Trigger | Scale Down Trigger |
|---|---|---|---|
| Queue-based | kafka_consumer_lag | lag > 100 msgs | lag < 10 msgs |
| Latency-based | agent_task_duration_p99 | p99 > 8s | p99 < 3s |
| CPU-based | cpu_utilization | > 70% | < 40% |
| Custom | active_sessions | > 500 | < 50 |
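
The gap between each scale-up and scale-down trigger acts as a hysteresis band that prevents flapping. A minimal sketch of a decision function over these thresholds follows. The combination rule is an assumption, not something the table specifies: scale up if any signal crosses its upper trigger, scale down only when every signal is below its lower trigger. `Signal` and `scaling_decision` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Signal:
    name: str
    scale_up_above: float
    scale_down_below: float


# Thresholds copied from the table above.
SIGNALS = [
    Signal("kafka_consumer_lag", 100, 10),
    Signal("agent_task_duration_p99", 8.0, 3.0),
    Signal("cpu_utilization", 70, 40),
    Signal("active_sessions", 500, 50),
]


def scaling_decision(readings: dict) -> str:
    """Return 'scale_up', 'scale_down', or 'hold' for one set of readings."""
    if any(readings[s.name] > s.scale_up_above for s in SIGNALS):
        return "scale_up"
    if all(readings[s.name] < s.scale_down_below for s in SIGNALS):
        return "scale_down"
    return "hold"  # inside the hysteresis band for at least one signal


quiet = {"kafka_consumer_lag": 5, "agent_task_duration_p99": 2.0,
         "cpu_utilization": 30, "active_sessions": 20}
decision = scaling_decision({**quiet, "kafka_consumer_lag": 50})
```

Here lag sits between 10 and 100, so the result is a hold even though every other signal would permit scaling down.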

Multi-Model Fallback

If the primary LLM is unavailable or rate-limited, automatically route to a fallback:

class LLMClient:
    MODEL_CASCADE = [
        "gpt-4o",             # primary
        "gpt-4o-mini",        # cheaper fallback
        "claude-sonnet-4-6",  # cross-vendor fallback
    ]

    async def complete(self, messages: list, **kwargs) -> LLMResponse:
        for model in self.MODEL_CASCADE:
            try:
                return await self._call_model(model, messages, **kwargs)
            except (RateLimitError, ModelUnavailable):
                log.warning("llm.fallback", from_model=model, reason="rate_limit_or_unavailable")
                continue
        raise AllModelsUnavailable()
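
To see the cascade in action without a real provider, the same loop can be run against a stub. This is a self-contained sketch: the exception classes, the placeholder model names, and `fake_call_model` are stand-ins defined here for illustration. It also returns which model served the request, since downstream cost accounting usually needs that.

```python
import asyncio


class RateLimitError(Exception):
    pass


class ModelUnavailable(Exception):
    pass


class AllModelsUnavailable(Exception):
    pass


# Placeholder names standing in for the primary, cheaper, and cross-vendor models.
MODEL_CASCADE = ["primary-model", "cheaper-model", "cross-vendor-model"]


async def complete_with_fallback(call_model, messages):
    """Try each model in order; return (model_used, response)."""
    for model in MODEL_CASCADE:
        try:
            return model, await call_model(model, messages)
        except (RateLimitError, ModelUnavailable):
            continue  # fall through to the next model in the cascade
    raise AllModelsUnavailable()


async def fake_call_model(model, messages):
    # Stub provider: the primary is rate-limited, everything else answers.
    if model == "primary-model":
        raise RateLimitError("429")
    return f"{model} answered"


served_by, reply = asyncio.run(
    complete_with_fallback(fake_call_model, [{"role": "user", "content": "hi"}])
)
```

Errors outside the two listed types (for example, a malformed request) propagate immediately rather than triggering a fallback, which is usually the desired behavior: retrying a bad request on another model only multiplies cost.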

14. Fault Tolerance & Retry Strategies

Circuit Breaker on LLM Client

from circuitbreaker import circuit

class LLMClientWithCircuitBreaker:
    @circuit(failure_threshold=5, recovery_timeout=30, expected_exception=LLMError)
    async def complete(self, messages: list, **kwargs) -> LLMResponse:
        return await self._raw_complete(messages, **kwargs)

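The circuit opens after 5 consecutive failures and stays open for 30 seconds. While it is open, calls fail fast with a CircuitBreakerError instead of reaching the provider; the caller is responsible for catching that error and serving a fallback response or routing to a secondary model. After the recovery timeout, the next call is let through as a trial.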
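
The closed, open, and half-open states are easier to follow in a stripped-down implementation. The sketch below is not the `circuitbreaker` library's code; it is a synchronous, standard-library illustration with hypothetical names (`SimpleCircuitBreaker`, `CircuitOpenError`) and an injectable clock so the timing can be tested without sleeping.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half_open"  # timeout elapsed: allow one trial call
        return "open"

    def call(self, fn, *args, **kwargs):
        if self.state == "open":
            raise CircuitOpenError("failing fast while circuit is open")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            # Open on reaching the threshold, or re-open if a half-open trial failed.
            if self._failures >= self.failure_threshold or self._opened_at is not None:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and resets the consecutive-failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

A caller would wrap the LLM request in `breaker.call(...)` and treat `CircuitOpenError` as the signal to use a fallback path.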

Exponential Backoff with Jitter

from tenacity import (
    retry, stop_after_attempt,
    wait_exponential_jitter, retry_if_exception_type
)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=60),
    retry=retry_if_exception_type((RateLimitError, TimeoutError, ServiceUnavailable))
)
async def call_tool_with_retry(tool_name: str, params: dict):
    return await tool_registry.invoke(tool_name, params)
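
For intuition about what this wait policy produces, the delay schedule can be computed by hand. The sketch below assumes the common formula min(initial × base^(attempt − 1) + uniform(0, jitter), max) with base 2 and up to 1 second of jitter; `backoff_delay` is a hypothetical helper written for illustration, not part of tenacity.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, initial: float = 1.0, exp_base: float = 2.0,
                  jitter: float = 1.0, max_delay: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Delay in seconds before retry number `attempt` (1-based)."""
    source = rng if rng is not None else random
    raw = initial * exp_base ** (attempt - 1) + source.uniform(0, jitter)
    return min(raw, max_delay)  # cap so late retries do not wait unboundedly


# Delays for the first few attempts fall in [1, 2], [2, 3], [4, 5] seconds.
schedule = [backoff_delay(n, rng=random.Random(n)) for n in (1, 2, 3)]
```

The random component matters most when many agent replicas hit the same rate limit at once: without it they would all retry on the same tick and collide again.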

Dead Letter Queue Handler

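# dlq_handler.py: consumes from the dead-letter topic
import asyncio

class DLQHandler:
    # self._pending is assumed to be a set() created in __init__
    async def process(self, event: AgentTaskEvent):
        log.error("agent.task.dlq",
            task_id=event.task_id,
            target_agent=event.target_agent,
            attempt_count=event.retry_count,
            original_error=event.last_error
        )

        # Alert on-call if error is novel
        if await self.is_novel_error(event.last_error):
            await self.pagerduty.alert(event)

        # Store for human review dashboard
        await self.db.insert_dlq_item(event)

        # Optionally re-queue after 1 hour. Schedule the delay in the background:
        # awaiting the sleep inline would stall the DLQ consumer for the full hour.
        if event.retry_count < 2 and event.auto_retry_eligible:
            task = asyncio.create_task(self._requeue_after(event, delay_s=3600))
            self._pending.add(task)  # keep a reference so the task is not garbage-collected
            task.add_done_callback(self._pending.discard)

    async def _requeue_after(self, event: AgentTaskEvent, delay_s: float):
        # In-process timers are lost on restart; a durable delay queue is safer in production.
        await asyncio.sleep(delay_s)
        event.retry_count += 1
        await self.kafka.send("agent.tasks." + event.target_agent, event)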
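
The handler above relies on `is_novel_error`, which is not shown. One plausible way to implement it is to fingerprint the error message after stripping volatile details such as numbers and addresses, then check whether that fingerprint has been seen. The sketch below is an assumption about how such a check could work: `error_fingerprint` and `NoveltyTracker` are hypothetical names, and the in-memory set stands in for a shared store.

```python
import hashlib
import re


def error_fingerprint(message: str) -> str:
    """Hash an error message with volatile details (numbers, hex IDs) masked."""
    normalized = re.sub(r"0x[0-9a-f]+|\d+", "<n>", message.lower())
    normalized = re.sub(r"\s+", " ", normalized).strip()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]


class NoveltyTracker:
    """Remembers fingerprints already seen; a real service would persist these."""

    def __init__(self):
        self._seen = set()

    def is_novel(self, message: str) -> bool:
        fingerprint = error_fingerprint(message)
        if fingerprint in self._seen:
            return False
        self._seen.add(fingerprint)
        return True


tracker = NoveltyTracker()
first = tracker.is_novel("Timeout after 30s calling tool 42")
repeat = tracker.is_novel("Timeout after 45s calling tool 7")
```

Both messages normalize to the same text, so only the first one would page on-call.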

Step-Level Checkpointing

import json

class CheckpointedAgentRunner(AgentRunner):
    async def _run_loop(self, task: AgentTask, span) -> AgentResult:
        key = f"checkpoint:{task.id}"

        # Restore from checkpoint if available
        checkpoint = await self.redis.get(key)
        if checkpoint:
            state = json.loads(checkpoint)
            messages      = state["messages"]
            total_tokens  = state["total_tokens"]
            start_step    = state["step"] + 1
            log.info("agent.checkpoint.restored", task_id=task.id, step=start_step)
        else:
            context       = await self.memory.load(task.session_id)
            messages      = build_messages(context, task.prompt)
            total_tokens  = 0
            start_step    = 0

        # Assumed helper: resolve tool schemas the same way the base AgentRunner loop does
        tool_schemas = await self._load_tool_schemas()

        # Defaults cover the case where the restored step is already at max_steps
        response, step = None, start_step
        for step in range(start_step, task.max_steps):
            response = await self._complete_with_retry(messages, tool_schemas)
            messages.append(response.message)  # must be JSON-serializable to checkpoint
            total_tokens += response.usage.total_tokens
            # (tool-call execution omitted for brevity)

            if response.finish_reason == "stop":
                await self.redis.delete(key)
                break

            # Persist checkpoint after each unfinished step (1-hour TTL)
            await self.redis.setex(
                key,
                3600,
                json.dumps({"messages": messages, "total_tokens": total_tokens, "step": step})
            )

        return build_result(task, response, total_tokens, step)
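
The resume behavior is easiest to verify in isolation. The following sketch simulates a crash midway through a task and a second run that picks up from the saved step. It uses an in-memory stand-in for Redis and a trivial "step" that only appends a message; `InMemoryStore` and `run_task` are hypothetical names for illustration.

```python
import json


class InMemoryStore:
    """Minimal stand-in for the Redis calls used by the checkpoint logic."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)


def run_task(store, task_id, max_steps, crash_at=None):
    """Run steps, checkpointing after each; resume after the last saved step."""
    key = f"checkpoint:{task_id}"
    raw = store.get(key)
    state = json.loads(raw) if raw else {"messages": [], "step": -1}
    executed = []
    for step in range(state["step"] + 1, max_steps):
        if step == crash_at:
            raise RuntimeError("simulated pod kill")
        state["messages"].append(f"step-{step}")
        state["step"] = step
        store.set(key, json.dumps(state))  # persist progress after each step
        executed.append(step)
    store.delete(key)  # task finished: checkpoint no longer needed
    return executed, state["messages"]


store = InMemoryStore()
try:
    run_task(store, "t1", max_steps=4, crash_at=2)
except RuntimeError:
    pass  # the first attempt dies before step 2 completes
executed, messages = run_task(store, "t1", max_steps=4)
```

The second run executes only steps 2 and 3, yet ends with the full four-message history, which is the property the checkpoint is meant to guarantee.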

15. Testing Agent Microservices

Testing Pyramid

                ┌────────────────────────────┐
                │   E2E / Chaos Tests        │  ← 5%  (Slow, expensive)
                ├────────────────────────────┤
                │   Integration Tests        │  ← 25% (Mock LLM, real Redis)
                ├────────────────────────────┤
                │   Unit Tests               │  ← 70% (Mock everything)
                └────────────────────────────┘

Unit Testing the AgentRunner

# tests/unit/test_agent_runner.py
import pytest
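from unittest.mock import AsyncMock
# AgentRunner, AgentTask, LLMResponse, Usage, TaskStatus, and test_config come from
# the agent package and test fixtures; import paths depend on your project layout.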

@pytest.fixture
def mock_llm():
    llm = AsyncMock()
    llm.complete.return_value = LLMResponse(
        content="Here is the search result.",
        finish_reason="stop",
        usage=Usage(prompt_tokens=100, completion_tokens=50, total_tokens=150)
    )
    return llm

@pytest.mark.asyncio  # requires pytest-asyncio (or set asyncio_mode = auto)
async def test_agent_completes_in_one_step(mock_llm):
    runner = AgentRunner("agent-search", test_config)
    runner.llm = mock_llm

    result = await runner.run(AgentTask(id="t1", session_id="s1", prompt="find AI news"))

    assert result.status == TaskStatus.COMPLETED
    assert result.steps_used == 1
    assert result.tokens_used == 150
    mock_llm.complete.assert_called_once()

@pytest.mark.asyncio
async def test_agent_respects_token_budget(mock_llm):
    mock_llm.complete.return_value = LLMResponse(
        content="...", finish_reason="tool_calls",
        usage=Usage(prompt_tokens=900, completion_tokens=100, total_tokens=1000)
    )
    task = AgentTask(id="t1", session_id="s1", prompt="...", token_budget=500)
    runner = AgentRunner("agent-search", test_config)
    runner.llm = mock_llm

    result = await runner.run(task)
    assert result.error == "token_budget_exceeded"

Integration Testing with a Mock LLM Server

Use a local mock LLM server (e.g., WireMock or a FastAPI stub) that returns deterministic responses, so tool-call flows can be tested end-to-end without hitting real APIs.

# tests/integration/test_tool_flow.py
@pytest.mark.asyncio  # requires pytest-asyncio (or set asyncio_mode = auto)
async def test_search_agent_calls_web_search_tool(mock_llm_server, real_redis, real_tool_registry):
    # Configure mock LLM to respond with a tool call on first turn
    mock_llm_server.set_response(step=0, response=TOOL_CALL_RESPONSE)
    mock_llm_server.set_response(step=1, response=FINAL_RESPONSE)

    runner = AgentRunner("agent-search", integration_config)
    result = await runner.run(AgentTask(id="t1", session_id="s1", prompt="Search for AI news"))

    assert result.status == TaskStatus.COMPLETED
    assert result.tool_calls == 1
    assert real_tool_registry.was_invoked("web_search")
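
When a full mock server is more than a test needs, the same "scripted response per step" idea can be done in-process. The sketch below is a standard-library illustration with hypothetical names (`ScriptedLLM`, `tiny_agent_loop`) and a simplified response shape (plain dicts); it is not the guide's `AgentRunner`, only enough of a loop to show the pattern.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ScriptedLLM:
    """Returns script[i] on the i-th call and records the messages it was sent."""
    script: list
    calls: list = field(default_factory=list)

    async def complete(self, messages, **kwargs):
        step = len(self.calls)
        self.calls.append(list(messages))  # snapshot, since the caller mutates it
        if step >= len(self.script):
            raise AssertionError(f"unexpected LLM call at step {step}")
        return self.script[step]


async def tiny_agent_loop(llm, prompt, tools):
    """Call the LLM until it stops, executing any tool call it requests."""
    messages = [{"role": "user", "content": prompt}]
    tool_calls = 0
    while True:
        response = await llm.complete(messages)
        if response["finish_reason"] == "stop":
            return response["content"], tool_calls
        output = tools[response["tool"]](**response["args"])
        messages.append({"role": "tool", "content": output})
        tool_calls += 1


llm = ScriptedLLM(script=[
    {"finish_reason": "tool_calls", "tool": "web_search", "args": {"query": "AI news"}},
    {"finish_reason": "stop", "content": "done"},
])
tools = {"web_search": lambda query: f"results for {query}"}
answer, tool_calls = asyncio.run(tiny_agent_loop(llm, "Search for AI news", tools))
```

Because the stub records every call, a test can also assert on what the agent sent back to the model after the tool ran, not just on the final answer.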

Chaos Testing

Use Chaos Mesh or Litmus to test resilience:

  • Pod kill: Kill a random agent pod — verify Supervisor retries succeed
  • Network partition: Block agent→tool-registry traffic — verify circuit breaker opens
  • LLM latency injection: Add 15s delay to LLM calls — verify timeout and fallback activate
  • Kafka partition leader election: Simulate Kafka failover — verify no task loss via consumer offset management
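
The latency-injection scenario can be rehearsed at unit level before running it in a cluster. The sketch below shows the behavior the chaos test is meant to confirm: a call that exceeds its deadline is cancelled and a fallback answers instead. Function names are hypothetical and the delays are scaled down to milliseconds so the example runs quickly.

```python
import asyncio


async def call_with_timeout_and_fallback(primary, fallback, timeout_s: float):
    """Await primary(); if it exceeds timeout_s, cancel it and use fallback()."""
    try:
        return await asyncio.wait_for(primary(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return await fallback()


async def slow_primary():
    await asyncio.sleep(0.2)  # stands in for the injected 15s delay
    return "primary"


async def fast_fallback():
    return "fallback"


result = asyncio.run(
    call_with_timeout_and_fallback(slow_primary, fast_fallback, timeout_s=0.05)
)
```

With a deadline shorter than the injected delay, the fallback path serves the response; raising the deadline above the delay would return the primary result instead.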

16. CI/CD Pipeline for Agent Services

# .github/workflows/agent-service.yml
name: Agent Service CI/CD

on:
  push:
    paths: ["agents/agent-search/**"]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports: ["6379:6379"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.12"}
      - run: pip install -e ".[dev]"
      - run: pytest tests/ --cov=agent --cov-fail-under=85

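  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build image locally for scanning
        run: docker build -t agent-search:${{ github.sha }} agents/agent-search/
      - name: Trivy vulnerability scan
        uses: aquasecurity/trivy-action@master   # pin to a release tag or commit SHA in real pipelines
        with: {image-ref: "agent-search:${{ github.sha }}", exit-code: "1"}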

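  build-push:
    needs: [test, security-scan]
    runs-on: ubuntu-latest
    outputs:
      digest: ${{ steps.push.outputs.digest }}
    steps:
      - uses: actions/checkout@v4
      - uses: docker/setup-qemu-action@v3      # emulation for the arm64 build
      - uses: docker/setup-buildx-action@v3
      - uses: docker/login-action@v3
        with:
          registry: registry.myco.io
          username: ${{ secrets.REGISTRY_USERNAME }}   # assumed secret names
          password: ${{ secrets.REGISTRY_PASSWORD }}
      - name: Build and push (pinned by digest)
        id: push
        run: |
          docker buildx build --platform linux/amd64,linux/arm64 \
            -t registry.myco.io/agent-search:${{ github.sha }} \
            --push agents/agent-search/
          # Multi-platform images are not loaded into the local daemon, so read
          # the manifest-list digest from the registry instead of `docker inspect`.
          DIGEST=$(docker buildx imagetools inspect \
            registry.myco.io/agent-search:${{ github.sha }} \
            --format '{{json .Manifest}}' | jq -r '.digest')
          # $GITHUB_ENV does not cross job boundaries; publish a job output instead.
          echo "digest=$DIGEST" >> "$GITHUB_OUTPUT"

  deploy-staging:
    needs: build-push
    runs-on: ubuntu-latest
    steps:
      # Assumes cluster credentials (kubeconfig) are configured in an earlier step
      - name: Deploy to staging
        run: |
          kubectl set image deployment/agent-search \
            agent=registry.myco.io/agent-search@${{ needs.build-push.outputs.digest }} \
            -n staging
          kubectl rollout status deployment/agent-search -n staging --timeout=120s

  smoke-test-staging:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: python tests/smoke/run_smoke_tests.py --env staging

  deploy-production:
    needs: [build-push, smoke-test-staging]   # build-push is listed so its outputs are readable here
    runs-on: ubuntu-latest
    environment: production
    steps:
      # Assumes cluster credentials (kubeconfig) are configured in an earlier step
      - name: Rolling deploy to production
        run: |
          kubectl set image deployment/agent-search \
            agent=registry.myco.io/agent-search@${{ needs.build-push.outputs.digest }} \
            -n production
          kubectl rollout status deployment/agent-search -n production --timeout=300s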

17. Cost Management & Token Budgeting

Per-Agent Token Accounting

Track token usage per agent, per session, and per user to enable chargebacks and anomaly detection.

class TokenAccountant:
    async def record(self, agent_id: str, session_id: str, usage: Usage):
        # Increment per-agent daily counter
        await self.redis.incrby(f"tokens:{agent_id}:{today()}", usage.total_tokens)
        await self.redis.expire(f"tokens:{agent_id}:{today()}", 86400 * 7)

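        # Increment per-session counter (for user billing)
        await self.redis.incrby(f"tokens:session:{session_id}", usage.total_tokens)
        # Assumed 24h TTL so counters for abandoned sessions do not accumulate forever
        await self.redis.expire(f"tokens:session:{session_id}", 86400)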

        # Write to time-series DB for cost dashboards
        await self.influx.write(
            measurement="llm_tokens",
            tags={"agent_id": agent_id, "model": usage.model},
            fields={"prompt": usage.prompt_tokens, "completion": usage.completion_tokens},
        )

async def get_estimated_cost(agent_id: str) -> float:
    tokens = int(await redis.get(f"tokens:{agent_id}:{today()}") or 0)
    # GPT-4o pricing: $2.50/1M prompt, $10/1M completion (example)
    return (tokens / 1_000_000) * 5.0  # blended estimate
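
The blended rate is convenient, but it drifts from the real bill when the prompt-to-completion ratio is skewed. Since the accountant already records prompt and completion tokens separately, cost can be computed per token type. The sketch below uses the example prices from the comment above ($2.50 and $10 per million tokens); they are illustrative figures, not current pricing, and `estimate_cost` is a hypothetical helper.

```python
from decimal import Decimal

# Example prices per 1M tokens, taken from the comment above (illustrative only).
PRICE_PER_MILLION = {
    "prompt": Decimal("2.50"),
    "completion": Decimal("10.00"),
}


def estimate_cost(prompt_tokens: int, completion_tokens: int) -> Decimal:
    """Cost in dollars, pricing prompt and completion tokens separately."""
    million = Decimal(1_000_000)
    prompt_cost = Decimal(prompt_tokens) / million * PRICE_PER_MILLION["prompt"]
    completion_cost = Decimal(completion_tokens) / million * PRICE_PER_MILLION["completion"]
    return prompt_cost + completion_cost


# 800k prompt + 200k completion tokens: 0.8 * 2.50 + 0.2 * 10.00 = 4.00
cost = estimate_cost(prompt_tokens=800_000, completion_tokens=200_000)
```

For the same 1M total tokens the blended estimate would report $5.00, so prompt-heavy agents are overcharged in chargeback reports unless the split is used. `Decimal` avoids floating-point drift when many small amounts are summed.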

Budget Enforcement at Session Level

MAX_SESSION_TOKENS = 50_000  # hard cap per user session

async def check_session_budget(session_id: str):
    used = int(await redis.get(f"tokens:session:{session_id}") or 0)
    if used > MAX_SESSION_TOKENS:
        raise SessionBudgetExceeded(
            session_id=session_id,
            tokens_used=used,
            limit=MAX_SESSION_TOKENS
        )

18. Production Readiness Checklist

Service-Level Requirements

  • Agent has /health endpoint that checks LLM client connectivity
  • Agent has /ready endpoint that checks memory store (Redis) and Tool Registry reachability
  • All tool calls are schema-validated by Tool Registry before execution
  • Agent-level RBAC enforced: agent X cannot invoke tools it is not authorized for
  • JWT verification on all inter-agent gRPC and HTTP calls
  • Secrets loaded from Kubernetes Secrets or Vault — never from env literals or ConfigMaps

Reliability Requirements

  • Context window size is bounded — no unbounded message history growth
  • Token budget enforced per task with hard ceiling
  • MAX_STEPS guard in place to prevent runaway loops
  • Exponential backoff with jitter on all LLM calls
  • Circuit breaker configured on LLM client (threshold, recovery timeout)
  • Exponential backoff on all tool calls
  • Failed tasks routed to Dead Letter Queue — not silently dropped
  • Step-level checkpointing for tasks expected to exceed 60 seconds
  • Multi-model fallback cascade configured (primary → cheaper → cross-vendor)

Observability Requirements

  • OpenTelemetry distributed tracing with trace context propagation
  • All LLM completions traced with token counts and latency
  • All tool calls traced with tool name, version, and outcome
  • Prometheus metrics exported: task count, duration, token usage, tool calls, step count
  • Alerts configured: high error rate, runaway steps, token cost spike, high latency
  • Structured logging (JSON) with task_id, session_id (hashed), trace_id — no raw prompt content

Deployment Requirements

  • Agent image pinned to digest, not mutable tag (never :latest)
  • HPA configured with appropriate metrics (queue lag and latency, not just CPU)
  • PodDisruptionBudget set (minAvailable >= 1)
  • Pod topology spread constraints configured for HA across nodes
  • Resource requests and limits set (no QoS class "BestEffort")
  • Rolling update strategy with preStop sleep for graceful shutdown
  • Integration tests cover "tool call fails → agent recovers" path
  • Load tests simulate 10× expected peak concurrency before go-live

Cost Control Requirements

  • Token usage recorded per agent and per session
  • Session-level budget cap enforced
  • Token cost alerting configured per agent
  • DLQ monitored — no silent retry storms

19. Reference Architecture Diagram

                              ┌─────────────────────────┐
                              │      API Gateway         │
                              │  (Auth · Rate Limit)     │
                              └────────────┬────────────┘
                                           │
                              ┌────────────▼────────────┐
                              │   Orchestrator Agent     │
                              │  plan · dispatch · merge │
                              └────────────┬────────────┘
                                           │
                              ┌────────────▼────────────┐
                              │       Supervisor         │
                              │   retry · escalate       │
                              └──┬──────────┬───────┬───┘
                                 │          │       │
               ┌─────────────────▼─┐  ┌────▼───┐  ┌▼────────────┐
               │   agent-search    │  │agent-  │  │ agent-code  │
               │  (Kafka consumer) │  │summar- │  │  (gRPC)     │
               └────────┬──────────┘  │ize     │  └──────┬──────┘
                        │             └────┬───┘         │
                        │                  │              │
               ┌────────▼──────────────────▼──────────────▼──────┐
               │                Tool Registry                     │
               │    schema validation · ACL · rate limiting       │
               └──────┬─────────────┬──────────────┬─────────────┘
                      │             │              │
              ┌───────▼──┐  ┌───────▼──┐  ┌───────▼──┐
              │web_search│  │send_email│  │code_exec │
              └──────────┘  └──────────┘  └──────────┘

              ┌────────────────────────────────────────────────┐
              │                Memory Layer                     │
              │  Redis (short-term) · Postgres · Qdrant (vec)  │
              └────────────────────────────────────────────────┘

              ┌────────────────────────────────────────────────┐
              │              Observability Stack                │
              │  OTel Collector · Jaeger · Prometheus · Grafana│
              └────────────────────────────────────────────────┘

[…] microservices as of May 2026. The field is evolving rapidly; always validate LLM client SDK versions and Kubernetes API compatibility for your environment before production deployment.
