Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,6 @@ htmlcov/
.pytest_cache/
.mypy_cache/
.ruff_cache/

# DB
celery_results.db
17 changes: 17 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ services:
timeout: 10s
retries: 3
start_period: 40s
command: uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload

celery-worker:
build:
context: ..
dockerfile: docker/Dockerfile.dev
container_name: celery-worker
volumes:
- ..:/app
environment:
- PYTHONUNBUFFERED=1
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
depends_on:
rabbitmq:
condition: service_healthy
restart: unless-stopped
command: celery -A src.celery_app worker --loglevel=info

volumes:
rabbitmq_data:
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ uvicorn[standard]==0.24.0
python-multipart==0.0.6
celery==5.3.4
kombu==5.3.4
pydantic-settings==2.1.0
pydantic-settings==2.1.0
sqlalchemy==2.0.23
1 change: 1 addition & 0 deletions src/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@
task_track_started=True,
task_time_limit=30 * 60,
task_soft_time_limit=25 * 60,
result_backend=settings.CELERY_RESULT_BACKEND,
)
19 changes: 17 additions & 2 deletions src/config.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from functools import lru_cache
from pathlib import Path
from typing import Literal

from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

# Get the project root directory (parent of src/)
PROJECT_ROOT = Path(__file__).parent.parent
ENV_FILE = PROJECT_ROOT / ".env"


class Settings(BaseSettings):
"""Centralna konfiguracja aplikacji z walidacją."""

model_config = SettingsConfigDict(
env_file=".env",
env_file=str(ENV_FILE),
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
Expand All @@ -28,9 +33,19 @@ class Settings(BaseSettings):
# Celery
CELERY_BROKER_URL: str = Field(..., description="URL brokera Celery (RabbitMQ/Redis)")
CELERY_RESULT_BACKEND: str = Field(
default="", description="Backend dla wyników Celery (opcjonalne)"
default=f"db+sqlite:///{PROJECT_ROOT.resolve()}/celery_results.db",
description="Backend dla wyników Celery (domyślnie SQLite)",
)

@field_validator("CELERY_RESULT_BACKEND")
@classmethod
def validate_celery_backend(cls, v: str) -> str:
"""Zapewnia, że backend zawsze ma wartość (domyślnie SQLite)."""
if not v or v.strip() == "":
db_path = PROJECT_ROOT.resolve() / "celery_results.db"
return f"db+sqlite:///{db_path}"
return v

@field_validator("CELERY_BROKER_URL")
@classmethod
def validate_celery_broker(cls, v: str) -> str:
Expand Down
29 changes: 29 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from celery.result import AsyncResult # type: ignore[import-untyped]
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from src.celery_app import celery_app
from src.config import get_settings
from src.tasks import process_message

Expand Down Expand Up @@ -52,3 +54,30 @@ async def send_task(request: MessageRequest):
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to send task: {str(e)}") from e


@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
"""
Endpoint do sprawdzania statusu i wyniku zadania Celery.
"""
try:
task_result = AsyncResult(task_id, app=celery_app)
response = {
"task_id": task_id,
"status": task_result.state,
}

if task_result.state == "PENDING":
response["message"] = "Task is waiting to be processed"
elif task_result.state == "PROGRESS":
response["current"] = task_result.info.get("current", 0)
response["total"] = task_result.info.get("total", 0)
elif task_result.state == "SUCCESS":
response["result"] = task_result.result
elif task_result.state == "FAILURE":
response["error"] = str(task_result.info)

return response
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}") from e
8 changes: 7 additions & 1 deletion src/tasks.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import logging
from datetime import datetime

from src.celery_app import celery_app

logger = logging.getLogger(__name__)


@celery_app.task(name="tasks.process_message")
def process_message(message: str) -> dict:
"""
Przykładowe zadanie Celery, które przetwarza wiadomość.
W rzeczywistej aplikacji tutaj można wykonać ciężkie operacje.
"""
# Symulacja przetwarzania
logger.info(f"Rozpoczynam przetwarzanie wiadomości: {message}")

result = {
"status": "processed",
"message": message,
"processed_at": datetime.utcnow().isoformat() + "Z",
}

logger.info(f"Zakończono przetwarzanie wiadomości: {message}")
return result
10 changes: 10 additions & 0 deletions terraform/cloudwatch.tf
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ resource "aws_cloudwatch_log_group" "rabbitmq" {
Name = "${var.project_name}-rabbitmq-logs"
}
}

# CloudWatch Log Group dla Celery Worker
resource "aws_cloudwatch_log_group" "celery_worker" {
name = "/ecs/${var.project_name}-celery-worker"
retention_in_days = 7

tags = {
Name = "${var.project_name}-celery-worker-logs"
}
}
10 changes: 0 additions & 10 deletions terraform/data.tf
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,3 @@ data "aws_caller_identity" "current" {}
data "aws_ecr_repository" "app" {
name = var.ecr_repository_name
}

# Data source do odczytu wszystkich parametrów z Parameter Store
data "aws_ssm_parameters_by_path" "app_secrets" {
path = "/${var.project_name}"
recursive = true

depends_on = [
aws_ssm_parameter.celery_broker_url
]
}
107 changes: 105 additions & 2 deletions terraform/ecs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ resource "aws_ecs_task_definition" "rabbitmq" {
execution_role_arn = aws_iam_role.ecs_task_execution.arn
task_role_arn = aws_iam_role.ecs_task.arn

volume {
name = "rabbitmq-data"

efs_volume_configuration {
file_system_id = aws_efs_file_system.rabbitmq.id
root_directory = "/"
transit_encryption = "ENABLED"
authorization_config {
access_point_id = aws_efs_access_point.rabbitmq.id
iam = "ENABLED"
}
}
}

container_definitions = jsonencode([
{
name = "rabbitmq"
Expand Down Expand Up @@ -44,6 +58,14 @@ resource "aws_ecs_task_definition" "rabbitmq" {
}
]

mountPoints = [
{
sourceVolume = "rabbitmq-data"
containerPath = "/var/lib/rabbitmq"
readOnly = false
}
]

logConfiguration = {
logDriver = "awslogs"
options = {
Expand Down Expand Up @@ -88,7 +110,10 @@ resource "aws_ecs_service" "rabbitmq" {
}

depends_on = [
aws_iam_role_policy_attachment.ecs_task_execution
aws_iam_role_policy_attachment.ecs_task_execution,
aws_efs_mount_target.rabbitmq,
aws_efs_access_point.rabbitmq,
aws_iam_role_policy.ecs_task_execution_efs
]

tags = {
Expand Down Expand Up @@ -119,7 +144,12 @@ resource "aws_ecs_task_definition" "app" {
]

environment = local.environment_vars
secrets = local.ssm_secrets
secrets = [
{
name = "CELERY_BROKER_URL"
valueFrom = "arn:aws:ssm:${var.aws_region}:${data.aws_caller_identity.current.account_id}:parameter/${var.project_name}/celery_broker_url"
}
]

logConfiguration = {
logDriver = "awslogs"
Expand Down Expand Up @@ -170,3 +200,76 @@ resource "aws_ecs_service" "app" {
Name = "${var.project_name}-service"
}
}

# ECS Task Definition dla Celery Worker
resource "aws_ecs_task_definition" "celery_worker" {
family = "${var.project_name}-celery-worker"
network_mode = "awsvpc"
requires_compatibilities = ["FARGATE"]
cpu = 256
memory = 512
execution_role_arn = aws_iam_role.ecs_task_execution.arn
task_role_arn = aws_iam_role.ecs_task.arn

container_definitions = jsonencode([
{
name = "${var.project_name}-celery-worker"
image = "${data.aws_ecr_repository.app.repository_url}:latest"

environment = local.environment_vars
secrets = [
{
name = "CELERY_BROKER_URL"
valueFrom = "arn:aws:ssm:${var.aws_region}:${data.aws_caller_identity.current.account_id}:parameter/${var.project_name}/celery_broker_url"
}
]

command = [
"celery",
"-A",
"src.celery_app",
"worker",
"--loglevel=info"
]

logConfiguration = {
logDriver = "awslogs"
options = {
"awslogs-group" = aws_cloudwatch_log_group.celery_worker.name
"awslogs-region" = var.aws_region
"awslogs-stream-prefix" = "ecs"
}
}
}
])

tags = {
Name = "${var.project_name}-celery-worker-task"
}
}

# ECS Service dla Celery Worker
resource "aws_ecs_service" "celery_worker" {
name = "${var.project_name}-celery-worker-service"
cluster = aws_ecs_cluster.main.id
task_definition = aws_ecs_task_definition.celery_worker.arn
desired_count = 1
launch_type = "FARGATE"

network_configuration {
subnets = [aws_subnet.public.id]
security_groups = [aws_security_group.celery_worker.id]
assign_public_ip = true
}

depends_on = [
aws_iam_role_policy_attachment.ecs_task_execution,
aws_ecs_service.rabbitmq,
aws_ssm_parameter.celery_broker_url,
aws_iam_role_policy.ecs_task_execution_ssm
]

tags = {
Name = "${var.project_name}-celery-worker-service"
}
}
68 changes: 68 additions & 0 deletions terraform/efs.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Security Group dla EFS
resource "aws_security_group" "efs" {
name = "${var.project_name}-efs-sg"
description = "Security group for EFS - allows NFS access from RabbitMQ ECS tasks"
vpc_id = aws_vpc.main.id

ingress {
description = "NFS from RabbitMQ ECS tasks"
from_port = 2049
to_port = 2049
protocol = "tcp"
security_groups = [aws_security_group.rabbitmq.id]
}

egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}

tags = {
Name = "${var.project_name}-efs-sg"
}
}

# EFS File System - zoptymalizowany pod kątem kosztów
resource "aws_efs_file_system" "rabbitmq" {
creation_token = "${var.project_name}-rabbitmq-efs"
encrypted = true

performance_mode = "generalPurpose"
throughput_mode = "bursting"

tags = {
Name = "${var.project_name}-rabbitmq-efs"
}
}

# EFS Mount Target
resource "aws_efs_mount_target" "rabbitmq" {
file_system_id = aws_efs_file_system.rabbitmq.id
subnet_id = aws_subnet.public.id
security_groups = [aws_security_group.efs.id]
}

# EFS Access Point dla RabbitMQ - root permissions aby móc ustawić uprawnienia
resource "aws_efs_access_point" "rabbitmq" {
file_system_id = aws_efs_file_system.rabbitmq.id

posix_user {
gid = 0 # root GID - pozwala root wykonywać operacje
uid = 0 # root UID - pozwala root wykonywać operacje
}

root_directory {
path = "/"
creation_info {
owner_gid = 999
owner_uid = 999
permissions = "777" # Pozwól na zapis dla wszystkich - EFS access point zapewnia izolację
}
}

tags = {
Name = "${var.project_name}-rabbitmq-access-point"
}
}
Loading