A production-grade, microservices-based conversational AI agent built with LangGraph, FastAPI, and Gemini 1.5 Flash. The system handles multi-turn conversations, answers product questions using RAG, and captures qualified leads.
autostream-agent/
│
├── gateway/
│ ├── main.py
│ ├── router.py
│ └── Dockerfile
│
├── services/
│ ├── intent_service/
│ │ ├── main.py
│ │ ├── classifier.py
│ │ └── Dockerfile
│ │
│ ├── rag_service/
│ │ ├── main.py
│ │ ├── retriever.py
│ │ ├── Dockerfile
│ │ └── knowledge_base/
│ │ └── autostream_kb.json
│ │
│ └── lead_service/
│ ├── main.py
│ ├── validator.py
│ ├── lead_capture.py
│ └── Dockerfile
│
├── orchestrator/
│ ├── main.py
│ ├── graph.py
│ ├── state.py
│ ├── nodes.py
│ └── Dockerfile
│
├── data/ # Persistent storage (excluded from git)
│ ├── conversations/
│ │ └── conversations.json # Auto-created: session states
│ ├── leads/
│ │ └── leads.json # Auto-created: captured leads
│ └── README.md
│
├── docker-compose.yml
├── requirements.txt
├── .env
├── .gitignore
└── README.md
- Docker and Docker Compose installed
- Google Gemini API key (Get one here)
- Clone the repository:
git clone <repository-url>
cd autostream-agent- Create a
.envfile from the example:
cp .env.example .env- Add your Gemini API key to
.env:
GEMINI_API_KEY=your_actual_api_key_here
- Build and start all services:
docker-compose up --build- The API Gateway will be available at
http://localhost:8000
Send a chat request:
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "Hello!", "session_id": "test-123"}'Check service health:
curl http://localhost:8000/health
curl http://localhost:8001/health
curl http://localhost:8002/health
curl http://localhost:8003/health
curl http://localhost:8004/healthThis system uses a microservices architecture orchestrated by LangGraph to create a stateful, multi-turn conversational agent.
Why LangGraph? LangGraph provides a graph-based workflow engine that's perfect for complex conversational flows. Unlike simple chain-based approaches (like LangChain LCEL), LangGraph allows conditional routing, state persistence across turns, and clear separation of concerns. Each node in the graph represents a discrete operation (intent classification, knowledge retrieval, lead collection), making the system modular and testable.
State Management: Conversation state is maintained in-memory per session ID in the orchestrator service. The ConversationState TypedDict tracks message history, current intent, retrieved knowledge, partially collected lead information, and the generated response. This state flows through the LangGraph nodes, with each node reading and updating relevant fields. The state persists across multiple turns, enabling context-aware responses and progressive lead collection.
Service Isolation: Each microservice owns a specific domain: Intent Service handles classification, RAG Service manages knowledge retrieval with FAISS vector search, Lead Service validates and captures leads, and the Orchestrator coordinates the conversation flow. Services communicate only via HTTP, ensuring they can be deployed, scaled, and updated independently. The API Gateway provides a single entry point, abstracting the internal topology from clients.
To integrate this agent with WhatsApp Business API, you would add a whatsapp_service container that acts as a webhook receiver.
-
Add WhatsApp Service Container:
- Create
services/whatsapp_service/with a FastAPI app - Expose
POST /webhookendpoint to receive WhatsApp Cloud API callbacks - Add to
docker-compose.ymlwith port mapping (e.g.,8005:8005)
- Create
-
Webhook Verification:
- Implement
GET /webhookfor WhatsApp's verification challenge - Verify
X-Hub-Signature-256header on incoming POST requests using your app secret - Reject requests with invalid signatures to prevent spoofing
- Implement
-
Message Processing Flow:
- Extract user message and sender ID from webhook payload
- Use sender ID as
session_idfor conversation continuity - Forward message to
POST http://gateway:8000/chat - Receive agent response from orchestrator
-
Response Delivery:
- Use WhatsApp Cloud API's Send Message endpoint
- Include recipient phone number and message text
- Handle rate limits and delivery failures with retry logic
- Log all sent messages for debugging
-
Configuration:
- Store WhatsApp credentials (phone number ID, access token, app secret) in
.env - Add
WHATSAPP_VERIFY_TOKENfor webhook verification - Configure webhook URL in Meta Developer Console
- Store WhatsApp credentials (phone number ID, access token, app secret) in
@app.post("/webhook")
async def whatsapp_webhook(request: Request):
# Verify signature
signature = request.headers.get("X-Hub-Signature-256")
if not verify_signature(await request.body(), signature):
raise HTTPException(403)
# Extract message
data = await request.json()
message = data["entry"][0]["changes"][0]["value"]["messages"][0]
sender = message["from"]
text = message["text"]["body"]
# Call orchestrator
response = await httpx.post(
"http://gateway:8000/chat",
json={"message": text, "session_id": sender}
)
# Send response via WhatsApp API
await send_whatsapp_message(sender, response.json()["response"])
return {"status": "ok"}| Service | Method | Path | Request Body | Response |
|---|---|---|---|---|
| Gateway | GET | /health |
- | {"status": "ok", "service": "gateway"} |
| Gateway | POST | /chat |
{"message": str, "session_id": str} |
{"response": str, "session_id": str, "intent": str, "lead_captured": bool} |
| Gateway | POST | /classify |
{"message": str, "session_id": str} |
{"intent": str, "session_id": str} |
| Gateway | POST | /retrieve |
{"query": str, "top_k": int} |
{"results": [{"content": str, "score": float}]} |
| Gateway | POST | /lead |
{"name": str, "email": str, "platform": str} |
{"status": str, "lead_id": str, "message": str} |
| Intent Service | GET | /health |
- | {"status": "ok", "service": "intent"} |
| Intent Service | POST | /classify |
{"message": str, "session_id": str} |
{"intent": str, "session_id": str} |
| RAG Service | GET | /health |
- | {"status": "ok", "service": "rag", "index_loaded": bool} |
| RAG Service | POST | /retrieve |
{"query": str, "top_k": int} |
{"results": [{"content": str, "score": float}]} |
| Lead Service | GET | /health |
- | {"status": "ok", "service": "lead"} |
| Lead Service | POST | /lead |
{"name": str, "email": str, "platform": str} |
{"status": str, "lead_id": str, "message": str} |
| Orchestrator | GET | /health |
- | {"status": "ok", "service": "orchestrator"} |
| Orchestrator | POST | /chat |
{"message": str, "session_id": str} |
{"response": str, "session_id": str, "intent": str, "lead_captured": bool} |
User: "Hi there!"
Agent: "Hello! Welcome to AutoStream. How can I help you today?"
User: "What are your pricing plans?"
Agent: "We offer two plans:
- Basic: $29/month for 10 videos, 720p resolution
- Pro: $79/month for unlimited videos, 4K resolution with AI captions and 24/7 support"
User: "I want to sign up for the Pro plan"
Agent: "Great choice! To get started, may I have your name?"
User: "John Doe"
Agent: "Thanks John! What's your email address?"
User: "john@example.com"
Agent: "Perfect! Which platform do you create content for?"
User: "YouTube"
Agent: "Thank you! Your information has been captured. Lead ID: LEAD-12345. Our team will reach out soon!"
The system uses dual-layer storage for reliability:
- In-Memory: Fast access for active sessions (orchestrator RAM)
- Persistent Files: JSON files that survive container restarts
Conversations (./data/conversations/conversations.json):
- Stores all conversation history by session_id
- Saved after each message exchange
- Loaded from disk when resuming existing sessions
- Format:
{ "session_id": { "messages": [...], "intent": "...", "collected_lead": {...}, ... } }
Leads (./data/leads/leads.json):
- Captured leads written immediately
- Append-only array format
- Each lead includes: lead_id, name, email, platform, timestamp, status
- Format:
[{ "lead_id": "LEAD-12345", "name": "...", "email": "...", ... }]
- Survives container restarts and system reboots
- Human-readable JSON for easy inspection
- No external database dependencies needed
- Simple backup: just copy the
data/folder
# Backup
cp -r ./data ./data-backup-$(date +%Y%m%d)
# Restore
cp -r ./data-backup-20260412 ./data- Services won't start: Ensure
GEMINI_API_KEYis set in.env - RAG Service unhealthy: Check that
autostream_kb.jsonexists and is valid JSON - Connection refused errors: Wait 30 seconds for all services to initialize
- Gemini API errors: Verify your API key has sufficient quota
- Data not persisting: Check that
./data/directory exists and has write permissions
MIT