diff --git a/README.md b/README.md index a4eec5e0..64c4d0b3 100644 --- a/README.md +++ b/README.md @@ -198,3 +198,59 @@ When making changes to Django models, you need to create and apply migrations: - Ensure VS Code Python extension is installed - Check that breakpoints are set in the correct files - Verify the debug server shows "Debug server listening on port 5678" + +## Model Context Protocol (MCP) Server + +The application includes a built-in MCP server that allows AI agents (such as Cursor, Claude Desktop, or Gemini) to interact directly with the backend database to query, create, update, and manage tasks, teams, and users. + +### Local Configuration + +1. Make sure your virtual environment is activated and requirements are installed. +2. In your `.env` file, specify either the user email or user ID of a registered user who already has access to the application: + ```env + MCP_USER_EMAIL='user@example.com' + ``` +3. Test that the MCP server starts up locally: + ```bash + python manage.py run_mcp + ``` + +### Client Integration + +To register this MCP server with your developer clients: + +#### Cursor +1. Go to **Settings > Features > MCP**. +2. Click **+ Add New MCP Server**. +3. Set the following fields: + - **Name**: `todo-backend` + - **Type**: `command` + - **Command**: `/path/to/todo-backend/venv/bin/python /path/to/todo-backend/manage.py run_mcp` +4. Click **+ Add Env Var** and configure `MCP_USER_EMAIL` (or `MCP_USER_ID`). + +#### Claude Desktop +Add the following block to your `claude_desktop_config.json` (usually located at `~/Library/Application Support/Claude/claude_desktop_config.json` on macOS): +```json +{ + "mcpServers": { + "todo-backend": { + "command": "/path/to/todo-backend/venv/bin/python", + "args": ["/path/to/todo-backend/manage.py", "run_mcp"], + "env": { + "MCP_USER_EMAIL": "user@example.com", + "CORS_ALLOWED_ORIGINS": "http://localhost:3000,http://localhost:8000" + } + } + } +} +``` + +### Deployment Considerations + +Since the default command `python manage.py run_mcp` runs the MCP server over standard input/output (stdio), it is designed to run locally alongside your editor or local client. + +If you wish to deploy the MCP server as a remote service (e.g., for a hosted team or multi-user setup): +1. **HTTP/SSE Transport**: FastMCP supports Server-Sent Events (SSE) over HTTP. You can modify the startup call in `run_mcp.py` to use `mcp.run("sse")` which spins up an ASGI/HTTP endpoint rather than `"stdio"`. +2. **Security & Authentication**: Ensure that access to the remote SSE endpoint is protected behind a reverse proxy (e.g., Nginx, Cloudflare Access) requiring API token validation or OAuth headers. +3. **Database Access**: Ensure the deployed environment is correctly configured with environment variables to connect to MongoDB and PostgreSQL. + diff --git a/requirements.txt b/requirements.txt index 832c173c..9d0b2319 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,4 +28,6 @@ email-validator==2.2.0 testcontainers[mongodb]==4.10.0 drf-spectacular==0.28.0 debugpy==1.8.14 -psycopg2-binary==2.9.9 +psycopg2-binary>=2.9.10 +mcp>=1.0.0 + diff --git a/todo/management/commands/run_mcp.py b/todo/management/commands/run_mcp.py new file mode 100644 index 00000000..ae9e618e --- /dev/null +++ b/todo/management/commands/run_mcp.py @@ -0,0 +1,406 @@ +import os +import json +from datetime import datetime, timezone +from typing import Optional + +from django.core.management.base import BaseCommand +from mcp.server.fastmcp import FastMCP + +from todo.services.task_service import TaskService +from todo.services.user_service import UserService +from todo.services.team_service import TeamService +from todo.repositories.user_repository import UserRepository +from todo.models.user import UserModel +from todo.dto.task_dto import CreateTaskDTO +from todo.constants.task import TaskPriority, TaskStatus + + +# Initialize FastMCP Server +mcp = FastMCP("TodoBackend") + + +def verify_access() -> UserModel: + """ + Verify that the user configured via environment variables is registered in the database. + Only allows access to users who are registered in the application database. + """ + user_id = os.getenv("MCP_USER_ID") + user_email = os.getenv("MCP_USER_EMAIL") + + if not user_id and not user_email: + raise ValueError( + "Access Denied: Neither MCP_USER_ID nor MCP_USER_EMAIL is configured in environment variables. " + "Please configure one of these variables to access the application." + ) + + collection = UserRepository._get_collection() + + if user_id: + try: + from todo.models.common.pyobjectid import PyObjectId + + object_id = PyObjectId(user_id) + doc = collection.find_one({"_id": object_id}) + except Exception: + doc = None + else: + doc = collection.find_one({"email_id": user_email}) + + if not doc: + detail_msg = f"ID '{user_id}'" if user_id else f"email '{user_email}'" + raise ValueError( + f"Access Denied: No registered user found matching {detail_msg}. " + "Only registered users with existing access to the application can use this MCP server." + ) + + return UserModel(**doc) + + +@mcp.tool() +def list_tasks( + status: Optional[str] = None, + priority: Optional[str] = None, + team_id: Optional[str] = None, + page: int = 1, + limit: int = 20, +) -> str: + """ + List and filter tasks. Only shows tasks that the authenticated user has access to. + + Args: + status: Filter by task status (e.g., 'TODO', 'IN_PROGRESS', 'DONE', 'DEFERRED', 'BACKLOG'). + priority: Filter by task priority (e.g., 'LOW', 'MEDIUM', 'HIGH', 'URGENT'). + team_id: Filter tasks by team ID. + page: Page number for pagination (starts at 1). + limit: Max number of tasks to return in a page. + """ + user = verify_access() + user_id_str = str(user.id) + + # Map priority string if provided to ensure it is valid + priority_val = None + if priority: + try: + priority_val = TaskPriority[priority.upper()].value + except KeyError: + return f"Error: Invalid priority '{priority}'. Allowed values: LOW, MEDIUM, HIGH, URGENT." + + # Map status string if provided + status_val = None + if status: + try: + status_val = TaskStatus[status.upper()].value + except KeyError: + return f"Error: Invalid status '{status}'. Allowed values: TODO, IN_PROGRESS, DONE, DEFERRED, BACKLOG." + + try: + response = TaskService.get_tasks( + page=page, + limit=limit, + sort_by="createdAt", + order="desc", + user_id=user_id_str, + team_id=team_id, + status_filter=status_val, + ) + + if response.error: + return f"Error: {response.error.get('message')}" + + tasks_data = [] + for t in response.tasks: + # Optionally filter by priority manually if the repository service didn't support it directly + if priority_val and t.priority != priority_val: + continue + tasks_data.append(t.model_dump(mode="json")) + + return json.dumps( + { + "tasks": tasks_data, + "page": page, + "limit": limit, + "has_more": response.links.next is not None if response.links else False, + }, + indent=2, + ) + + except Exception as e: + return f"Error occurred while listing tasks: {str(e)}" + + +@mcp.tool() +def get_task(task_id: str) -> str: + """ + Get detailed information about a single task by its database ID. + """ + verify_access() + try: + task_dto = TaskService.get_task_by_id(task_id) + return json.dumps(task_dto.model_dump(mode="json"), indent=2) + except Exception as e: + return f"Error: Task not found or retrieval failed. Details: {str(e)}" + + +@mcp.tool() +def create_task( + title: str, + description: Optional[str] = None, + priority: str = "LOW", + status: str = "TODO", + assignee_id: Optional[str] = None, + assignee_type: Optional[str] = None, + due_at: Optional[str] = None, +) -> str: + """ + Create a new task. + + Args: + title: The title of the task. + description: Detailed description of the task. + priority: Priority of the task (LOW, MEDIUM, HIGH, URGENT). + status: Initial status of the task (TODO, IN_PROGRESS, DONE, DEFERRED, BACKLOG). + assignee_id: ID of the user or team to assign this task to. + assignee_type: Either 'user' or 'team'. Required if assignee_id is provided. + due_at: Due date/time in ISO format (e.g. '2026-06-30T12:00:00Z'). + """ + user = verify_access() + user_id_str = str(user.id) + + # Validate assignee parameters + assignee_dict = None + if assignee_id: + if not assignee_type or assignee_type.lower() not in ["user", "team"]: + return "Error: assignee_type must be either 'user' or 'team' when assignee_id is provided." + assignee_dict = { + "assignee_id": assignee_id, + "user_type": assignee_type.lower(), + } + + # Parse priority and status + try: + priority_enum = TaskPriority[priority.upper()] + except KeyError: + return f"Error: Invalid priority '{priority}'. Allowed values: LOW, MEDIUM, HIGH, URGENT." + + try: + status_enum = TaskStatus[status.upper()] + except KeyError: + return f"Error: Invalid status '{status}'. Allowed values: TODO, IN_PROGRESS, DONE, DEFERRED, BACKLOG." + + # Parse due_at datetime + due_dt = None + if due_at: + try: + cleaned_iso = due_at.replace("Z", "+00:00") + due_dt = datetime.fromisoformat(cleaned_iso) + if due_dt.tzinfo is None: + due_dt = due_dt.replace(tzinfo=timezone.utc) + except Exception: + return "Error: Invalid due_at format. Please use ISO 8601 format (e.g., '2026-06-30T12:00:00Z')." + + try: + dto = CreateTaskDTO( + title=title, + description=description, + priority=priority_enum, + status=status_enum, + assignee=assignee_dict, + labels=[], + dueAt=due_dt, + createdBy=user_id_str, + ) + + response = TaskService.create_task(dto) + return json.dumps( + { + "message": "Task created successfully", + "task": response.data.model_dump(mode="json"), + }, + indent=2, + ) + + except Exception as e: + return f"Error creating task: {str(e)}" + + +@mcp.tool() +def update_task( + task_id: str, + title: Optional[str] = None, + description: Optional[str] = None, + priority: Optional[str] = None, + status: Optional[str] = None, + assignee_id: Optional[str] = None, + assignee_type: Optional[str] = None, + due_at: Optional[str] = None, +) -> str: + """ + Update fields of an existing task. + + Args: + task_id: Database ID of the task to update. + title: New title for the task. + description: New description for the task. + priority: New priority (LOW, MEDIUM, HIGH, URGENT). + status: New status (TODO, IN_PROGRESS, DONE, DEFERRED, BACKLOG). + assignee_id: New user or team ID to assign the task to. Set to empty string to unassign. + assignee_type: Required if assignee_id is provided. 'user' or 'team'. + due_at: New due date in ISO format (e.g. '2026-06-30T12:00:00Z'). + """ + user = verify_access() + user_id_str = str(user.id) + + validated_data = {} + + if title is not None: + validated_data["title"] = title + if description is not None: + validated_data["description"] = description + + if priority is not None: + try: + validated_data["priority"] = TaskPriority[priority.upper()] + except KeyError: + return f"Error: Invalid priority '{priority}'. Allowed values: LOW, MEDIUM, HIGH, URGENT." + + if status is not None: + try: + validated_data["status"] = TaskStatus[status.upper()].value + except KeyError: + return f"Error: Invalid status '{status}'. Allowed values: TODO, IN_PROGRESS, DONE, DEFERRED, BACKLOG." + + if due_at is not None: + if due_at == "": + validated_data["dueAt"] = None + else: + try: + cleaned_iso = due_at.replace("Z", "+00:00") + due_dt = datetime.fromisoformat(cleaned_iso) + if due_dt.tzinfo is None: + due_dt = due_dt.replace(tzinfo=timezone.utc) + validated_data["dueAt"] = due_dt + except Exception: + return "Error: Invalid due_at format. Please use ISO 8601 format (e.g., '2026-06-30T12:00:00Z')." + + if assignee_id is not None: + if assignee_id == "": + validated_data["assignee"] = None + else: + if not assignee_type or assignee_type.lower() not in ["user", "team"]: + return "Error: assignee_type must be 'user' or 'team' when assignee_id is provided." + validated_data["assignee"] = { + "assignee_id": assignee_id, + "user_type": assignee_type.lower(), + } + + if not validated_data: + return "Error: No update parameters provided." + + try: + updated_dto = TaskService.update_task_with_assignee_from_dict( + task_id=task_id, + validated_data=validated_data, + user_id=user_id_str, + ) + return json.dumps( + { + "message": "Task updated successfully", + "task": updated_dto.model_dump(mode="json"), + }, + indent=2, + ) + except Exception as e: + return f"Error updating task: {str(e)}" + + +@mcp.tool() +def delete_task(task_id: str) -> str: + """ + Delete a task from the system. Only allowed if the user created it or is assigned to it. + """ + user = verify_access() + user_id_str = str(user.id) + + try: + TaskService.delete_task(task_id, user_id_str) + return f"Task {task_id} successfully deleted." + except Exception as e: + return f"Error deleting task: {str(e)}" + + +@mcp.tool() +def list_users(page: int = 1, limit: int = 50) -> str: + """ + List registered users in the application to retrieve their names and IDs. + """ + verify_access() + try: + users, total_count = UserService.get_all_users(page=page, limit=limit) + users_list = [u.model_dump(mode="json") for u in users] + return json.dumps( + { + "users": users_list, + "total_count": total_count, + "page": page, + "limit": limit, + }, + indent=2, + ) + except Exception as e: + return f"Error listing users: {str(e)}" + + +@mcp.tool() +def list_teams() -> str: + """ + List all teams that the active user belongs to, including team IDs. + """ + user = verify_access() + user_id_str = str(user.id) + + try: + response = TeamService.get_user_teams(user_id_str) + + teams_list = [team.model_dump(mode="json") for team in response.teams] + return json.dumps({"teams": teams_list}, indent=2) + except Exception as e: + return f"Error listing teams: {str(e)}" + + +@mcp.tool() +def search_users(query: str, page: int = 1, limit: int = 10) -> str: + """ + Search registered users by name or email. + + Args: + query: Part of user's name or email to search for. + page: Page number for pagination. + limit: Max number of search results to return. + """ + verify_access() + try: + users, total_count = UserService.search_users(query=query, page=page, limit=limit) + users_list = [{"id": str(u.id), "name": u.name, "email": u.email_id} for u in users] + return json.dumps( + { + "results": users_list, + "total_count": total_count, + "page": page, + "limit": limit, + }, + indent=2, + ) + except Exception as e: + return f"Error searching users: {str(e)}" + + +class Command(BaseCommand): + help = "Starts the MCP (Model Context Protocol) stdio server for Todo-Backend" + + def handle(self, *args, **options): + # Prevent any stdout messages from corrupting the stdio communication channel + # We write a startup notification to stderr instead + self.stderr.write(self.style.SUCCESS("Starting Todo-Backend MCP Server...")) + + # Run the MCP server over standard input/output (stdio) + mcp.run("stdio") diff --git a/todo/tests/unit/commands/test_run_mcp.py b/todo/tests/unit/commands/test_run_mcp.py new file mode 100644 index 00000000..e0399d7f --- /dev/null +++ b/todo/tests/unit/commands/test_run_mcp.py @@ -0,0 +1,266 @@ +import json +import os +from unittest import TestCase +from unittest.mock import patch, MagicMock +from datetime import datetime, timezone +from bson import ObjectId + +from todo.models.user import UserModel +from todo.dto.task_dto import TaskDTO +from todo.dto.user_dto import UserDTO, UsersDTO +from todo.dto.responses.get_tasks_response import GetTasksResponse +from todo.dto.responses.create_task_response import CreateTaskResponse +from todo.dto.responses.get_user_teams_response import GetUserTeamsResponse +from todo.dto.team_dto import TeamDTO +from todo.constants.task import TaskPriority, TaskStatus +from todo.models.common.pyobjectid import PyObjectId + +# Import functions under test +from todo.management.commands.run_mcp import ( + verify_access, + list_tasks, + get_task, + create_task, + update_task, + delete_task, + list_users, + list_teams, + search_users, +) + + +class TestMcpServer(TestCase): + def setUp(self): + self.mock_user_id = "60c72b2f9b1d8e3d8f8e8f8e" + self.mock_user = UserModel( + _id=PyObjectId(self.mock_user_id), + google_id="google_123", + email_id="user@example.com", + name="Test User", + picture="http://example.com/pic.jpg", + created_at=datetime.now(timezone.utc), + ) + + self.env_patcher = patch.dict(os.environ, {}) + self.env_patcher.start() + + def tearDown(self): + self.env_patcher.stop() + + def create_mock_task_dto(self, task_id="task_123", title="Sample Task"): + return TaskDTO( + id=task_id, + displayId="T-1", + title=title, + description="Task Description", + priority=TaskPriority.LOW, + status=TaskStatus.TODO, + assignee=None, + isAcknowledged=False, + labels=[], + startedAt=None, + dueAt=None, + deferredDetails=None, + in_watchlist=False, + createdAt=datetime(2026, 6, 10, tzinfo=timezone.utc), + updatedAt=None, + createdBy=UserDTO(id=self.mock_user_id, name="Test User"), + updatedBy=None, + ) + + @patch("todo.management.commands.run_mcp.UserRepository._get_collection") + def test_verify_access_no_env(self, mock_get_collection): + # Neither MCP_USER_ID nor MCP_USER_EMAIL is configured + if "MCP_USER_ID" in os.environ: + del os.environ["MCP_USER_ID"] + if "MCP_USER_EMAIL" in os.environ: + del os.environ["MCP_USER_EMAIL"] + + with self.assertRaises(ValueError) as context: + verify_access() + self.assertIn("Neither MCP_USER_ID nor MCP_USER_EMAIL is configured", str(context.exception)) + + @patch("todo.management.commands.run_mcp.UserRepository._get_collection") + def test_verify_access_by_id_success(self, mock_get_collection): + os.environ["MCP_USER_ID"] = self.mock_user_id + + mock_collection = MagicMock() + mock_collection.find_one.return_value = { + "_id": ObjectId(self.mock_user_id), + "google_id": "google_123", + "email_id": "user@example.com", + "name": "Test User", + "picture": "http://example.com/pic.jpg", + "created_at": datetime.now(timezone.utc), + } + mock_get_collection.return_value = mock_collection + + user = verify_access() + self.assertEqual(str(user.id), self.mock_user_id) + mock_collection.find_one.assert_called_once_with({"_id": ObjectId(self.mock_user_id)}) + + @patch("todo.management.commands.run_mcp.UserRepository._get_collection") + def test_verify_access_by_email_success(self, mock_get_collection): + os.environ["MCP_USER_EMAIL"] = "user@example.com" + + mock_collection = MagicMock() + mock_collection.find_one.return_value = { + "_id": ObjectId(self.mock_user_id), + "google_id": "google_123", + "email_id": "user@example.com", + "name": "Test User", + "picture": "http://example.com/pic.jpg", + "created_at": datetime.now(timezone.utc), + } + mock_get_collection.return_value = mock_collection + + user = verify_access() + self.assertEqual(user.email_id, "user@example.com") + mock_collection.find_one.assert_called_once_with({"email_id": "user@example.com"}) + + @patch("todo.management.commands.run_mcp.UserRepository._get_collection") + def test_verify_access_not_found(self, mock_get_collection): + os.environ["MCP_USER_EMAIL"] = "user@example.com" + mock_collection = MagicMock() + mock_collection.find_one.return_value = None + mock_get_collection.return_value = mock_collection + + with self.assertRaises(ValueError) as context: + verify_access() + self.assertIn("No registered user found matching email 'user@example.com'", str(context.exception)) + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.TaskService.get_tasks") + def test_list_tasks_success(self, mock_get_tasks, mock_verify): + mock_verify.return_value = self.mock_user + mock_task = self.create_mock_task_dto() + + mock_get_tasks.return_value = GetTasksResponse(tasks=[mock_task], links=None) + + result_str = list_tasks() + result = json.loads(result_str) + + self.assertEqual(len(result["tasks"]), 1) + self.assertEqual(result["tasks"][0]["title"], "Sample Task") + mock_get_tasks.assert_called_once() + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.TaskService.get_task_by_id") + def test_get_task_success(self, mock_get_task_by_id, mock_verify): + mock_verify.return_value = self.mock_user + mock_task = self.create_mock_task_dto() + + mock_get_task_by_id.return_value = mock_task + + result_str = get_task("task_123") + try: + result = json.loads(result_str) + except Exception as e: + print("\n--- DEBUG GET_TASK OUTPUT ---") + print(result_str) + print("-----------------------------\n") + raise e + + self.assertEqual(result["id"], "task_123") + self.assertEqual(result["title"], "Sample Task") + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.TaskService.create_task") + def test_create_task_success(self, mock_create_task, mock_verify): + mock_verify.return_value = self.mock_user + mock_task = self.create_mock_task_dto(task_id="task_created", title="New Task") + + mock_create_task.return_value = CreateTaskResponse(data=mock_task) + + result_str = create_task(title="New Task", priority="HIGH", status="IN_PROGRESS") + result = json.loads(result_str) + + self.assertEqual(result["task"]["id"], "task_created") + self.assertEqual(result["task"]["displayId"], "T-1") + mock_create_task.assert_called_once() + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.TaskService.update_task_with_assignee_from_dict") + def test_update_task_success(self, mock_update_task, mock_verify): + mock_verify.return_value = self.mock_user + mock_task = self.create_mock_task_dto(task_id="task_123") + mock_task.status = TaskStatus.DONE + mock_task.priority = TaskPriority.MEDIUM + + mock_update_task.return_value = mock_task + + result_str = update_task(task_id="task_123", status="DONE", priority="MEDIUM") + result = json.loads(result_str) + + self.assertEqual(result["task"]["id"], "task_123") + self.assertEqual(result["task"]["status"], "DONE") + mock_update_task.assert_called_once() + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.TaskService.delete_task") + def test_delete_task_success(self, mock_delete_task, mock_verify): + mock_verify.return_value = self.mock_user + + result = delete_task("task_123") + self.assertIn("successfully deleted", result) + mock_delete_task.assert_called_once_with("task_123", self.mock_user_id) + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.UserService.get_all_users") + def test_list_users_success(self, mock_get_all_users, mock_verify): + mock_verify.return_value = self.mock_user + mock_user_dto = UsersDTO(id="user_234", name="Other User") + mock_get_all_users.return_value = ([mock_user_dto], 1) + + result_str = list_users() + result = json.loads(result_str) + + self.assertEqual(result["total_count"], 1) + self.assertEqual(result["users"][0]["name"], "Other User") + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.TeamService.get_user_teams") + def test_list_teams_success(self, mock_get_user_teams, mock_verify): + mock_verify.return_value = self.mock_user + + mock_team = TeamDTO( + id="team_123", + name="Development Team", + description="Devs", + poc_id=self.mock_user_id, + invite_code="XYZ123", + created_by=self.mock_user_id, + updated_by=self.mock_user_id, + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + ) + + mock_get_user_teams.return_value = GetUserTeamsResponse(teams=[mock_team]) + + result_str = list_teams() + result = json.loads(result_str) + + self.assertEqual(len(result["teams"]), 1) + self.assertEqual(result["teams"][0]["name"], "Development Team") + + @patch("todo.management.commands.run_mcp.verify_access") + @patch("todo.management.commands.run_mcp.UserService.search_users") + def test_search_users_success(self, mock_search_users, mock_verify): + mock_verify.return_value = self.mock_user + + mock_user_match = UserModel( + _id=PyObjectId("60c72b2f9b1d8e3d8f8e8f8a"), + google_id="google_456", + email_id="searched@example.com", + name="Searched User", + picture=None, + created_at=datetime.now(timezone.utc), + ) + mock_search_users.return_value = ([mock_user_match], 1) + + result_str = search_users(query="Searched") + result = json.loads(result_str) + + self.assertEqual(result["total_count"], 1) + self.assertEqual(result["results"][0]["name"], "Searched User") + self.assertEqual(result["results"][0]["email"], "searched@example.com")