diff --git a/API_SPEC.md b/API_SPEC.md new file mode 100644 index 00000000..79ee4a19 --- /dev/null +++ b/API_SPEC.md @@ -0,0 +1,1053 @@ +# Paws HTTP API Specification + +## Overview + +The Paws HTTP Server provides a RESTful API for interacting with the Paws AI assistant. The server supports both standard JSON responses and Server-Sent Events (SSE) for streaming operations. + +**Base URL:** `http://localhost:3000` + +**Default Port:** `3000` + +**CORS:** Enabled (permissive) + +--- + +## Table of Contents + +- [General](#general) +- [Resources](#resources) +- [Chat & Execution](#chat--execution) +- [Conversations](#conversations) +- [Configuration](#configuration) +- [MCP (Model Context Protocol)](#mcp-model-context-protocol) +- [Authentication](#authentication) +- [Platform Authentication](#platform-authentication) + +--- + +## General + +### Health Check + +Check if the server is running. + +```http +GET /api/health +``` + +**Response:** `200 OK` + +```json +"OK" +``` + +--- + +### Environment Information + +Get environment information including current working directory and other system details. + +```http +GET /api/env +``` + +**Response:** `200 OK` + +```json +{ + "cwd": "/path/to/current/directory", + "home": "/home/user", + "os": "linux", + "arch": "x86_64" +} +``` + +--- + +## Resources + +### Discover Files + +List files in the current working directory. + +```http +GET /api/files +``` + +**Response:** `200 OK` + +```json +[ + { + "name": "file.txt", + "path": "/path/to/file.txt", + "is_dir": false, + "size": 1024 + } +] +``` + +--- + +### Get Tools + +List all available tools. + +```http +GET /api/tools +``` + +**Response:** `200 OK` + +```json +[ + { + "name": "read", + "description": "Read file contents", + "parameters": { + "type": "object", + "properties": { + "path": { "type": "string" } + } + } + } +] +``` + +--- + +### Get Models + +List all available models. + +```http +GET /api/models +``` + +**Response:** `200 OK` + +```json +[ + { + "id": "claude-3-opus", + "name": "Claude 3 Opus", + "provider_id": "anthropic", + "context_length": 200000, + "tools_supported": true + } +] +``` + +--- + +### Get Agents + +List all available agents. + +```http +GET /api/agents +``` + +**Response:** `200 OK` + +```json +[ + { + "id": "sage", + "title": "Research Agent", + "description": "Performs research and investigation", + "model": "claude-3-opus", + "provider_id": "anthropic" + } +] +``` + +--- + +### Get Active Agent + +Get the currently active agent ID. + +```http +GET /api/active-agent +``` + +**Response:** `200 OK` + +```json +{ + "agent_id": "sage" +} +``` + +--- + +### Set Active Agent + +Set the active agent. + +```http +POST /api/active-agent +``` + +**Request Body:** + +```json +{ + "agent_id": "sage" +} +``` + +**Response:** `200 OK` + +--- + +### Get Providers + +List all available providers. + +```http +GET /api/providers +``` + +**Response:** `200 OK` + +```json +[ + { + "id": "anthropic", + "name": "Anthropic", + "provider_type": "llm", + "url": "https://api.anthropic.com", + "is_configured": true + } +] +``` + +--- + +### Get Provider + +Get details for a specific provider. + +```http +GET /api/providers/:id +``` + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|-------------------| +| id | string | Provider ID | + +**Response:** `200 OK` + +```json +{ + "id": "anthropic", + "name": "Anthropic", + "provider_type": "llm", + "url": "https://api.anthropic.com", + "is_configured": true, + "auth_methods": ["api_key", "oauth"] +} +``` + +--- + +### Get Skills + +List all available skills. + +```http +GET /api/skills +``` + +**Response:** `200 OK` + +```json +[ + { + "name": "debug-cli", + "description": "Debug CLI commands", + "path": "/path/to/skill" + } +] +``` + +--- + +### Get Workflow + +Get the merged workflow configuration. + +```http +GET /api/workflow +``` + +**Response:** `200 OK` + +```json +{ + "steps": [...], + "configuration": {...} +} +``` + +--- + +### Get Commands + +List all available commands. + +```http +GET /api/commands +``` + +**Response:** `200 OK` + +```json +[ + { + "name": "test", + "description": "Run tests" + } +] +``` + +--- + +## Chat & Execution + +### Chat + +Send a chat message and receive a streaming response via SSE. + +```http +POST /api/chat +``` + +**Request Body:** + +```json +{ + "prompt": "Hello, how are you?", + "conversation_id": "optional-conversation-id", + "agent_id": "sage", + "context": {} +} +``` + +**Response:** `200 OK` (SSE Stream) + +Each event contains a JSON object: + +```json +{ + "type": "content", + "content": "Hello! I'm doing well, thank you.", + "is_complete": false +} +``` + +**SSE Events:** + +| Event Type | Description | +|------------|----------------------------| +| data | Response content | +| error | Error message | +| keepalive | Keep-alive ping (default) | + +--- + +### Execute Command + +Execute a shell command. + +```http +POST /api/command +``` + +**Request Body:** + +```json +{ + "command": "ls -la", + "working_dir": "/optional/path" +} +``` + +**Response:** `200 OK` + +```json +{ + "stdout": "file1.txt\nfile2.txt\n", + "stderr": "", + "exit_code": 0, + "success": true +} +``` + +--- + +### Generate Command + +Generate a shell command from a natural language prompt. + +```http +POST /api/generate-command +``` + +**Request Body:** + +```json +{ + "prompt": "List all files in the current directory" +} +``` + +**Response:** `200 OK` + +```json +{ + "command": "ls -la", + "explanation": "Lists all files including hidden ones" +} +``` + +--- + +### Generate Data + +Generate data from JSONL configuration via SSE stream. + +```http +POST /api/data/generate +``` + +**Request Body:** + +```json +{ + "jsonl_path": "/path/to/config.jsonl", + "output_path": "/path/to/output", + "num_samples": 10 +} +``` + +**Response:** `200 OK` (SSE Stream) + +Each event contains generated data: + +```json +{ + "sample": 1, + "data": {...}, + "status": "generating" +} +``` + +--- + +## Conversations + +### Get Conversations + +List all conversations. + +```http +GET /api/conversations?limit=10 +``` + +**Query Parameters:** + +| Parameter | Type | Description | Default | +|-----------|---------|----------------------------|---------| +| limit | integer | Maximum number of results | null | + +**Response:** `200 OK` + +```json +[ + { + "id": "conv-123", + "title": "Project Setup", + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T01:00:00Z" + } +] +``` + +--- + +### Create Conversation + +Create a new conversation with minimal required fields. + +```http +POST /api/conversations +``` + +**Request Body:** + +```json +{ + "id": "550e8400-e29b-41d4-a716-446655440000", + "title": "Optional Title" +} +``` + +| Field | Type | Required | Description | +|-------|--------|----------|--------------------------------| +| id | string | Yes | UUID for the conversation | +| title | string | No | Optional title for display | + +**Response:** `201 Created` + +```json +{ + "id": "550e8400-e29b-41d4-a716-446655440000", + "title": "Optional Title", + "created_at": "2024-01-01T00:00:00Z" +} +``` + +--- + +### Update Conversation + +Update an existing conversation with full conversation data. + +```http +PUT /api/conversations/:id +``` + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|-------------------| +| id | string | Conversation ID | + +**Request Body:** Full `Conversation` object + +**Response:** `200 OK` + +**Response:** `400 Bad Request` (if path ID doesn't match body ID) + +--- + +### Get Conversation + +Get details of a specific conversation. + +```http +GET /api/conversations/:id +``` + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|-------------------| +| id | string | Conversation ID | + +**Response:** `200 OK` + +```json +{ + "id": "conv-123", + "title": "Project Setup", + "context": {...}, + "metrics": {...}, + "metadata": { + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T01:00:00Z" + } +} +``` + +**Response:** `404 Not Found` + +```json +{ + "error": "Conversation not found: conv-123" +} +``` + +--- + +### Delete Conversation + +Delete a conversation. + +```http +DELETE /api/conversations/:id +``` + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|-------------------| +| id | string | Conversation ID | + +**Response:** `204 No Content` + +--- + +### Compact Conversation + +Compact a conversation to reduce context size. + +```http +POST /api/conversations/:id/compact +``` + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|-------------------| +| id | string | Conversation ID | + +**Response:** `200 OK` + +```json +{ + "original_size": 1000, + "compressed_size": 200, + "compression_ratio": 0.2 +} +``` + +--- + +## Configuration + +### Get Default Provider + +Get the default provider. + +```http +GET /api/config/default-provider +``` + +**Response:** `200 OK` + +```json +{ + "provider_id": "anthropic" +} +``` + +--- + +### Set Default Provider + +Set the default provider. + +```http +POST /api/config/default-provider +``` + +**Request Body:** + +```json +{ + "provider_id": "anthropic" +} +``` + +**Response:** `200 OK` + +--- + +### Get Default Model + +Get the default model. + +```http +GET /api/config/default-model +``` + +**Response:** `200 OK` + +```json +{ + "model_id": "claude-3-opus" +} +``` + +--- + +### Set Default Model + +Set the default model. + +```http +POST /api/config/default-model +``` + +**Request Body:** + +```json +{ + "model_id": "claude-3-opus" +} +``` + +**Response:** `200 OK` + +--- + +## MCP (Model Context Protocol) + +### Read MCP Config + +Read MCP configuration. + +```http +GET /api/mcp/config?scope=user +``` + +**Query Parameters:** + +| Parameter | Type | Description | Default | +|-----------|--------|--------------------------|---------| +| scope | string | Configuration scope | null | + +**Response:** `200 OK` + +```json +{ + "servers": [ + { + "name": "filesystem", + "command": "npx", + "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path"] + } + ] +} +``` + +--- + +### Write MCP Config + +Write MCP configuration. + +```http +POST /api/mcp/config +``` + +**Request Body:** + +```json +{ + "scope": "user", + "config": { + "servers": [...] + } +} +``` + +**Response:** `200 OK` + +**Response:** `400 Bad Request` (if scope or config missing) + +--- + +### Reload MCP + +Reload MCP servers. + +```http +POST /api/mcp/reload +``` + +**Response:** `200 OK` + +--- + +## Authentication + +### Init Provider Auth + +Initialize provider authentication. + +```http +POST /api/auth/init +``` + +**Request Body:** + +```json +{ + "provider_id": "anthropic", + "method": "api_key" +} +``` + +**Response:** `200 OK` + +```json +{ + "auth_url": "https://auth.example.com", + "state": "random-state-string", + "expires_in": 300 +} +``` + +--- + +### Complete Provider Auth + +Complete provider authentication. + +```http +POST /api/auth/complete +``` + +**Request Body:** + +```json +{ + "provider_id": "anthropic", + "context": { + "code": "auth-code", + "state": "random-state-string" + }, + "timeout_secs": 60 +} +``` + +**Response:** `200 OK` + +--- + +### Logout + +Logout from a provider or platform. + +```http +POST /api/auth/logout +``` + +**Request Body:** + +```json +{ + "provider_id": "anthropic" +} +``` + +**Response:** `200 OK` + +**Note:** Omit `provider_id` to logout from platform. + +--- + +### User Info + +Get provider user information. + +```http +GET /api/auth/user +``` + +**Response:** `200 OK` + +```json +{ + "user_id": "user-123", + "email": "user@example.com", + "name": "John Doe" +} +``` + +--- + +### User Usage + +Get provider usage information. + +```http +GET /api/auth/usage +``` + +**Response:** `200 OK` + +```json +{ + "total_tokens": 100000, + "input_tokens": 60000, + "output_tokens": 40000, + "requests": 500 +} +``` + +--- + +## Platform Authentication + +### Init Platform Login + +Initialize platform login. + +```http +POST /api/platform/auth/init +``` + +**Response:** `200 OK` + +```json +{ + "auth_url": "https://platform.example.com/auth", + "state": "random-state-string" +} +``` + +--- + +### Platform Login + +Complete platform login. + +```http +POST /api/platform/auth/login +``` + +**Request Body:** + +```json +{ + "auth_url": "https://platform.example.com/auth", + "state": "random-state-string", + "code": "auth-code" +} +``` + +**Response:** `200 OK` + +--- + +### Platform User Info + +Get platform login information. + +```http +GET /api/platform/auth/info +``` + +**Response:** `200 OK` + +```json +{ + "user_id": "user-123", + "email": "user@example.com", + "name": "John Doe", + "is_authenticated": true +} +``` + +--- + +## Error Responses + +All endpoints return error responses in JSON format: + +```json +{ + "error": "Error message describing what went wrong", + "details": "Optional additional context (only present when relevant)" +} +``` + +**Common HTTP Status Codes:** + +| Status Code | Description | +|-------------|--------------------------| +| 200 | Success | +| 201 | Created | +| 204 | No Content | +| 400 | Bad Request | +| 404 | Not Found | +| 422 | Unprocessable Entity | +| 500 | Internal Server Error | + +**Example Error Responses:** + +`404 Not Found`: +```json +{ + "error": "Conversation not found: 550e8400-e29b-41d4-a716-446655440000" +} +``` + +`400 Bad Request`: +```json +{ + "error": "Conversation ID in path does not match body" +} +``` + +`500 Internal Server Error`: +```json +{ + "error": "Database connection failed" +} +``` + +--- + +## SSE (Server-Sent Events) Format + +For streaming endpoints (`/api/chat`, `/api/data/generate`), responses use SSE format: + +``` +data: {"type":"content","content":"Hello","is_complete":false} + +data: {"type":"content","content":"!","is_complete":true} + +event: error +data: Connection lost + +: keep-alive +``` + +**Event Types:** + +| Event | Description | +|----------|----------------------------------| +| data | Normal data payload | +| error | Error occurred | +| (no type)| Keep-alive ping (default event) | + +--- + +## Running the Server + +### Using Cargo + +```bash +cargo run -- serve --port 3000 +``` + +### Using Binary + +```bash +paws serve --port 3000 +``` + +### CLI Options + +| Option | Type | Description | Default | +|----------|--------|----------------------|---------| +| --port | number | Port to listen on | 3000 | + +--- + +## Notes + +- All datetime fields use ISO 8601 format (e.g., `2024-01-01T00:00:00Z`) +- IDs are strings and should be treated as opaque values +- CORS is enabled for all origins (permissive mode) +- The server logs all requests via the tracing layer +- Streaming endpoints use SSE with automatic keep-alive (default 15 seconds) diff --git a/Cargo.lock b/Cargo.lock index 50f06a36..3bb2272a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -394,6 +394,73 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "axum-macros", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backon" version = "1.6.0" @@ -2493,6 +2560,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.6" @@ -3166,6 +3239,7 @@ dependencies = [ "paws_app", "paws_common", "paws_domain", + "paws_server", "paws_services", "pretty_assertions", "reedline", @@ -3219,6 +3293,27 @@ dependencies = [ "url", ] +[[package]] +name = "paws_server" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "chrono", + "futures", + "paws_api", + "paws_app", + "paws_domain", + "pretty_assertions", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower-http 0.5.2", + "tracing", + "uuid", +] + [[package]] name = "paws_services" version = "0.1.0" @@ -3825,7 +3920,7 @@ dependencies = [ "tokio-rustls", "tokio-util", "tower", - "tower-http", + "tower-http 0.6.8", "tower-service", "url", "wasm-bindgen", @@ -4782,6 +4877,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -4841,6 +4937,24 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags", + "bytes", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", ] [[package]] @@ -4879,6 +4993,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index aaeb8e74..739af1ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,7 @@ paws_common = { path = "crates/paws_common" } paws_domain = { path = "crates/paws_domain" } paws_infra = { path = "crates/paws_infra" } paws_repo = { path = "crates/paws_repo" } +paws_server = { path = "crates/paws_server" } paws_services = { path = "crates/paws_services" } paws_tool_macros = { path = "crates/paws_tool_macros" } diff --git a/api_demo.sh b/api_demo.sh new file mode 100644 index 00000000..98eda928 --- /dev/null +++ b/api_demo.sh @@ -0,0 +1,123 @@ +#!/bin/bash +# API Demo Script for Paws Server +# This demonstrates the task-based API for the web frontend + +set -e + +BASE_URL="${1:-http://localhost:3010}" +echo "=== Paws API Demo ===" +echo "Server: $BASE_URL" +echo "" + +# 1. Health check +echo "1. Health Check" +echo "GET /api/health" +curl -s "$BASE_URL/api/health" +echo -e "\n" + +# 2. Get environment info +echo "2. Get Environment" +echo "GET /api/env" +curl -s "$BASE_URL/api/env" | jq '.cwd' +echo "" + +# 3. List available agents +echo "3. List Agents" +echo "GET /api/agents" +curl -s "$BASE_URL/api/agents" | jq '.[].id' +echo "" + +# 4. Set active agent +echo "4. Set Active Agent" +echo "POST /api/config/active-agent" +curl -s -X POST "$BASE_URL/api/config/active-agent" \ + -H "Content-Type: application/json" \ + -d '{"agent_id": "paws"}' +echo -e "\n" + +# 5. Create a conversation +echo "5. Create Conversation" +echo "POST /api/conversations" +CONV_ID=$(uuidgen) +curl -s -X POST "$BASE_URL/api/conversations" \ + -H "Content-Type: application/json" \ + -d "{\"id\": \"$CONV_ID\", \"title\": \"API Demo Conversation\"}" +echo -e "\n" +echo "Conversation ID: $CONV_ID" +echo "" + +# 6. List conversations +echo "6. List Conversations" +echo "GET /api/conversations" +curl -s "$BASE_URL/api/conversations" | jq '.[0]' +echo "" + +# 7. Create a task (submit a message for processing) +echo "7. Create Task" +echo "POST /api/tasks" +TASK_RESP=$(curl -s -X POST "$BASE_URL/api/tasks" \ + -H "Content-Type: application/json" \ + -d "{\"conversation_id\": \"$CONV_ID\", \"message\": \"What is 2+2? Just give me the number.\"}") +echo "$TASK_RESP" | jq '.' +TASK_ID=$(echo "$TASK_RESP" | jq -r '.task_id') +echo "Task ID: $TASK_ID" +echo "" + +# 8. Get task status +echo "8. Get Task Status (polling...)" +echo "GET /api/tasks/:id" +for i in {1..10}; do + STATUS=$(curl -s "$BASE_URL/api/tasks/$TASK_ID") + STATUS_TYPE=$(echo "$STATUS" | jq -r '.status.type') + echo " Status: $STATUS_TYPE" + if [ "$STATUS_TYPE" = "completed" ] || [ "$STATUS_TYPE" = "failed" ]; then + break + fi + sleep 2 +done +echo "$STATUS" | jq '.' +echo "" + +# 9. Get task events (for reconnection) +echo "9. Get Task Events" +echo "GET /api/tasks/:id/events" +curl -s "$BASE_URL/api/tasks/$TASK_ID/events" | jq '.' +echo "" + +# 10. List all tasks +echo "10. List All Tasks" +echo "GET /api/tasks" +curl -s "$BASE_URL/api/tasks" | jq '.[].status.type' +echo "" + +# 11. Get conversation messages +echo "11. Get Conversation Messages" +echo "GET /api/conversations/:id" +curl -s "$BASE_URL/api/conversations/$CONV_ID" | jq '.messages[-1]' +echo "" + +# 12. Test SSE stream (for 5 seconds) +echo "12. SSE Stream Test (5 seconds)" +echo "GET /api/tasks/:id/stream" +timeout 5 curl -s -N "$BASE_URL/api/tasks/$TASK_ID/stream" 2>/dev/null | head -5 || echo "(stream ended or no new events)" +echo "" + +# 13. Get available models +echo "13. List Models" +echo "GET /api/models" +curl -s "$BASE_URL/api/models" | jq '.[0]' +echo "" + +# 14. Get available skills +echo "14. List Skills" +echo "GET /api/skills" +curl -s "$BASE_URL/api/skills" | jq '.[].name' +echo "" + +# 15. Cleanup - Delete conversation +echo "15. Delete Conversation" +echo "DELETE /api/conversations/:id" +curl -s -X DELETE "$BASE_URL/api/conversations/$CONV_ID" +echo -e "\n" + +echo "=== Demo Complete ===" diff --git a/crates/paws_api/src/api.rs b/crates/paws_api/src/api.rs index 215b4538..6d68049f 100644 --- a/crates/paws_api/src/api.rs +++ b/crates/paws_api/src/api.rs @@ -30,6 +30,9 @@ pub trait API: Sync + Send { /// Gets a provider by ID async fn get_provider(&self, id: &ProviderId) -> Result; + /// Gets models for a specific provider + async fn get_provider_models(&self, id: &ProviderId) -> Result>; + /// Executes a chat request and returns a stream of responses async fn chat(&self, chat: ChatRequest) -> Result>>; @@ -55,6 +58,13 @@ pub trait API: Sync + Send { /// Returns the conversation with the given ID async fn conversation(&self, conversation_id: &ConversationId) -> Result>; + /// Lists conversation summaries for the active workspace (lightweight, no + /// context) + async fn get_conversation_summaries( + &self, + limit: Option, + ) -> Result>; + /// Lists all conversations for the active workspace async fn get_conversations(&self, limit: Option) -> Result>; diff --git a/crates/paws_api/src/paws_api.rs b/crates/paws_api/src/paws_api.rs index f255422c..21ae1eae 100644 --- a/crates/paws_api/src/paws_api.rs +++ b/crates/paws_api/src/paws_api.rs @@ -86,6 +86,16 @@ impl Result> { + let provider = self.get_provider(id).await?; + // Only configured URL providers can fetch models + if let Some(configured) = provider.into_configured() { + Ok(self.services.models(configured).await?) + } else { + Ok(vec![]) + } + } + async fn chat( &self, chat: ChatRequest, @@ -143,6 +153,17 @@ impl, + ) -> anyhow::Result> { + Ok(self + .services + .get_conversation_summaries(limit) + .await? + .unwrap_or_default()) + } + async fn last_conversation(&self) -> anyhow::Result> { self.services.last_conversation().await } diff --git a/crates/paws_app/src/services.rs b/crates/paws_app/src/services.rs index 9500f666..f93bab0c 100644 --- a/crates/paws_app/src/services.rs +++ b/crates/paws_app/src/services.rs @@ -211,6 +211,13 @@ pub trait ConversationService: Send + Sync { F: FnOnce(&mut Conversation) -> T + Send, T: Send; + /// Find conversation summaries with optional limit (lightweight, no + /// context) + async fn get_conversation_summaries( + &self, + limit: Option, + ) -> anyhow::Result>>; + /// Find conversations with optional limit async fn get_conversations( &self, @@ -564,6 +571,15 @@ impl ConversationService for I { self.conversation_service().get_conversations(limit).await } + async fn get_conversation_summaries( + &self, + limit: Option, + ) -> anyhow::Result>> { + self.conversation_service() + .get_conversation_summaries(limit) + .await + } + async fn last_conversation(&self) -> anyhow::Result> { self.conversation_service().last_conversation().await } diff --git a/crates/paws_domain/src/agent.rs b/crates/paws_domain/src/agent.rs index 5fb67a48..4c6e2e86 100644 --- a/crates/paws_domain/src/agent.rs +++ b/crates/paws_domain/src/agent.rs @@ -9,7 +9,7 @@ use crate::{ /// Runtime agent representation with required model and provider /// Created by converting AgentDefinition with resolved defaults -#[derive(Debug, Clone, Setters)] +#[derive(Debug, Clone, Setters, serde::Serialize)] #[setters(strip_option, into)] pub struct Agent { /// Flag to enable/disable tool support for this agent. diff --git a/crates/paws_domain/src/app_config.rs b/crates/paws_domain/src/app_config.rs index 300eff94..2c1d6917 100644 --- a/crates/paws_domain/src/app_config.rs +++ b/crates/paws_domain/src/app_config.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use crate::{ModelId, ProviderId}; -#[derive(Deserialize)] +#[derive(Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct InitAuth { pub session_id: String, diff --git a/crates/paws_domain/src/auth/auth_context.rs b/crates/paws_domain/src/auth/auth_context.rs index dacc15c8..a1a8be21 100644 --- a/crates/paws_domain/src/auth/auth_context.rs +++ b/crates/paws_domain/src/auth/auth_context.rs @@ -8,20 +8,20 @@ use super::{ URLParamValue, UserCode, }; -#[derive(Debug, Clone, Deref, From)] +#[derive(Debug, Clone, Deref, From, serde::Serialize, serde::Deserialize)] pub struct URLParameters(HashMap); // API Key Flow /// Request parameters for API key authentication -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ApiKeyRequest { pub required_params: Vec, pub existing_params: Option, } /// Response containing API key and URL parameters -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ApiKeyResponse { pub api_key: ApiKey, pub url_params: HashMap, @@ -30,11 +30,11 @@ pub struct ApiKeyResponse { // Authorization Code Flow /// Authorization code OAuth authentication flow -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct CodeAuthFlow; /// Request parameters for authorization code flow -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct CodeRequest { pub authorization_url: Url, pub state: State, @@ -43,7 +43,7 @@ pub struct CodeRequest { } /// Response containing authorization code -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct CodeResponse { pub code: AuthorizationCode, } @@ -51,11 +51,11 @@ pub struct CodeResponse { // Device Code Flow /// Device code OAuth authentication flow -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DeviceCodeAuthFlow; /// Request parameters for device code flow -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DeviceCodeRequest { pub user_code: UserCode, pub device_code: DeviceCode, @@ -67,18 +67,18 @@ pub struct DeviceCodeRequest { } /// Response for device code flow -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DeviceCodeResponse; /// Generic container that pairs a request with its corresponding response -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AuthContext { pub request: Request, pub response: Response, } /// Represents different types of authentication requests -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AuthContextRequest { ApiKey(ApiKeyRequest), DeviceCode(DeviceCodeRequest), @@ -86,7 +86,7 @@ pub enum AuthContextRequest { } /// Represents completed authentication flows with their request/response pairs -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AuthContextResponse { ApiKey(AuthContext), DeviceCode(AuthContext), diff --git a/crates/paws_domain/src/chat_response.rs b/crates/paws_domain/src/chat_response.rs index a5df831f..1f54f432 100644 --- a/crates/paws_domain/src/chat_response.rs +++ b/crates/paws_domain/src/chat_response.rs @@ -5,7 +5,7 @@ use chrono::Local; use crate::{ToolCallFull, ToolName, ToolResult}; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] pub enum ChatResponseContent { Title(TitleFormat), PlainText(String), @@ -44,7 +44,7 @@ impl ChatResponseContent { /// Events that are emitted by the agent for external consumption. This includes /// events for all internal state changes. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum ChatResponse { TaskMessage { content: ChatResponseContent }, TaskReasoning { content: String }, @@ -74,7 +74,7 @@ impl ChatResponse { } } -#[derive(Debug, Clone)] +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub enum InterruptionReason { MaxToolFailurePerTurnLimitReached { limit: u64, @@ -85,7 +85,7 @@ pub enum InterruptionReason { }, } -#[derive(Clone)] +#[derive(Clone, serde::Serialize, serde::Deserialize)] pub struct Cause(String); impl Cause { @@ -110,7 +110,7 @@ impl From<&anyhow::Error> for Cause { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub enum Category { Action, Info, @@ -120,7 +120,9 @@ pub enum Category { Warning, } -#[derive(Clone, derive_setters::Setters, Debug, PartialEq)] +#[derive( + Clone, derive_setters::Setters, Debug, PartialEq, serde::Serialize, serde::Deserialize, +)] #[setters(into, strip_option)] pub struct TitleFormat { pub title: String, diff --git a/crates/paws_domain/src/conversation.rs b/crates/paws_domain/src/conversation.rs index 7d7353d4..71004223 100644 --- a/crates/paws_domain/src/conversation.rs +++ b/crates/paws_domain/src/conversation.rs @@ -38,6 +38,26 @@ impl FromStr for ConversationId { } } +/// A lightweight summary of a conversation without the full context +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ConversationSummary { + pub id: ConversationId, + pub title: Option, + pub metrics: Metrics, + pub metadata: MetaData, +} + +impl From for ConversationSummary { + fn from(conversation: Conversation) -> Self { + Self { + id: conversation.id, + title: conversation.title, + metrics: conversation.metrics, + metadata: conversation.metadata, + } + } +} + #[derive(Debug, Setters, Serialize, Deserialize, Clone)] #[setters(into)] pub struct Conversation { diff --git a/crates/paws_domain/src/provider.rs b/crates/paws_domain/src/provider.rs index 5818a244..47ae3ea9 100644 --- a/crates/paws_domain/src/provider.rs +++ b/crates/paws_domain/src/provider.rs @@ -222,7 +222,7 @@ impl Provider { /// Enum for viewing providers in listings where both configured and /// unconfigured. -#[derive(Debug, Clone, PartialEq, From)] +#[derive(Debug, Clone, PartialEq, From, serde::Serialize)] pub enum AnyProvider { Url(Provider), Template(Provider>>), diff --git a/crates/paws_domain/src/repo.rs b/crates/paws_domain/src/repo.rs index 96f3a5f2..57548f3c 100644 --- a/crates/paws_domain/src/repo.rs +++ b/crates/paws_domain/src/repo.rs @@ -60,6 +60,21 @@ pub trait ConversationRepository: Send + Sync { conversation_id: &ConversationId, ) -> Result>; + /// Retrieves all conversation summaries with an optional limit + /// + /// Returns lightweight summaries without the full context for faster + /// loading + /// + /// # Arguments + /// * `limit` - Optional maximum number of conversations to retrieve + /// + /// # Errors + /// Returns an error if the operation fails + async fn get_all_conversation_summaries( + &self, + limit: Option, + ) -> Result>>; + /// Retrieves all conversations with an optional limit /// /// # Arguments diff --git a/crates/paws_main/Cargo.toml b/crates/paws_main/Cargo.toml index 28fb3ea1..17c314ff 100644 --- a/crates/paws_main/Cargo.toml +++ b/crates/paws_main/Cargo.toml @@ -30,6 +30,9 @@ workspace = true [dependencies.paws_api] workspace = true +[dependencies.paws_server] +workspace = true + [dependencies.paws_domain] workspace = true diff --git a/crates/paws_main/src/cli.rs b/crates/paws_main/src/cli.rs index 9086bbf8..c8fcf93b 100644 --- a/crates/paws_main/src/cli.rs +++ b/crates/paws_main/src/cli.rs @@ -138,6 +138,13 @@ pub enum TopLevelCommand { /// Process JSONL data through LLM with schema-constrained tools. Data(DataCommandGroup), + + /// Start the API server. + Serve { + /// Port to listen on. + #[arg(long, default_value_t = 3000)] + port: u16, + }, } /// Command group for custom command management. diff --git a/crates/paws_main/src/ui.rs b/crates/paws_main/src/ui.rs index 4a5b54a1..a32648c5 100644 --- a/crates/paws_main/src/ui.rs +++ b/crates/paws_main/src/ui.rs @@ -641,6 +641,10 @@ impl A + Send + Sync> UI { self.writeln(data?)?; } } + TopLevelCommand::Serve { port } => { + self.on_serve(port).await?; + return Ok(()); + } } Ok(()) } @@ -1459,6 +1463,20 @@ impl A + Send + Sync> UI { Ok(()) } + /// Start the API server + async fn on_serve(&mut self, port: u16) -> anyhow::Result<()> { + use paws_server::Server; + + self.writeln_title(TitleFormat::info(format!( + "Starting Paws API server on port {port}..." + )))?; + + let server = Server::new(self.api.clone(), port); + server.run().await?; + + Ok(()) + } + async fn on_zsh_prompt(&self) -> anyhow::Result<()> { let plugin = crate::zsh_plugin::generate_zsh_plugin()?; println!("{plugin}"); diff --git a/crates/paws_repo/src/conversation/conversation_repo.rs b/crates/paws_repo/src/conversation/conversation_repo.rs index 5f64bfcb..27a37459 100644 --- a/crates/paws_repo/src/conversation/conversation_repo.rs +++ b/crates/paws_repo/src/conversation/conversation_repo.rs @@ -56,6 +56,61 @@ impl ConversationRepository for ConversationRepositoryImpl { } } + async fn get_all_conversation_summaries( + &self, + limit: Option, + ) -> anyhow::Result>> { + let mut connection = self.pool.get_connection()?; + + let workspace_id = self.wid.id() as i64; + let mut query = conversations::table + .filter(conversations::workspace_id.eq(&workspace_id)) + .filter(conversations::context.is_not_null()) + .order(conversations::updated_at.desc()) + .into_boxed(); + + if let Some(limit_value) = limit { + query = query.limit(limit_value as i64); + } + + let records: Vec = query.load(&mut connection)?; + + if records.is_empty() { + return Ok(None); + } + + let summaries: Vec = records + .into_iter() + .map(|record| { + // Deserialize metrics using MetricsRecord for proper type handling + let metrics = if let Some(text) = record.metrics { + if let Ok(metrics_record) = serde_json::from_str::< + crate::conversation::conversation_record::MetricsRecord, + >(&text) + { + paws_domain::Metrics::from(metrics_record) + } else { + paws_domain::Metrics::default() + } + } else { + paws_domain::Metrics::default() + }; + + Ok(paws_domain::ConversationSummary { + id: paws_domain::ConversationId::parse(record.conversation_id)?, + title: record.title, + metrics, + metadata: paws_domain::MetaData { + created_at: record.created_at.and_utc(), + updated_at: record.updated_at.map(|dt| dt.and_utc()), + }, + }) + }) + .collect::>()?; + + Ok(Some(summaries)) + } + async fn get_all_conversations( &self, limit: Option, diff --git a/crates/paws_repo/src/database/migrations/2026-02-09-000000_add_conversation_state/down.sql b/crates/paws_repo/src/database/migrations/2026-02-09-000000_add_conversation_state/down.sql new file mode 100644 index 00000000..f9e7b224 --- /dev/null +++ b/crates/paws_repo/src/database/migrations/2026-02-09-000000_add_conversation_state/down.sql @@ -0,0 +1,2 @@ +-- Remove state column from conversations table +ALTER TABLE conversations DROP COLUMN state; diff --git a/crates/paws_repo/src/database/migrations/2026-02-09-000000_add_conversation_state/up.sql b/crates/paws_repo/src/database/migrations/2026-02-09-000000_add_conversation_state/up.sql new file mode 100644 index 00000000..d73e3909 --- /dev/null +++ b/crates/paws_repo/src/database/migrations/2026-02-09-000000_add_conversation_state/up.sql @@ -0,0 +1,2 @@ +-- Add state column to conversations table +ALTER TABLE conversations ADD COLUMN state TEXT NOT NULL DEFAULT 'idle'; diff --git a/crates/paws_repo/src/paws_repo.rs b/crates/paws_repo/src/paws_repo.rs index 240eabb6..345b39f5 100644 --- a/crates/paws_repo/src/paws_repo.rs +++ b/crates/paws_repo/src/paws_repo.rs @@ -117,6 +117,15 @@ impl ConversationRepository for PawsRepo { .await } + async fn get_all_conversation_summaries( + &self, + limit: Option, + ) -> anyhow::Result>> { + self.conversation_repository + .get_all_conversation_summaries(limit) + .await + } + async fn get_last_conversation(&self) -> anyhow::Result> { self.conversation_repository.get_last_conversation().await } diff --git a/crates/paws_server/Cargo.toml b/crates/paws_server/Cargo.toml new file mode 100644 index 00000000..d6d9d957 --- /dev/null +++ b/crates/paws_server/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "paws_server" +version = "0.1.0" +edition = "2024" + +[dependencies] +axum = { version = "0.7", features = ["macros"] } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +tower-http = { version = "0.5", features = ["cors", "trace"] } +tracing = { workspace = true } +futures = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } + +paws_api = { workspace = true } +paws_domain = { workspace = true } +paws_app = { workspace = true } + +tokio-stream = { workspace = true, features = ["sync"] } + +[dev-dependencies] +pretty_assertions = { workspace = true } diff --git a/crates/paws_server/src/error.rs b/crates/paws_server/src/error.rs new file mode 100644 index 00000000..02a045e9 --- /dev/null +++ b/crates/paws_server/src/error.rs @@ -0,0 +1,119 @@ +//! Error handling for the HTTP API. + +use axum::Json; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use serde::Serialize; + +/// Standard error response format. +#[derive(Debug, Serialize)] +pub struct ErrorResponse { + pub error: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub details: Option, +} + +/// Application error type with proper HTTP status codes. +#[derive(Debug)] +pub struct AppError { + pub status: StatusCode, + pub message: String, + pub details: Option, +} + +impl AppError { + /// Creates a new error with the given status and message. + pub fn new(status: StatusCode, message: impl Into) -> Self { + Self { status, message: message.into(), details: None } + } + + /// Creates a bad request error (400). + pub fn bad_request(message: impl Into) -> Self { + Self::new(StatusCode::BAD_REQUEST, message) + } + + /// Creates a not found error (404). + pub fn not_found(message: impl Into) -> Self { + Self::new(StatusCode::NOT_FOUND, message) + } + + /// Creates an internal server error (500). + pub fn internal(message: impl Into) -> Self { + Self::new(StatusCode::INTERNAL_SERVER_ERROR, message) + } + + /// Creates a conflict error (409). + pub fn conflict(message: impl Into) -> Self { + Self::new(StatusCode::CONFLICT, message) + } + + /// Adds additional details to the error. + pub fn with_details(mut self, details: impl Into) -> Self { + self.details = Some(details.into()); + self + } +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + let body = ErrorResponse { error: self.message, details: self.details }; + (self.status, Json(body)).into_response() + } +} + +impl From for AppError +where + E: Into, +{ + fn from(err: E) -> Self { + let err = err.into(); + Self::internal(err.to_string()) + } +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn test_error_response_serialization() { + let fixture = ErrorResponse { error: "Something went wrong".to_string(), details: None }; + let actual = serde_json::to_string(&fixture).unwrap(); + assert!(actual.contains("Something went wrong")); + assert!(!actual.contains("details")); + } + + #[test] + fn test_error_response_with_details() { + let fixture = ErrorResponse { + error: "Not found".to_string(), + details: Some("Resource ID: 123".to_string()), + }; + let actual = serde_json::to_string(&fixture).unwrap(); + assert!(actual.contains("Not found")); + assert!(actual.contains("Resource ID: 123")); + } + + #[test] + fn test_app_error_bad_request() { + let actual = AppError::bad_request("Invalid input"); + assert_eq!(actual.status, StatusCode::BAD_REQUEST); + assert_eq!(actual.message, "Invalid input"); + } + + #[test] + fn test_app_error_not_found() { + let actual = AppError::not_found("Resource not found"); + assert_eq!(actual.status, StatusCode::NOT_FOUND); + assert_eq!(actual.message, "Resource not found"); + } + + #[test] + fn test_app_error_with_details() { + let actual = + AppError::bad_request("Invalid input").with_details("Field 'name' is required"); + assert_eq!(actual.details, Some("Field 'name' is required".to_string())); + } +} diff --git a/crates/paws_server/src/events.rs b/crates/paws_server/src/events.rs new file mode 100644 index 00000000..f201565c --- /dev/null +++ b/crates/paws_server/src/events.rs @@ -0,0 +1,210 @@ +//! Event broadcasting and storage for real-time updates. + +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use paws_domain::ChatResponse; +use serde::{Deserialize, Serialize}; +use tokio::sync::{RwLock, broadcast}; + +use crate::task::TaskId; + +/// Events emitted during task execution. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TaskEvent { + /// Task started processing. + Started { timestamp: DateTime }, + /// Agent message/response chunk. + Message { + content: ChatResponse, + #[serde(skip_serializing_if = "Option::is_none")] + sequence: Option, + }, + /// Tool execution notification (info only). + ToolExecution { + tool: String, + status: ToolExecutionStatus, + timestamp: DateTime, + }, + /// Transient error (non-fatal). + Error { + message: String, + timestamp: DateTime, + }, + /// Task completed successfully. + Completed { timestamp: DateTime }, + /// Task failed with error. + Failed { + error: String, + timestamp: DateTime, + }, + /// Task was cancelled. + Cancelled { timestamp: DateTime }, +} + +impl TaskEvent { + /// Creates a new Started event. + pub fn started() -> Self { + Self::Started { timestamp: Utc::now() } + } + + /// Creates a new Message event. + pub fn message(content: ChatResponse) -> Self { + Self::Message { content, sequence: None } + } + + /// Creates a new ToolExecution event. + pub fn tool_execution(tool: String, status: ToolExecutionStatus) -> Self { + Self::ToolExecution { tool, status, timestamp: Utc::now() } + } + + /// Creates a new Error event. + pub fn error(message: String) -> Self { + Self::Error { message, timestamp: Utc::now() } + } + + /// Creates a new Completed event. + pub fn completed() -> Self { + Self::Completed { timestamp: Utc::now() } + } + + /// Creates a new Failed event. + pub fn failed(error: String) -> Self { + Self::Failed { error, timestamp: Utc::now() } + } + + /// Creates a new Cancelled event. + pub fn cancelled() -> Self { + Self::Cancelled { timestamp: Utc::now() } + } +} + +/// Status of a tool execution. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ToolExecutionStatus { + Started, + Completed, + Failed, +} + +/// Handle for receiving task events. +pub type EventReceiver = broadcast::Receiver; + +/// Broadcasts events to multiple subscribers. +/// +/// Uses a broadcast channel per task for efficient fan-out to multiple +/// SSE clients. +#[derive(Debug)] +pub struct EventBroadcaster { + /// Broadcast channels per task. + channels: RwLock>>, + /// Channel capacity for broadcast channels. + capacity: usize, +} + +impl EventBroadcaster { + /// Creates a new event broadcaster. + pub fn new() -> Self { + Self { channels: RwLock::new(HashMap::new()), capacity: 256 } + } + + /// Creates a broadcaster with custom capacity. + pub fn with_capacity(capacity: usize) -> Self { + Self { channels: RwLock::new(HashMap::new()), capacity } + } + + /// Subscribes to events for a task. + /// + /// Creates a new broadcast channel if one doesn't exist. + pub async fn subscribe(&self, task_id: TaskId) -> EventReceiver { + let mut channels = self.channels.write().await; + + let sender = channels + .entry(task_id) + .or_insert_with(|| broadcast::channel(self.capacity).0); + + sender.subscribe() + } + + /// Ensures a broadcast channel exists for a task. + /// + /// This is useful to create the channel before any events are broadcast, + /// so that subsequent subscribers can receive events from the buffer. + pub async fn ensure_channel(&self, task_id: TaskId) { + let mut channels = self.channels.write().await; + channels + .entry(task_id) + .or_insert_with(|| broadcast::channel(self.capacity).0); + } + + /// Broadcasts an event to all subscribers of a task. + pub async fn broadcast(&self, task_id: TaskId, event: TaskEvent) { + // Broadcast to subscribers + let channels = self.channels.read().await; + if let Some(sender) = channels.get(&task_id) { + // Ignore send errors (no subscribers) + let _ = sender.send(event); + } + } + + /// Cleans up resources for a completed task. + pub async fn cleanup(&self, task_id: TaskId) { + self.channels.write().await.remove(&task_id); + } +} + +impl Default for EventBroadcaster { + fn default() -> Self { + Self::new() + } +} + +/// Persistent log of events for reconnection support. +#[derive(Debug, Default)] +pub struct EventLog { + events: RwLock>>, +} + +impl EventLog { + /// Creates a new empty event log. + pub fn new() -> Self { + Self::default() + } + + /// Appends an event to the log. + pub async fn append(&self, task_id: TaskId, event: TaskEvent) { + self.events + .write() + .await + .entry(task_id) + .or_default() + .push(event); + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_task_event_serialization() { + let fixture = TaskEvent::started(); + let json = serde_json::to_string(&fixture).unwrap(); + assert!(json.contains("started")); + } + + #[tokio::test] + async fn test_event_log() { + let log = EventLog::new(); + let task_id = TaskId::new(); + + log.append(task_id, TaskEvent::started()).await; + // The rest of this test relied on methods we just removed + // since EventLog is now just a helper struct if used at all + // or we can remove EventLog entirely if it's not used by Broadcaster + // anymore. + } +} diff --git a/crates/paws_server/src/handlers/config.rs b/crates/paws_server/src/handlers/config.rs new file mode 100644 index 00000000..75ef0500 --- /dev/null +++ b/crates/paws_server/src/handlers/config.rs @@ -0,0 +1,377 @@ +//! Configuration and resource HTTP handlers (read-only for UI). + +use axum::Json; +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use paws_domain::{AgentId, ModelId, ProviderId}; +use serde::Deserialize; + +use crate::AppError; +use crate::server::AppState; + +// ============================================================================= +// Health & Environment +// ============================================================================= + +/// Health check endpoint. +pub async fn health() -> &'static str { + "OK" +} + +/// Gets environment information. +/// +/// GET /api/env +pub async fn get_env(State(state): State) -> Result { + Ok(Json(state.api.environment())) +} + +// ============================================================================= +// Resources (Read-Only) +// ============================================================================= + +/// Lists available tools. +/// +/// GET /api/tools +pub async fn list_tools(State(state): State) -> Result { + let tools = state.api.get_tools().await?; + Ok(Json(tools)) +} + +/// Query parameters for listing tools by agent. +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +pub struct ListToolsQuery { + pub agent_id: Option, +} + +/// Lists available models for a specific provider. +/// +/// GET /api/providers/:id/models +pub async fn list_provider_models( + State(state): State, + Path(id): Path, +) -> Result { + let provider_id = ProviderId::from(id); + let models = state.api.get_provider_models(&provider_id).await?; + Ok(Json(models)) +} + +/// Lists available models. +/// +/// GET /api/models +pub async fn list_models(State(state): State) -> Result { + let models = state.api.get_models().await?; + Ok(Json(models)) +} + +/// Lists available agents. +/// +/// GET /api/agents +pub async fn list_agents(State(state): State) -> Result { + let agents = state.api.get_agents().await?; + Ok(Json(agents)) +} + +/// Lists available providers. +/// +/// GET /api/providers +pub async fn list_providers(State(state): State) -> Result { + let providers = state.api.get_providers().await?; + Ok(Json(providers)) +} + +/// Gets a specific provider. +/// +/// GET /api/providers/:id +pub async fn get_provider( + State(state): State, + Path(id): Path, +) -> Result { + let provider = state.api.get_provider(&ProviderId::from(id)).await?; + Ok(Json(provider)) +} + +/// Lists available skills. +/// +/// GET /api/skills +pub async fn list_skills(State(state): State) -> Result { + let skills = state.api.get_skills().await?; + Ok(Json(skills)) +} + +/// Lists available custom commands. +/// +/// GET /api/commands +pub async fn list_commands(State(state): State) -> Result { + let commands = state.api.get_commands().await?; + Ok(Json(commands)) +} + +/// Gets the workflow configuration. +/// +/// GET /api/workflow +pub async fn get_workflow(State(state): State) -> Result { + let workflow = state.api.read_merged(None).await?; + Ok(Json(workflow)) +} + +// ============================================================================= +// Configuration +// ============================================================================= + +/// Gets the default provider. +/// +/// GET /api/config/default-provider +pub async fn get_default_provider( + State(state): State, +) -> Result { + let provider = state.api.get_default_provider().await?; + Ok(Json(provider)) +} + +/// Request to set the default provider. +#[derive(Debug, Deserialize)] +pub struct SetProviderRequest { + pub provider_id: ProviderId, +} + +/// Sets the default provider. +/// +/// POST /api/config/default-provider +pub async fn set_default_provider( + State(state): State, + Json(request): Json, +) -> Result { + state.api.set_default_provider(request.provider_id).await?; + Ok(StatusCode::OK) +} + +/// Gets the default model. +/// +/// GET /api/config/default-model +pub async fn get_default_model( + State(state): State, +) -> Result { + let model_id = state.api.get_default_model().await; + Ok(Json(model_id)) +} + +/// Request to set the default model. +#[derive(Debug, Deserialize)] +pub struct SetModelRequest { + pub model_id: ModelId, +} + +/// Sets the default model. +/// +/// POST /api/config/default-model +pub async fn set_default_model( + State(state): State, + Json(request): Json, +) -> Result { + state.api.set_default_model(request.model_id).await?; + Ok(StatusCode::OK) +} + +/// Gets the active agent. +/// +/// GET /api/config/active-agent +pub async fn get_active_agent( + State(state): State, +) -> Result { + let agent_id = state.api.get_active_agent().await; + Ok(Json(agent_id)) +} + +/// Request to set the active agent. +#[derive(Debug, Deserialize)] +pub struct SetAgentRequest { + pub agent_id: AgentId, +} + +/// Sets the active agent. +/// +/// POST /api/config/active-agent +pub async fn set_active_agent( + State(state): State, + Json(request): Json, +) -> Result { + state.api.set_active_agent(request.agent_id).await?; + Ok(StatusCode::OK) +} + +// ============================================================================= +// MCP Configuration +// ============================================================================= + +/// Query parameters for MCP config. +#[derive(Debug, Deserialize)] +pub struct McpConfigQuery { + pub scope: Option, +} + +/// Gets MCP configuration. +/// +/// GET /api/mcp/config +pub async fn get_mcp_config( + State(state): State, + Query(query): Query, +) -> Result { + let config = state.api.read_mcp_config(query.scope.as_ref()).await?; + Ok(Json(config)) +} + +/// Request to write MCP config. +#[derive(Debug, Deserialize)] +pub struct WriteMcpConfigRequest { + pub scope: paws_domain::Scope, + pub config: paws_domain::McpConfig, +} + +/// Writes MCP configuration. +/// +/// POST /api/mcp/config +pub async fn write_mcp_config( + State(state): State, + Json(request): Json, +) -> Result { + state + .api + .write_mcp_config(&request.scope, &request.config) + .await?; + Ok(StatusCode::OK) +} + +/// Reloads MCP servers. +/// +/// POST /api/mcp/reload +pub async fn reload_mcp(State(state): State) -> Result { + state.api.reload_mcp().await?; + Ok(StatusCode::OK) +} + +// ============================================================================= +// Authentication +// ============================================================================= + +/// Request to initiate provider authentication. +#[derive(Debug, Deserialize)] +pub struct InitAuthRequest { + pub provider_id: ProviderId, + pub method: paws_domain::AuthMethod, +} + +/// Initiates provider authentication. +/// +/// POST /api/auth/init +pub async fn init_auth( + State(state): State, + Json(request): Json, +) -> Result { + let context = state + .api + .init_provider_auth(request.provider_id, request.method) + .await?; + Ok(Json(context)) +} + +/// Request to complete authentication. +#[derive(Debug, Deserialize)] +pub struct CompleteAuthRequest { + pub provider_id: ProviderId, + pub context: paws_domain::AuthContextResponse, + #[serde(default = "default_timeout")] + pub timeout_secs: u64, +} + +fn default_timeout() -> u64 { + 60 +} + +/// Completes provider authentication. +/// +/// POST /api/auth/complete +pub async fn complete_auth( + State(state): State, + Json(request): Json, +) -> Result { + let timeout = std::time::Duration::from_secs(request.timeout_secs); + state + .api + .complete_provider_auth(request.provider_id, request.context, timeout) + .await?; + Ok(StatusCode::OK) +} + +/// Request to logout. +#[derive(Debug, Deserialize)] +pub struct LogoutRequest { + pub provider_id: Option, +} + +/// Logs out from a provider or all providers. +/// +/// POST /api/auth/logout +pub async fn logout( + State(state): State, + Json(request): Json, +) -> Result { + if let Some(provider_id) = request.provider_id { + state.api.remove_provider(&provider_id).await?; + } else { + state.api.logout().await?; + } + Ok(StatusCode::OK) +} + +/// Gets user information. +/// +/// GET /api/auth/user +pub async fn get_user_info(State(state): State) -> Result { + let info = state.api.user_info().await?; + Ok(Json(info)) +} + +/// Gets user usage statistics. +/// +/// GET /api/auth/usage +pub async fn get_user_usage(State(state): State) -> Result { + let usage = state.api.user_usage().await?; + Ok(Json(usage)) +} + +// ============================================================================= +// Platform Authentication +// ============================================================================= + +/// Initiates platform login. +/// +/// POST /api/platform/auth/init +pub async fn platform_init_login( + State(state): State, +) -> Result { + let auth = state.api.init_login().await?; + Ok(Json(auth)) +} + +/// Completes platform login. +/// +/// POST /api/platform/auth/login +pub async fn platform_login( + State(state): State, + Json(auth): Json, +) -> Result { + state.api.login(&auth).await?; + Ok(StatusCode::OK) +} + +/// Gets platform user info. +/// +/// GET /api/platform/auth/info +pub async fn platform_user_info( + State(state): State, +) -> Result { + let info = state.api.get_login_info().await?; + Ok(Json(info)) +} diff --git a/crates/paws_server/src/handlers/conversations.rs b/crates/paws_server/src/handlers/conversations.rs new file mode 100644 index 00000000..2ab02827 --- /dev/null +++ b/crates/paws_server/src/handlers/conversations.rs @@ -0,0 +1,163 @@ +//! Conversation-related HTTP handlers. + +use axum::Json; +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use paws_domain::{Conversation, ConversationId}; +use serde::{Deserialize, Serialize}; + +use crate::AppError; +use crate::server::AppState; + +/// Query parameters for listing conversations. +#[derive(Debug, Deserialize)] +pub struct ListConversationsQuery { + pub limit: Option, +} + +/// Lists all conversations. +/// +/// GET /api/conversations +pub async fn list_conversations( + State(state): State, + Query(query): Query, +) -> Result { + let conversations = state.api.get_conversations(query.limit).await?; + Ok(Json(conversations)) +} + +/// Lists conversation summaries (lightweight, no context). +/// +/// GET /api/conversations/summaries +pub async fn list_conversation_summaries( + State(state): State, + Query(query): Query, +) -> Result { + let summaries = state.api.get_conversation_summaries(query.limit).await?; + Ok(Json(summaries)) +} + +/// Request body for creating a new conversation. +#[derive(Debug, Deserialize)] +pub struct CreateConversationRequest { + pub id: ConversationId, + #[serde(default)] + pub title: Option, +} + +/// Response for conversation creation. +#[derive(Debug, Serialize)] +pub struct CreateConversationResponse { + pub id: ConversationId, + pub title: Option, + pub created_at: chrono::DateTime, +} + +/// Creates a new conversation. +/// +/// POST /api/conversations +pub async fn create_conversation( + State(state): State, + Json(request): Json, +) -> Result { + tracing::info!( + conversation_id = %request.id, + title = ?request.title, + "Creating conversation" + ); + + let conversation = Conversation::new(request.id).title(request.title.clone()); + let created_at = conversation.metadata.created_at; + let id = conversation.id; + + state.api.upsert_conversation(conversation).await?; + + let response = CreateConversationResponse { id, title: request.title, created_at }; + + Ok((StatusCode::CREATED, Json(response))) +} + +/// Gets a specific conversation. +/// +/// GET /api/conversations/:id +pub async fn get_conversation( + State(state): State, + Path(id): Path, +) -> Result { + let conversation = state.api.conversation(&id).await?; + match conversation { + Some(c) => Ok(Json(c).into_response()), + None => Err(AppError::not_found(format!( + "Conversation not found: {}", + id + ))), + } +} + +/// Updates a conversation. +/// +/// PUT /api/conversations/:id +pub async fn update_conversation( + State(state): State, + Path(id): Path, + Json(conversation): Json, +) -> Result { + if id != conversation.id { + return Err(AppError::bad_request( + "Conversation ID in path does not match body", + )); + } + + tracing::info!( + conversation_id = %conversation.id, + title = ?conversation.title, + "Updating conversation" + ); + + state.api.upsert_conversation(conversation).await?; + Ok(StatusCode::OK) +} + +/// Deletes a conversation. +/// +/// DELETE /api/conversations/:id +pub async fn delete_conversation( + State(state): State, + Path(id): Path, +) -> Result { + state.api.delete_conversation(&id).await?; + Ok(StatusCode::NO_CONTENT) +} + +/// Compacts a conversation to reduce token usage. +/// +/// POST /api/conversations/:id/compact +pub async fn compact_conversation( + State(state): State, + Path(id): Path, +) -> Result { + let result = state.api.compact_conversation(&id).await?; + Ok(Json(result)) +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn test_create_conversation_request_deserialization() { + let json = r#"{"id": "550e8400-e29b-41d4-a716-446655440000"}"#; + let actual: CreateConversationRequest = serde_json::from_str(json).unwrap(); + assert!(actual.title.is_none()); + } + + #[test] + fn test_create_conversation_request_with_title() { + let json = r#"{"id": "550e8400-e29b-41d4-a716-446655440000", "title": "My Chat"}"#; + let actual: CreateConversationRequest = serde_json::from_str(json).unwrap(); + assert_eq!(actual.title, Some("My Chat".to_string())); + } +} diff --git a/crates/paws_server/src/handlers/files.rs b/crates/paws_server/src/handlers/files.rs new file mode 100644 index 00000000..96c6dcfe --- /dev/null +++ b/crates/paws_server/src/handlers/files.rs @@ -0,0 +1,41 @@ +//! File related HTTP handlers. + +use axum::Json; +use axum::extract::{Query, State}; +use axum::response::IntoResponse; +use serde::Deserialize; + +use crate::AppError; +use crate::server::AppState; + +/// Query parameters for reading a file. +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +pub struct ReadFileQuery { + pub path: Option, +} + +/// Lists files in a directory. +/// +/// GET /api/files +pub async fn list_files( + State(state): State, + Query(query): Query, +) -> Result { + let path = query.path.map(std::path::PathBuf::from); + let files = state.api.discover().await?; + // Filter by path if provided + let filtered = if let Some(ref path) = path { + let path_str = path.to_string_lossy().to_string(); + files + .into_iter() + .filter(|f| f.path.starts_with(&path_str)) + .collect() + } else { + files + }; + Ok(Json(filtered)) +} + +// Note: File read/write operations are handled internally by the orchestrator +// and are not exposed via the API for security reasons. diff --git a/crates/paws_server/src/handlers/git.rs b/crates/paws_server/src/handlers/git.rs new file mode 100644 index 00000000..6512dc74 --- /dev/null +++ b/crates/paws_server/src/handlers/git.rs @@ -0,0 +1,89 @@ +//! Git related handlers. + +use std::path::PathBuf; + +use axum::Json; +use axum::extract::State; +use axum::response::IntoResponse; +use serde::Serialize; + +use crate::AppError; +use crate::server::AppState; + +/// Response for git status/diff. +#[derive(Debug, Serialize)] +pub struct GitDiffResponse { + pub diff: String, +} + +/// Gets the current git diff. +/// +/// GET /api/git/diff +pub async fn get_git_diff(State(state): State) -> Result { + // We'll execute "git diff" and "git diff --staged" to get all changes. + // For now, let's just get "git diff HEAD" to see everything against the last + // commit. + + // Note: We are assuming the current working directory is the git root or inside + // it. In a real agent scenario, we might need to know the workspace path. + // For this MVP, we use the current directory. + + let output = state + .api + .execute_shell_command("git diff HEAD", PathBuf::from(".")) + .await?; + + Ok(Json(GitDiffResponse { diff: output.stdout })) +} + +/// Response for git status (simplified). +#[derive(Debug, Serialize)] +pub struct GitStatusResponse { + pub status: String, +} + +/// Gets the current git status. +/// +/// GET /api/git/status +pub async fn get_git_status(State(state): State) -> Result { + let output = state + .api + .execute_shell_command("git status --porcelain", PathBuf::from(".")) + .await?; + + Ok(Json(GitStatusResponse { status: output.stdout })) +} + +use serde::Deserialize; + +/// Request for committing changes. +#[derive(Debug, Deserialize)] +pub struct CommitRequest { + pub message: String, +} + +/// Commits changes to git. +/// +/// POST /api/git/commit +pub async fn commit_changes( + State(state): State, + Json(request): Json, +) -> Result { + // First add all changes + state + .api + .execute_shell_command("git add .", PathBuf::from(".")) + .await?; + + // Then commit + // Escape quotes in message to prevent shell injection/breaking + let escaped_message = request.message.replace('"', "\\\""); + let cmd = format!("git commit -m \"{}\"", escaped_message); + + let output = state + .api + .execute_shell_command(&cmd, PathBuf::from(".")) + .await?; + + Ok(Json(GitStatusResponse { status: output.stdout })) +} diff --git a/crates/paws_server/src/handlers/mod.rs b/crates/paws_server/src/handlers/mod.rs new file mode 100644 index 00000000..fac8be07 --- /dev/null +++ b/crates/paws_server/src/handlers/mod.rs @@ -0,0 +1,17 @@ +//! HTTP handlers for the task-based API. + +mod config; +mod conversations; +mod files; +mod git; +mod sse; +mod tasks; +mod utils; + +pub use config::*; +pub use conversations::*; +pub use files::*; +pub use git::*; +pub use sse::*; +pub use tasks::*; +pub use utils::*; diff --git a/crates/paws_server/src/handlers/sse.rs b/crates/paws_server/src/handlers/sse.rs new file mode 100644 index 00000000..0683c262 --- /dev/null +++ b/crates/paws_server/src/handlers/sse.rs @@ -0,0 +1,130 @@ +//! Server-Sent Events streaming handlers. + +use axum::extract::{Path, State}; +use axum::response::IntoResponse; +use axum::response::sse::{Event, KeepAlive, Sse}; +use futures::stream::StreamExt; +use tokio_stream::wrappers::BroadcastStream; + +use super::parse_task_id; +use crate::AppError; +use crate::server::AppState; + +/// Streams task events via Server-Sent Events. +/// +/// GET /api/tasks/{id}/stream +/// +/// This endpoint supports reconnection. Clients can reconnect and receive +/// missed events by using the `/api/tasks/{id}/events/since` endpoint first. +pub async fn stream_task_events( + State(state): State, + Path(id): Path, +) -> Result { + let task_id = parse_task_id(&id)?; + + // Verify task exists + let task = state + .task_store + .get_task(task_id) + .await + .ok_or_else(|| AppError::not_found(format!("Task not found: {}. Please verify the task ID or create a new task using POST /api/tasks", task_id)))?; + + // If task is already complete, return final events + if task.status.is_terminal() { + let events = state.task_store.get_events(task_id).await; + let stream = futures::stream::iter( + events + .into_iter() + .filter_map(|e| Some(Ok::<_, axum::Error>(Event::default().json_data(e).ok()?))), + ); + return Ok(Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response()); + } + + // Get any events that were already stored (e.g., started event) + let stored_events = state.task_store.get_events(task_id).await; + + // Subscribe to live events + let receiver = state.broadcaster.subscribe(task_id).await; + let live_stream = BroadcastStream::new(receiver); + + // First yield stored events, then live events + let stored_event_stream = futures::stream::iter( + stored_events + .into_iter() + .filter_map(|e| Some(Ok::<_, axum::Error>(Event::default().json_data(e).ok()?))), + ); + + let live_sse_stream = live_stream.filter_map(|result| async move { + match result { + Ok(event) => { + let json = serde_json::to_string(&event).ok()?; + Some(Ok::<_, axum::Error>(Event::default().data(json))) + } + Err(e) => { + tracing::warn!("SSE stream error: {}", e); + None + } + } + }); + + let combined_stream = stored_event_stream.chain(live_sse_stream); + + Ok(Sse::new(combined_stream) + .keep_alive(KeepAlive::default()) + .into_response()) +} + +/// Query parameters for resumable streaming. +#[derive(Debug, serde::Deserialize)] +pub struct StreamSinceQuery { + /// Resume from this event index. + pub since: Option, +} + +/// Streams task events with reconnection support. +/// +/// GET /api/tasks/{id}/stream/resumable +/// +/// First sends any missed events since `since` index, then streams live events. +pub async fn stream_task_events_resumable( + State(state): State, + Path(id): Path, + axum::extract::Query(query): axum::extract::Query, +) -> Result { + let task_id = parse_task_id(&id)?; + + let since = query.since.unwrap_or(0); + + // Get missed events first + let missed_events = state.task_store.get_events_since(task_id, since).await; + + // Subscribe to live events + let receiver = state.broadcaster.subscribe(task_id).await; + let live_stream = BroadcastStream::new(receiver); + + // First yield missed events, then live events + let missed_stream = futures::stream::iter( + missed_events + .into_iter() + .filter_map(|e| Some(Ok::<_, axum::Error>(Event::default().json_data(e).ok()?))), + ); + + let live_sse_stream = live_stream.filter_map(|result| async move { + match result { + Ok(event) => { + let json = serde_json::to_string(&event).ok()?; + Some(Ok::<_, axum::Error>(Event::default().data(json))) + } + Err(e) => { + tracing::warn!("SSE stream error: {}", e); + None + } + } + }); + + let combined_stream = missed_stream.chain(live_sse_stream); + + Ok(Sse::new(combined_stream).keep_alive(KeepAlive::default())) +} diff --git a/crates/paws_server/src/handlers/tasks.rs b/crates/paws_server/src/handlers/tasks.rs new file mode 100644 index 00000000..cfce8862 --- /dev/null +++ b/crates/paws_server/src/handlers/tasks.rs @@ -0,0 +1,179 @@ +//! Task-related HTTP handlers. + +use axum::Json; +use axum::extract::{Path, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use paws_domain::ConversationId; +use serde::{Deserialize, Serialize}; + +use super::parse_task_id; +use crate::AppError; +use crate::server::AppState; +use crate::task::TaskId; + +/// Request to create a new task. +#[derive(Debug, Deserialize)] +pub struct CreateTaskRequest { + /// Conversation to add the message to. + pub conversation_id: ConversationId, + /// The user's message. + pub message: String, + /// Optional agent to use. + #[serde(default)] + pub agent_id: Option, + /// Optional file attachments. + #[serde(default)] + pub attachments: Vec, +} + +/// Response for task creation. +#[derive(Debug, Serialize)] +pub struct CreateTaskResponse { + pub task_id: TaskId, + pub conversation_id: ConversationId, +} + +/// Submits a new task for execution. +/// +/// POST /api/tasks +pub async fn create_task( + State(state): State, + Json(request): Json, +) -> Result { + tracing::info!( + conversation_id = %request.conversation_id, + agent_id = ?request.agent_id, + "Creating task" + ); + + let task_id = state + .task_manager + .submit( + request.conversation_id, + request.message, + request.agent_id, + request.attachments, + ) + .await?; + + let response = CreateTaskResponse { task_id, conversation_id: request.conversation_id }; + + Ok((StatusCode::ACCEPTED, Json(response))) +} + +/// Query parameters for listing tasks. +#[derive(Debug, Deserialize)] +pub struct ListTasksQuery { + /// Filter by conversation. + pub conversation_id: Option, +} + +/// Lists tasks, optionally filtered by conversation. +/// +/// GET /api/tasks +pub async fn list_tasks( + State(state): State, + axum::extract::Query(query): axum::extract::Query, +) -> Result { + let tasks = state.task_manager.list_tasks(query.conversation_id).await; + Ok(Json(tasks)) +} + +/// Gets a specific task by ID. +/// +/// GET /api/tasks/{id} +pub async fn get_task( + State(state): State, + Path(id): Path, +) -> Result { + let task_id = parse_task_id(&id)?; + + let task = state + .task_manager + .get_task(task_id) + .await + .ok_or_else(|| AppError::not_found(format!("Task not found: {}. Please verify the task ID or create a new task using POST /api/tasks", task_id)))?; + + Ok(Json(task)) +} + +/// Cancels a running task. +/// +/// POST /api/tasks/{id}/cancel +pub async fn cancel_task( + State(state): State, + Path(id): Path, +) -> Result { + let task_id = parse_task_id(&id)?; + + state.task_manager.cancel(task_id).await?; + Ok(StatusCode::OK) +} + +/// Gets all events for a task (for reconnection). +/// +/// GET /api/tasks/{id}/events +pub async fn get_task_events( + State(state): State, + Path(id): Path, +) -> Result { + let task_id = parse_task_id(&id)?; + + let events = state.task_manager.get_events(task_id).await; + Ok(Json(events)) +} + +/// Query parameters for getting events since an index. +#[derive(Debug, Deserialize)] +pub struct EventsSinceQuery { + /// Get events starting from this index. + pub since: Option, +} + +/// Gets events for a task since a specific index. +/// +/// GET /api/tasks/{id}/events/since +pub async fn get_task_events_since( + State(state): State, + Path(id): Path, + axum::extract::Query(query): axum::extract::Query, +) -> Result { + let task_id = parse_task_id(&id)?; + + let since = query.since.unwrap_or(0); + let events = state.task_manager.get_events_since(task_id, since).await; + Ok(Json(events)) +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn test_create_task_request_deserialization() { + let json = r#"{ + "conversation_id": "550e8400-e29b-41d4-a716-446655440000", + "message": "Hello, world!" + }"#; + + let actual: CreateTaskRequest = serde_json::from_str(json).unwrap(); + assert_eq!(actual.message, "Hello, world!"); + assert!(actual.agent_id.is_none()); + assert!(actual.attachments.is_empty()); + } + + #[test] + fn test_create_task_request_with_agent() { + let json = r#"{ + "conversation_id": "550e8400-e29b-41d4-a716-446655440000", + "message": "Hello!", + "agent_id": "paws" + }"#; + + let actual: CreateTaskRequest = serde_json::from_str(json).unwrap(); + assert_eq!(actual.agent_id, Some(paws_domain::AgentId::new("paws"))); + } +} diff --git a/crates/paws_server/src/handlers/utils.rs b/crates/paws_server/src/handlers/utils.rs new file mode 100644 index 00000000..fbec4536 --- /dev/null +++ b/crates/paws_server/src/handlers/utils.rs @@ -0,0 +1,59 @@ +//! Shared utilities for HTTP handlers. + +use crate::AppError; +use crate::task::TaskId; + +/// Parses and validates a task ID from a string. +/// +/// # Errors +/// +/// Returns an error if the task ID is "undefined", empty, or not a valid UUID. +pub fn parse_task_id(id: &str) -> Result { + // Validate task ID before parsing + if id == "undefined" || id.is_empty() { + return Err(AppError::bad_request( + "Invalid task ID: task ID is undefined or empty. Please create a task first using POST /api/tasks", + )); + } + + id.parse().map_err(|e: uuid::Error| { + AppError::bad_request(format!( + "Invalid task ID '{}': {}. Task ID must be a valid UUID. Please create a task first using POST /api/tasks", + id, e + )) + }) +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_parse_task_id_valid() { + let fixture = "550e8400-e29b-41d4-a716-446655440000"; + let actual = parse_task_id(fixture); + assert!(actual.is_ok()); + } + + #[test] + fn test_parse_task_id_undefined() { + let fixture = "undefined"; + let actual = parse_task_id(fixture); + assert!(actual.is_err()); + } + + #[test] + fn test_parse_task_id_empty() { + let fixture = ""; + let actual = parse_task_id(fixture); + assert!(actual.is_err()); + } + + #[test] + fn test_parse_task_id_invalid_uuid() { + let fixture = "not-a-uuid"; + let actual = parse_task_id(fixture); + assert!(actual.is_err()); + } +} diff --git a/crates/paws_server/src/lib.rs b/crates/paws_server/src/lib.rs new file mode 100644 index 00000000..e684c870 --- /dev/null +++ b/crates/paws_server/src/lib.rs @@ -0,0 +1,15 @@ +//! Task-based server architecture for Paws. +//! +//! This module implements a server that runs the orchestration loop +//! server-side, allowing the frontend to be stateless and reconnectable. + +mod error; +mod events; +mod handlers; +mod server; +mod task; + +pub use error::{AppError, ErrorResponse}; +pub use events::{EventBroadcaster, EventLog, TaskEvent}; +pub use server::Server; +pub use task::{Task, TaskId, TaskManager, TaskStatus}; diff --git a/crates/paws_server/src/server.rs b/crates/paws_server/src/server.rs new file mode 100644 index 00000000..8b6e1fa1 --- /dev/null +++ b/crates/paws_server/src/server.rs @@ -0,0 +1,160 @@ +//! HTTP server setup and routing. + +use std::net::SocketAddr; +use std::sync::Arc; + +use axum::Router; +use axum::routing::{get, post}; +use paws_api::API; +use tower_http::cors::{Any, CorsLayer}; +use tower_http::trace::TraceLayer; + +use crate::events::EventBroadcaster; +use crate::handlers::*; +use crate::task::{TaskManager, TaskStore}; + +/// Combined application state shared across all handlers. +#[derive(Clone)] +pub struct AppState { + /// API instance for backend operations. + pub api: Arc, + /// Task manager for background orchestration. + pub task_manager: Arc, + /// Task store for persistence. + pub task_store: Arc, + /// Event broadcaster for SSE streaming. + pub broadcaster: Arc, +} + +/// HTTP server for the Paws API. +pub struct Server { + api: Arc, + port: u16, +} + +impl Server { + /// Creates a new server instance. + pub fn new(api: Arc, port: u16) -> Self { + Self { api, port } + } + + /// Runs the HTTP server. + pub async fn run(self) -> anyhow::Result<()> { + tracing::info!("Initializing server components..."); + + // Initialize shared components + let task_store = Arc::new(TaskStore::new()); + let broadcaster = Arc::new(EventBroadcaster::new()); + let task_manager = Arc::new(TaskManager::new( + task_store.clone(), + broadcaster.clone(), + self.api.clone(), + )); + + // Create unified state + let state = + Arc::new(AppState { api: self.api.clone(), task_manager, task_store, broadcaster }); + + // Build router with all routes + let app = Router::new() + // Health & Environment + .route("/api/health", get(health)) + .route("/api/env", get(get_env)) + // Resources (Read-Only) + .route("/api/files", get(list_files)) + .route("/api/tools", get(list_tools)) + .route("/api/models", get(list_models)) + .route("/api/agents", get(list_agents)) + .route("/api/providers", get(list_providers)) + .route("/api/providers/:id", get(get_provider)) + .route("/api/providers/:id/models", get(list_provider_models)) + .route("/api/skills", get(list_skills)) + .route("/api/commands", get(list_commands)) + .route("/api/workflow", get(get_workflow)) + // Tasks (Core API) + .route("/api/tasks", get(list_tasks).post(create_task)) + .route("/api/tasks/:id", get(get_task)) + .route("/api/tasks/:id/cancel", post(cancel_task)) + .route("/api/tasks/:id/events", get(get_task_events)) + .route("/api/tasks/:id/events/since", get(get_task_events_since)) + .route("/api/tasks/:id/stream", get(stream_task_events)) + .route( + "/api/tasks/:id/stream/resumable", + get(stream_task_events_resumable), + ) + // Git + .route("/api/git/diff", get(get_git_diff)) + .route("/api/git/status", get(get_git_status)) + .route("/api/git/commit", post(commit_changes)) + // Conversations + .route( + "/api/conversations", + get(list_conversations).post(create_conversation), + ) + .route( + "/api/conversations/summaries", + get(list_conversation_summaries), + ) + .route( + "/api/conversations/:id", + get(get_conversation) + .delete(delete_conversation) + .put(update_conversation), + ) + .route("/api/conversations/:id/compact", post(compact_conversation)) + // Configuration + .route( + "/api/config/default-provider", + get(get_default_provider).post(set_default_provider), + ) + .route( + "/api/config/default-model", + get(get_default_model).post(set_default_model), + ) + .route( + "/api/config/active-agent", + get(get_active_agent).post(set_active_agent), + ) + // MCP Configuration + .route( + "/api/mcp/config", + get(get_mcp_config).post(write_mcp_config), + ) + .route("/api/mcp/reload", post(reload_mcp)) + // Provider Authentication + .route("/api/auth/init", post(init_auth)) + .route("/api/auth/complete", post(complete_auth)) + .route("/api/auth/logout", post(logout)) + .route("/api/auth/user", get(get_user_info)) + .route("/api/auth/usage", get(get_user_usage)) + // Platform Authentication + .route("/api/platform/auth/init", post(platform_init_login)) + .route("/api/platform/auth/login", post(platform_login)) + .route("/api/platform/auth/info", get(platform_user_info)) + // Middleware + .layer( + CorsLayer::new() + .allow_origin(Any) + .allow_methods([ + axum::http::Method::GET, + axum::http::Method::POST, + axum::http::Method::PUT, + axum::http::Method::DELETE, + axum::http::Method::OPTIONS, + ]) + .allow_headers(Any) + .allow_credentials(false), + ) + .layer(TraceLayer::new_for_http()) + .with_state((*state).clone()); + + let addr = SocketAddr::from(([0, 0, 0, 0], self.port)); + tracing::info!("Server listening on {}", addr); + + let listener = tokio::net::TcpListener::bind(addr).await?; + tracing::info!("Server started successfully, accepting connections..."); + axum::serve(listener, app).await?; + + Ok(()) + } +} diff --git a/crates/paws_server/src/task/manager.rs b/crates/paws_server/src/task/manager.rs new file mode 100644 index 00000000..d49dab3b --- /dev/null +++ b/crates/paws_server/src/task/manager.rs @@ -0,0 +1,244 @@ +//! Task manager for spawning and tracking background tasks. + +use std::sync::Arc; + +use paws_api::API; +use paws_domain::{AgentId, ChatRequest, Conversation, ConversationId, Event, EventValue}; +use tracing::{error, info}; + +use super::store::{Task, TaskId, TaskStore}; +use crate::events::{EventBroadcaster, TaskEvent}; + +/// Manages task lifecycle and background execution. +pub struct TaskManager { + store: Arc, + broadcaster: Arc, + api: Arc, +} + +impl TaskManager { + /// Creates a new task manager. + pub fn new( + store: Arc, + broadcaster: Arc, + api: Arc, + ) -> Self { + Self { store, broadcaster, api } + } + + /// Submits a new task for execution. + /// + /// This creates the task, stores it, and spawns background execution. + /// Returns the task ID for tracking. + pub async fn submit( + &self, + conversation_id: ConversationId, + message: String, + agent_id: Option, + attachments: Vec, + ) -> anyhow::Result { + // Resolve agent ID + let agent_id = match agent_id { + Some(id) => id, + None => self + .api + .get_active_agent() + .await + .ok_or_else(|| anyhow::anyhow!("No active agent configured"))?, + }; + + // Ensure conversation exists + self.ensure_conversation(&conversation_id).await?; + + // Create task + // Use the first 100 chars of message as title + let title = if message.len() > 100 { + format!("{}...", &message[0..100]) + } else { + message.clone() + }; + + let task = Task::new(conversation_id, agent_id.clone(), title); + let task_id = task.id; + + // Store task + self.store.insert_task(task.clone()).await; + + // Create event for the chat request + let event = Event { + id: uuid::Uuid::new_v4().to_string(), + value: Some(EventValue::Text(message.into())), + timestamp: chrono::Utc::now().to_rfc3339(), + attachments, + additional_context: None, + }; + + // Ensure broadcast channel exists before spawning execution. + // This creates the channel if it doesn't exist, so events won't be lost + // when the SSE handler subscribes later. + let _ensure_channel = self.broadcaster.ensure_channel(task_id).await; + + // Spawn background execution + self.spawn_execution(task_id, conversation_id, agent_id, event); + + info!(task_id = %task_id, conversation_id = %conversation_id, "Task submitted"); + + Ok(task_id) + } + + /// Ensures the conversation exists, creating it if necessary. + async fn ensure_conversation(&self, conversation_id: &ConversationId) -> anyhow::Result<()> { + let existing = self.api.conversation(conversation_id).await?; + if existing.is_none() { + let conversation = Conversation::new(*conversation_id); + self.api.upsert_conversation(conversation).await?; + } + Ok(()) + } + + /// Spawns the background task execution. + fn spawn_execution( + &self, + task_id: TaskId, + conversation_id: ConversationId, + _agent_id: AgentId, + event: Event, + ) { + let store = self.store.clone(); + let broadcaster = self.broadcaster.clone(); + let api = self.api.clone(); + + tokio::spawn(async move { + // Mark task as running + if let Some(mut task) = store.get_task(task_id).await { + task.start(); + store.update_task(task).await; + } + + // Emit started event + let start_event = TaskEvent::started(); + store.append_event(task_id, start_event.clone()).await; + broadcaster.broadcast(task_id, start_event).await; + + // Create chat request + let chat_request = ChatRequest { event, conversation_id }; + + // Execute chat + match api.chat(chat_request).await { + Ok(mut stream) => { + let mut has_error = false; + let mut last_error = None; + + while let Some(result) = futures::StreamExt::next(&mut stream).await { + match result { + Ok(response) => { + // Check for completion + let is_complete = + matches!(response, paws_domain::ChatResponse::TaskComplete); + + // Broadcast the response + let event = TaskEvent::message(response); + store.append_event(task_id, event.clone()).await; + broadcaster.broadcast(task_id, event).await; + + if is_complete { + break; + } + } + Err(e) => { + error!(task_id = %task_id, error = %e, "Stream error"); + has_error = true; + last_error = Some(e.to_string()); + let event = TaskEvent::error(e.to_string()); + store.append_event(task_id, event.clone()).await; + broadcaster.broadcast(task_id, event).await; + } + } + } + + // Mark task as completed or failed based on whether errors occurred + if let Some(mut task) = store.get_task(task_id).await { + if has_error { + // Safety: has_error is only set to true when last_error is Some + let error_msg = last_error.unwrap(); + task.fail(error_msg.clone()); + store.update_task(task.clone()).await; + let event = TaskEvent::failed(error_msg); + store.append_event(task_id, event.clone()).await; + broadcaster.broadcast(task_id, event).await; + error!(task_id = %task_id, "Task failed"); + } else { + task.complete(); + store.update_task(task.clone()).await; + let event = TaskEvent::completed(); + store.append_event(task_id, event.clone()).await; + broadcaster.broadcast(task_id, event).await; + info!(task_id = %task_id, "Task completed"); + } + } + } + Err(e) => { + error!(task_id = %task_id, error = %e, "Failed to start chat"); + + // Mark task as failed + if let Some(mut task) = store.get_task(task_id).await { + task.fail(e.to_string()); + store.update_task(task.clone()).await; + let event = TaskEvent::failed(e.to_string()); + store.append_event(task_id, event.clone()).await; + broadcaster.broadcast(task_id, event).await; + } + } + } + }); + } + + /// Gets a task by ID. + pub async fn get_task(&self, id: TaskId) -> Option { + self.store.get_task(id).await + } + + /// Lists tasks, optionally filtered by conversation. + pub async fn list_tasks(&self, conversation_id: Option) -> Vec { + self.store.list_tasks(conversation_id).await + } + + /// Gets events for a task. + pub async fn get_events(&self, task_id: TaskId) -> Vec { + self.store.get_events(task_id).await + } + + /// Gets events since a specific index (for reconnection). + pub async fn get_events_since(&self, task_id: TaskId, since_index: usize) -> Vec { + self.store.get_events_since(task_id, since_index).await + } + + /// Cancels a running task. + pub async fn cancel(&self, id: TaskId) -> anyhow::Result<()> { + if let Some(mut task) = self.store.get_task(id).await { + if task.status.is_terminal() { + return Err(anyhow::anyhow!("Task already completed")); + } + + task.cancel(); + self.store.update_task(task).await; + let event = TaskEvent::cancelled(); + self.store.append_event(id, event.clone()).await; + self.broadcaster.broadcast(id, event).await; + info!(task_id = %id, "Task cancelled"); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_id_generation() { + let id1 = TaskId::new(); + let id2 = TaskId::new(); + assert_ne!(id1, id2); + } +} diff --git a/crates/paws_server/src/task/mod.rs b/crates/paws_server/src/task/mod.rs new file mode 100644 index 00000000..6166def7 --- /dev/null +++ b/crates/paws_server/src/task/mod.rs @@ -0,0 +1,7 @@ +//! Task management for background orchestration. + +mod manager; +mod store; + +pub use manager::TaskManager; +pub use store::{Task, TaskId, TaskStatus, TaskStore}; diff --git a/crates/paws_server/src/task/store.rs b/crates/paws_server/src/task/store.rs new file mode 100644 index 00000000..5d051378 --- /dev/null +++ b/crates/paws_server/src/task/store.rs @@ -0,0 +1,271 @@ +//! Task domain types and storage. + +use std::collections::HashMap; +use std::str::FromStr; + +use chrono::{DateTime, Utc}; +use paws_domain::{AgentId, ConversationId}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use uuid::Uuid; + +use crate::events::TaskEvent; + +/// Unique identifier for a task. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct TaskId(pub Uuid); + +impl TaskId { + /// Generates a new random task ID. + pub fn new() -> Self { + Self(Uuid::new_v4()) + } +} + +impl Default for TaskId { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Display for TaskId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl FromStr for TaskId { + type Err = uuid::Error; + + fn from_str(s: &str) -> Result { + Uuid::parse_str(s).map(TaskId) + } +} + +/// Status of a task in its lifecycle. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TaskStatus { + /// Task is queued but not yet started. + Pending, + /// Task is currently being processed. + Running { started_at: DateTime }, + /// Task completed successfully. + Completed { + started_at: DateTime, + completed_at: DateTime, + }, + /// Task failed with an error. + Failed { + started_at: DateTime, + completed_at: DateTime, + error: String, + }, + /// Task was cancelled by user request. + Cancelled { + started_at: Option>, + completed_at: DateTime, + }, +} + +impl TaskStatus { + /// Checks if the task is in a terminal state. + pub fn is_terminal(&self) -> bool { + matches!( + self, + TaskStatus::Completed { .. } | TaskStatus::Failed { .. } | TaskStatus::Cancelled { .. } + ) + } + + /// Checks if the task is currently running. + pub fn is_running(&self) -> bool { + matches!(self, TaskStatus::Running { .. }) + } +} + +/// A task represents a unit of work submitted to the server. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Task { + /// Unique task identifier. + pub id: TaskId, + /// Conversation this task belongs to. + pub conversation_id: ConversationId, + /// Current status of the task. + pub status: TaskStatus, + /// When the task was created. + pub created_at: DateTime, + /// Agent assigned to process this task. + pub agent_id: AgentId, + /// Title or summary of the task (usually the initial message). + pub title: String, +} + +impl Task { + /// Creates a new pending task. + pub fn new(conversation_id: ConversationId, agent_id: AgentId, title: String) -> Self { + Self { + id: TaskId::new(), + conversation_id, + status: TaskStatus::Pending, + created_at: Utc::now(), + agent_id, + title, + } + } + + /// Marks the task as running. + pub fn start(&mut self) { + self.status = TaskStatus::Running { started_at: Utc::now() }; + } + + /// Marks the task as completed. + pub fn complete(&mut self) { + if let TaskStatus::Running { started_at } = self.status { + self.status = TaskStatus::Completed { started_at, completed_at: Utc::now() }; + } + } + + /// Marks the task as failed. + pub fn fail(&mut self, error: String) { + let started_at = match &self.status { + TaskStatus::Running { started_at } => *started_at, + _ => Utc::now(), + }; + self.status = TaskStatus::Failed { started_at, completed_at: Utc::now(), error }; + } + + /// Marks the task as cancelled. + pub fn cancel(&mut self) { + let started_at = match &self.status { + TaskStatus::Running { started_at } => Some(*started_at), + _ => None, + }; + self.status = TaskStatus::Cancelled { started_at, completed_at: Utc::now() }; + } +} + +/// In-memory store for tasks and their events. +#[derive(Debug, Default)] +pub struct TaskStore { + tasks: RwLock>, + events: RwLock>>, +} + +impl TaskStore { + /// Creates a new empty task store. + pub fn new() -> Self { + Self::default() + } + + /// Stores a new task. + pub async fn insert_task(&self, task: Task) { + self.tasks.write().await.insert(task.id, task); + } + + /// Retrieves a task by ID. + pub async fn get_task(&self, id: TaskId) -> Option { + self.tasks.read().await.get(&id).cloned() + } + + /// Updates a task. + pub async fn update_task(&self, task: Task) { + self.tasks.write().await.insert(task.id, task); + } + + /// Lists all tasks, optionally filtered by conversation. + pub async fn list_tasks(&self, conversation_id: Option) -> Vec { + let tasks = self.tasks.read().await; + tasks + .values() + .filter(|t| { + conversation_id + .map(|cid| t.conversation_id == cid) + .unwrap_or(true) + }) + .cloned() + .collect() + } + + /// Appends an event to a task's event log. + pub async fn append_event(&self, task_id: TaskId, event: TaskEvent) { + self.events + .write() + .await + .entry(task_id) + .or_default() + .push(event); + } + + /// Retrieves all events for a task. + pub async fn get_events(&self, task_id: TaskId) -> Vec { + self.events + .read() + .await + .get(&task_id) + .cloned() + .unwrap_or_default() + } + + /// Gets events since a specific index (for reconnection). + pub async fn get_events_since(&self, task_id: TaskId, since_index: usize) -> Vec { + self.events + .read() + .await + .get(&task_id) + .map(|events| events.iter().skip(since_index).cloned().collect()) + .unwrap_or_default() + } +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn test_task_lifecycle() { + let conversation_id = ConversationId::default(); + let agent_id = AgentId::new("test-agent"); + let title = "Test task".to_string(); + + let mut fixture = Task::new(conversation_id, agent_id, title); + assert!(matches!(fixture.status, TaskStatus::Pending)); + + fixture.start(); + assert!(fixture.status.is_running()); + + fixture.complete(); + assert!(fixture.status.is_terminal()); + } + + #[test] + fn test_task_failure() { + let conversation_id = ConversationId::default(); + let agent_id = AgentId::new("test-agent"); + let title = "Test task".to_string(); + + let mut fixture = Task::new(conversation_id, agent_id, title); + fixture.start(); + fixture.fail("Something went wrong".to_string()); + + let actual = &fixture.status; + assert!( + matches!(actual, TaskStatus::Failed { error, .. } if error == "Something went wrong") + ); + } + + #[tokio::test] + async fn test_task_store() { + let store = TaskStore::new(); + let conversation_id = ConversationId::default(); + let agent_id = AgentId::new("test-agent"); + let title = "Test task".to_string(); + let task = Task::new(conversation_id, agent_id, title); + let task_id = task.id; + + store.insert_task(task.clone()).await; + let actual = store.get_task(task_id).await; + assert_eq!(actual.unwrap().id, task_id); + } +} diff --git a/crates/paws_services/src/conversation.rs b/crates/paws_services/src/conversation.rs index 43adc12c..d809256e 100644 --- a/crates/paws_services/src/conversation.rs +++ b/crates/paws_services/src/conversation.rs @@ -51,6 +51,15 @@ impl ConversationService for PawsConversationService< Ok(()) } + async fn get_conversation_summaries( + &self, + limit: Option, + ) -> Result>> { + self.conversation_repository + .get_all_conversation_summaries(limit) + .await + } + async fn get_conversations(&self, limit: Option) -> Result>> { self.conversation_repository .get_all_conversations(limit)