From abefc6a3b9f1167429aae9c05632811a4bfcae73 Mon Sep 17 00:00:00 2001 From: VenzQ Date: Sat, 31 Jan 2026 17:08:53 +0100 Subject: [PATCH] Add Celery configuration --- docker/Dockerfile.dev | 2 +- docker/Dockerfile.prod | 2 +- docker/docker-compose.yml | 26 ++++++++++++++++++++++ main.py | 23 ------------------- requirements.txt | 2 ++ src/__init__.py | 1 + src/celery_app.py | 28 +++++++++++++++++++++++ src/main.py | 47 +++++++++++++++++++++++++++++++++++++++ src/tasks.py | 18 +++++++++++++++ tests/test_main.py | 2 +- 10 files changed, 125 insertions(+), 26 deletions(-) delete mode 100644 main.py create mode 100644 src/__init__.py create mode 100644 src/celery_app.py create mode 100644 src/main.py create mode 100644 src/tasks.py diff --git a/docker/Dockerfile.dev b/docker/Dockerfile.dev index 5ddedc2..c702a96 100644 --- a/docker/Dockerfile.dev +++ b/docker/Dockerfile.dev @@ -17,4 +17,4 @@ COPY . . EXPOSE 8000 # Run the application with reload for development -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] +CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] diff --git a/docker/Dockerfile.prod b/docker/Dockerfile.prod index 73aaad1..bb16216 100644 --- a/docker/Dockerfile.prod +++ b/docker/Dockerfile.prod @@ -16,4 +16,4 @@ COPY . . EXPOSE 8000 # Run the application -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] +CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 7946b53..26cc534 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,4 +1,23 @@ services: + rabbitmq: + image: rabbitmq:3.12-management-alpine + container_name: rabbitmq + ports: + - "5672:5672" # AMQP port + - "15672:15672" # Management UI port + environment: + - RABBITMQ_DEFAULT_USER=guest + - RABBITMQ_DEFAULT_PASS=guest + volumes: + - rabbitmq_data:/var/lib/rabbitmq + restart: unless-stopped + healthcheck: + test: ["CMD", "rabbitmq-diagnostics", "ping"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 30s + api: build: context: .. @@ -10,6 +29,10 @@ services: - ..:/app environment: - PYTHONUNBUFFERED=1 + - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672// + depends_on: + rabbitmq: + condition: service_healthy restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] @@ -17,3 +40,6 @@ services: timeout: 10s retries: 3 start_period: 40s + +volumes: + rabbitmq_data: diff --git a/main.py b/main.py deleted file mode 100644 index 55edb8b..0000000 --- a/main.py +++ /dev/null @@ -1,23 +0,0 @@ -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware - -app = FastAPI(title="FastAPI Starter", description="A starter FastAPI application", version="1.0.0") - -# CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - - -@app.get("/") -async def root(): - return {"message": "Hello World", "status": "ok"} - - -@app.get("/health") -async def health(): - return {"status": "healthy"} diff --git a/requirements.txt b/requirements.txt index c1fb96e..1878a3e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ fastapi==0.104.1 uvicorn[standard]==0.24.0 python-multipart==0.0.6 +celery==5.3.4 +kombu==5.3.4 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..09c3ba8 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +# FastAPI application package diff --git a/src/celery_app.py b/src/celery_app.py new file mode 100644 index 0000000..a01fbce --- /dev/null +++ b/src/celery_app.py @@ -0,0 +1,28 @@ +import os + +from celery import Celery + +# RabbitMQ connection URL +broker_url = os.getenv("CELERY_BROKER_URL") +# RPC backend for results (uses RabbitMQ RPC, no additional service needed) +result_backend = os.getenv("CELERY_RESULT_BACKEND") + +# Create Celery instance +celery_app = Celery( + "fastapi_app", + broker=broker_url, + backend=result_backend, + include=["src.tasks"], +) + +# Celery configuration +celery_app.conf.update( + task_serializer="json", + accept_content=["json"], + result_serializer="json", + timezone="UTC", + enable_utc=True, + task_track_started=True, + task_time_limit=30 * 60, + task_soft_time_limit=25 * 60, +) diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..ca543fa --- /dev/null +++ b/src/main.py @@ -0,0 +1,47 @@ +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +from src.tasks import process_message + +app = FastAPI(title="FastAPI Starter", description="A starter FastAPI application", version="1.0.0") + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +class MessageRequest(BaseModel): + message: str + + +@app.get("/") +async def root(): + return {"message": "Hello World", "status": "ok"} + + +@app.get("/health") +async def health(): + return {"status": "healthy"} + + +@app.post("/send-task") +async def send_task(request: MessageRequest): + """ + Endpoint do wysyłania zadań do kolejki RabbitMQ przez Celery. + """ + try: + task = process_message.delay(request.message) + return { + "status": "success", + "message": "Task sent to queue", + "task_id": task.id, + "message_content": request.message, + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to send task: {str(e)}") from e diff --git a/src/tasks.py b/src/tasks.py new file mode 100644 index 0000000..2b86d1d --- /dev/null +++ b/src/tasks.py @@ -0,0 +1,18 @@ +from datetime import datetime + +from src.celery_app import celery_app + + +@celery_app.task(name="tasks.process_message") +def process_message(message: str) -> dict: + """ + Przykładowe zadanie Celery, które przetwarza wiadomość. + W rzeczywistej aplikacji tutaj można wykonać ciężkie operacje. + """ + # Symulacja przetwarzania + result = { + "status": "processed", + "message": message, + "processed_at": datetime.utcnow().isoformat() + "Z", + } + return result diff --git a/tests/test_main.py b/tests/test_main.py index 5d98849..34c4af9 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -3,7 +3,7 @@ import pytest from fastapi.testclient import TestClient -from main import app +from src.main import app @pytest.fixture