diff --git a/.gitignore b/.gitignore index 0303da6..f59e49d 100644 --- a/.gitignore +++ b/.gitignore @@ -67,3 +67,6 @@ htmlcov/ .pytest_cache/ .mypy_cache/ .ruff_cache/ + +# DB +celery_results.db \ No newline at end of file diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 26cc534..d13d799 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -40,6 +40,23 @@ services: timeout: 10s retries: 3 start_period: 40s + command: uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload + + celery-worker: + build: + context: .. + dockerfile: docker/Dockerfile.dev + container_name: celery-worker + volumes: + - ..:/app + environment: + - PYTHONUNBUFFERED=1 + - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672// + depends_on: + rabbitmq: + condition: service_healthy + restart: unless-stopped + command: celery -A src.celery_app worker --loglevel=info volumes: rabbitmq_data: diff --git a/requirements.txt b/requirements.txt index f80db32..92014a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ uvicorn[standard]==0.24.0 python-multipart==0.0.6 celery==5.3.4 kombu==5.3.4 -pydantic-settings==2.1.0 \ No newline at end of file +pydantic-settings==2.1.0 +sqlalchemy==2.0.23 \ No newline at end of file diff --git a/src/celery_app.py b/src/celery_app.py index 7b6cc9f..c568813 100644 --- a/src/celery_app.py +++ b/src/celery_app.py @@ -22,4 +22,5 @@ task_track_started=True, task_time_limit=30 * 60, task_soft_time_limit=25 * 60, + result_backend=settings.CELERY_RESULT_BACKEND, ) diff --git a/src/config.py b/src/config.py index a0a74b6..ee3d7bb 100644 --- a/src/config.py +++ b/src/config.py @@ -1,15 +1,20 @@ from functools import lru_cache +from pathlib import Path from typing import Literal from pydantic import Field, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict +# Get the project root directory (parent of src/) +PROJECT_ROOT = Path(__file__).parent.parent +ENV_FILE = PROJECT_ROOT / ".env" + class Settings(BaseSettings): """Centralna konfiguracja aplikacji z walidacją.""" model_config = SettingsConfigDict( - env_file=".env", + env_file=str(ENV_FILE), env_file_encoding="utf-8", case_sensitive=False, extra="ignore", @@ -28,9 +33,19 @@ class Settings(BaseSettings): # Celery CELERY_BROKER_URL: str = Field(..., description="URL brokera Celery (RabbitMQ/Redis)") CELERY_RESULT_BACKEND: str = Field( - default="", description="Backend dla wyników Celery (opcjonalne)" + default=f"db+sqlite:///{PROJECT_ROOT.resolve()}/celery_results.db", + description="Backend dla wyników Celery (domyślnie SQLite)", ) + @field_validator("CELERY_RESULT_BACKEND") + @classmethod + def validate_celery_backend(cls, v: str) -> str: + """Zapewnia, że backend zawsze ma wartość (domyślnie SQLite).""" + if not v or v.strip() == "": + db_path = PROJECT_ROOT.resolve() / "celery_results.db" + return f"db+sqlite:///{db_path}" + return v + @field_validator("CELERY_BROKER_URL") @classmethod def validate_celery_broker(cls, v: str) -> str: diff --git a/src/main.py b/src/main.py index 73d5517..093181b 100644 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,9 @@ +from celery.result import AsyncResult # type: ignore[import-untyped] from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel +from src.celery_app import celery_app from src.config import get_settings from src.tasks import process_message @@ -52,3 +54,30 @@ async def send_task(request: MessageRequest): } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to send task: {str(e)}") from e + + +@app.get("/task/{task_id}") +async def get_task_status(task_id: str): + """ + Endpoint do sprawdzania statusu i wyniku zadania Celery. + """ + try: + task_result = AsyncResult(task_id, app=celery_app) + response = { + "task_id": task_id, + "status": task_result.state, + } + + if task_result.state == "PENDING": + response["message"] = "Task is waiting to be processed" + elif task_result.state == "PROGRESS": + response["current"] = task_result.info.get("current", 0) + response["total"] = task_result.info.get("total", 0) + elif task_result.state == "SUCCESS": + response["result"] = task_result.result + elif task_result.state == "FAILURE": + response["error"] = str(task_result.info) + + return response + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}") from e diff --git a/src/tasks.py b/src/tasks.py index 2b86d1d..25dd61e 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -1,7 +1,10 @@ +import logging from datetime import datetime from src.celery_app import celery_app +logger = logging.getLogger(__name__) + @celery_app.task(name="tasks.process_message") def process_message(message: str) -> dict: @@ -9,10 +12,13 @@ def process_message(message: str) -> dict: Przykładowe zadanie Celery, które przetwarza wiadomość. W rzeczywistej aplikacji tutaj można wykonać ciężkie operacje. """ - # Symulacja przetwarzania + logger.info(f"Rozpoczynam przetwarzanie wiadomości: {message}") + result = { "status": "processed", "message": message, "processed_at": datetime.utcnow().isoformat() + "Z", } + + logger.info(f"Zakończono przetwarzanie wiadomości: {message}") return result diff --git a/terraform/cloudwatch.tf b/terraform/cloudwatch.tf index fbc6160..79efdd6 100644 --- a/terraform/cloudwatch.tf +++ b/terraform/cloudwatch.tf @@ -17,3 +17,13 @@ resource "aws_cloudwatch_log_group" "rabbitmq" { Name = "${var.project_name}-rabbitmq-logs" } } + +# CloudWatch Log Group dla Celery Worker +resource "aws_cloudwatch_log_group" "celery_worker" { + name = "/ecs/${var.project_name}-celery-worker" + retention_in_days = 7 + + tags = { + Name = "${var.project_name}-celery-worker-logs" + } +} \ No newline at end of file diff --git a/terraform/data.tf b/terraform/data.tf index a1d067b..9f8ee39 100644 --- a/terraform/data.tf +++ b/terraform/data.tf @@ -10,13 +10,3 @@ data "aws_caller_identity" "current" {} data "aws_ecr_repository" "app" { name = var.ecr_repository_name } - -# Data source do odczytu wszystkich parametrów z Parameter Store -data "aws_ssm_parameters_by_path" "app_secrets" { - path = "/${var.project_name}" - recursive = true - - depends_on = [ - aws_ssm_parameter.celery_broker_url - ] -} diff --git a/terraform/ecs.tf b/terraform/ecs.tf index 80d82e8..f510314 100644 --- a/terraform/ecs.tf +++ b/terraform/ecs.tf @@ -17,6 +17,20 @@ resource "aws_ecs_task_definition" "rabbitmq" { execution_role_arn = aws_iam_role.ecs_task_execution.arn task_role_arn = aws_iam_role.ecs_task.arn + volume { + name = "rabbitmq-data" + + efs_volume_configuration { + file_system_id = aws_efs_file_system.rabbitmq.id + root_directory = "/" + transit_encryption = "ENABLED" + authorization_config { + access_point_id = aws_efs_access_point.rabbitmq.id + iam = "ENABLED" + } + } + } + container_definitions = jsonencode([ { name = "rabbitmq" @@ -44,6 +58,14 @@ resource "aws_ecs_task_definition" "rabbitmq" { } ] + mountPoints = [ + { + sourceVolume = "rabbitmq-data" + containerPath = "/var/lib/rabbitmq" + readOnly = false + } + ] + logConfiguration = { logDriver = "awslogs" options = { @@ -88,7 +110,10 @@ resource "aws_ecs_service" "rabbitmq" { } depends_on = [ - aws_iam_role_policy_attachment.ecs_task_execution + aws_iam_role_policy_attachment.ecs_task_execution, + aws_efs_mount_target.rabbitmq, + aws_efs_access_point.rabbitmq, + aws_iam_role_policy.ecs_task_execution_efs ] tags = { @@ -119,7 +144,12 @@ resource "aws_ecs_task_definition" "app" { ] environment = local.environment_vars - secrets = local.ssm_secrets + secrets = [ + { + name = "CELERY_BROKER_URL" + valueFrom = "arn:aws:ssm:${var.aws_region}:${data.aws_caller_identity.current.account_id}:parameter/${var.project_name}/celery_broker_url" + } + ] logConfiguration = { logDriver = "awslogs" @@ -170,3 +200,76 @@ resource "aws_ecs_service" "app" { Name = "${var.project_name}-service" } } + +# ECS Task Definition dla Celery Worker +resource "aws_ecs_task_definition" "celery_worker" { + family = "${var.project_name}-celery-worker" + network_mode = "awsvpc" + requires_compatibilities = ["FARGATE"] + cpu = 256 + memory = 512 + execution_role_arn = aws_iam_role.ecs_task_execution.arn + task_role_arn = aws_iam_role.ecs_task.arn + + container_definitions = jsonencode([ + { + name = "${var.project_name}-celery-worker" + image = "${data.aws_ecr_repository.app.repository_url}:latest" + + environment = local.environment_vars + secrets = [ + { + name = "CELERY_BROKER_URL" + valueFrom = "arn:aws:ssm:${var.aws_region}:${data.aws_caller_identity.current.account_id}:parameter/${var.project_name}/celery_broker_url" + } + ] + + command = [ + "celery", + "-A", + "src.celery_app", + "worker", + "--loglevel=info" + ] + + logConfiguration = { + logDriver = "awslogs" + options = { + "awslogs-group" = aws_cloudwatch_log_group.celery_worker.name + "awslogs-region" = var.aws_region + "awslogs-stream-prefix" = "ecs" + } + } + } + ]) + + tags = { + Name = "${var.project_name}-celery-worker-task" + } +} + +# ECS Service dla Celery Worker +resource "aws_ecs_service" "celery_worker" { + name = "${var.project_name}-celery-worker-service" + cluster = aws_ecs_cluster.main.id + task_definition = aws_ecs_task_definition.celery_worker.arn + desired_count = 1 + launch_type = "FARGATE" + + network_configuration { + subnets = [aws_subnet.public.id] + security_groups = [aws_security_group.celery_worker.id] + assign_public_ip = true + } + + depends_on = [ + aws_iam_role_policy_attachment.ecs_task_execution, + aws_ecs_service.rabbitmq, + aws_ssm_parameter.celery_broker_url, + aws_iam_role_policy.ecs_task_execution_ssm + ] + + tags = { + Name = "${var.project_name}-celery-worker-service" + } +} diff --git a/terraform/efs.tf b/terraform/efs.tf new file mode 100644 index 0000000..31a6604 --- /dev/null +++ b/terraform/efs.tf @@ -0,0 +1,68 @@ +# Security Group dla EFS +resource "aws_security_group" "efs" { + name = "${var.project_name}-efs-sg" + description = "Security group for EFS - allows NFS access from RabbitMQ ECS tasks" + vpc_id = aws_vpc.main.id + + ingress { + description = "NFS from RabbitMQ ECS tasks" + from_port = 2049 + to_port = 2049 + protocol = "tcp" + security_groups = [aws_security_group.rabbitmq.id] + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + tags = { + Name = "${var.project_name}-efs-sg" + } +} + +# EFS File System - zoptymalizowany pod kątem kosztów +resource "aws_efs_file_system" "rabbitmq" { + creation_token = "${var.project_name}-rabbitmq-efs" + encrypted = true + + performance_mode = "generalPurpose" + throughput_mode = "bursting" + + tags = { + Name = "${var.project_name}-rabbitmq-efs" + } +} + +# EFS Mount Target +resource "aws_efs_mount_target" "rabbitmq" { + file_system_id = aws_efs_file_system.rabbitmq.id + subnet_id = aws_subnet.public.id + security_groups = [aws_security_group.efs.id] +} + +# EFS Access Point dla RabbitMQ - root permissions aby móc ustawić uprawnienia +resource "aws_efs_access_point" "rabbitmq" { + file_system_id = aws_efs_file_system.rabbitmq.id + + posix_user { + gid = 0 # root GID - pozwala root wykonywać operacje + uid = 0 # root UID - pozwala root wykonywać operacje + } + + root_directory { + path = "/" + creation_info { + owner_gid = 999 + owner_uid = 999 + permissions = "777" # Pozwól na zapis dla wszystkich - EFS access point zapewnia izolację + } + } + + tags = { + Name = "${var.project_name}-rabbitmq-access-point" + } +} diff --git a/terraform/iam.tf b/terraform/iam.tf index 2033e28..f8079b3 100644 --- a/terraform/iam.tf +++ b/terraform/iam.tf @@ -61,6 +61,32 @@ resource "aws_iam_role_policy" "ecs_task_execution_ssm" { }) } +# IAM Policy for EFS access +resource "aws_iam_role_policy" "ecs_task_execution_efs" { + name = "${var.project_name}-ecs-task-execution-efs-policy" + role = aws_iam_role.ecs_task_execution.id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Action = [ + "elasticfilesystem:ClientMount", + "elasticfilesystem:ClientWrite", + "elasticfilesystem:ClientRootAccess" + ] + Resource = aws_efs_file_system.rabbitmq.arn + Condition = { + StringEquals = { + "elasticfilesystem:AccessPointArn" = aws_efs_access_point.rabbitmq.arn + } + } + } + ] + }) +} + # IAM Role for ECS Task resource "aws_iam_role" "ecs_task" { name = "${var.project_name}-ecs-task-role" diff --git a/terraform/parameter_store.tf b/terraform/parameter_store.tf index 5ff8307..11c289a 100644 --- a/terraform/parameter_store.tf +++ b/terraform/parameter_store.tf @@ -17,20 +17,6 @@ resource "aws_ssm_parameter" "celery_broker_url" { # Lokalne zmienne do mapowania parametrów locals { - # Automatyczne mapowanie parametrów na secrets dla ECS - ssm_secrets = [ - for param_path in data.aws_ssm_parameters_by_path.app_secrets.names : { - name = upper( - replace( - replace(param_path, "/${var.project_name}/", ""), - "/", - "_" - ) - ) - valueFrom = "arn:aws:ssm:${var.aws_region}:${data.aws_caller_identity.current.account_id}:parameter${param_path}" - } - ] - # Zwykłe zmienne środowiskowe (nie-secrets) environment_vars = [ { diff --git a/terraform/security_groups.tf b/terraform/security_groups.tf index a263c80..5c668b3 100644 --- a/terraform/security_groups.tf +++ b/terraform/security_groups.tf @@ -39,11 +39,11 @@ resource "aws_security_group" "rabbitmq" { } ingress { - description = "Management UI from ECS tasks" - from_port = 15672 - to_port = 15672 + description = "AMQP from Celery worker" + from_port = 5672 + to_port = 5672 protocol = "tcp" - security_groups = [aws_security_group.ecs_tasks.id] + security_groups = [aws_security_group.celery_worker.id] } egress { @@ -57,3 +57,21 @@ resource "aws_security_group" "rabbitmq" { Name = "${var.project_name}-rabbitmq-sg" } } + +# Security Group dla Celery Worker +resource "aws_security_group" "celery_worker" { + name = "${var.project_name}-celery-worker-sg" + description = "Security group for Celery worker tasks" + vpc_id = aws_vpc.main.id + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + tags = { + Name = "${var.project_name}-celery-worker-sg" + } +}