Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
run: uv python install ${{ env.PYTHON_VERSION }}

- name: Install dependencies
run: uv sync --frozen
run: uv sync --frozen --extra k8s

- name: Check formatting
run: uv run ruff format --check .
Expand Down Expand Up @@ -79,7 +79,7 @@ jobs:
run: uv python install ${{ env.PYTHON_VERSION }}

- name: Install dependencies
run: uv sync --frozen
run: uv sync --frozen --extra k8s

- name: Run type checker
run: uv run ty check osa
Expand Down Expand Up @@ -107,7 +107,7 @@ jobs:
run: uv python install ${{ env.PYTHON_VERSION }}

- name: Install dependencies
run: uv sync --frozen
run: uv sync --frozen --extra k8s

- name: Run unit tests with coverage
run: uv run pytest tests/unit -v --tb=short --cov=osa --cov-report=xml --cov-report=term-missing
Expand Down Expand Up @@ -148,7 +148,7 @@ jobs:
run: uv python install ${{ env.PYTHON_VERSION }}

- name: Install dependencies
run: uv sync --frozen
run: uv sync --frozen --extra k8s

- name: Run contract tests
run: uv run pytest tests/contract -v --tb=short
Expand Down Expand Up @@ -187,7 +187,7 @@ jobs:
run: uv python install ${{ env.PYTHON_VERSION }}

- name: Install dependencies
run: uv sync --frozen
run: uv sync --frozen --extra k8s

- name: Run migrations
run: uv run alembic upgrade head
Expand Down
4 changes: 2 additions & 2 deletions server/osa/application/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from osa.infrastructure.event.di import EventProvider
from osa.infrastructure.http.di import HttpProvider
from osa.infrastructure.index.di import IndexProvider
from osa.infrastructure.oci import OciProvider
from osa.infrastructure.k8s.di import RunnerProvider
from osa.infrastructure.persistence import PersistenceProvider
from osa.infrastructure.source.di import SourceProvider
from osa.util.di.scope import Scope
Expand All @@ -42,7 +42,7 @@ def create_container(

return make_async_container(
PersistenceProvider(),
OciProvider(),
RunnerProvider(),
IndexProvider(),
SourceProvider(),
EventProvider(extra_handlers=extra_handlers),
Expand Down
31 changes: 30 additions & 1 deletion server/osa/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import re
import sys
from pathlib import Path
from typing import Any
from typing import Any, Literal

import yaml
from pydantic import BaseModel, field_validator, model_validator
Expand Down Expand Up @@ -83,6 +83,34 @@ class WorkerConfig(BaseModel):
batch_size: int = 100 # Maximum events to fetch per poll cycle


class K8sConfig(BaseModel):
"""Kubernetes-specific runner settings, required when runner.backend == "k8s"."""

namespace: str = "osa"
service_account: str | None = None
data_pvc_name: str = ""
data_mount_path: str = "/data"
image_pull_secrets: list[str] = []
job_ttl_seconds: int = 300


class RunnerConfig(BaseModel):
"""Runner backend selection and Kubernetes configuration."""

backend: Literal["oci", "k8s"] = "oci"
k8s: K8sConfig = K8sConfig()
Comment on lines +86 to +101

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing cross-field validation: data_pvc_name required when backend == "k8s"

K8sConfig.data_pvc_name defaults to "". If an operator enables the K8s backend but forgets to set this field, the PVC claim name embedded in every Job spec will be an empty string. The error will only surface at startup via the health-check call to read_namespaced_persistent_volume_claim("", namespace), which returns a cryptic K8s 404. A @model_validator on RunnerConfig would surface the problem at config-parse time with a clear message:

class RunnerConfig(BaseModel):
    backend: Literal["oci", "k8s"] = "oci"
    k8s: K8sConfig = K8sConfig()

    @model_validator(mode="after")
    def validate_k8s_required_fields(self) -> "RunnerConfig":
        if self.backend == "k8s" and not self.k8s.data_pvc_name:
            raise ValueError(
                "runner.k8s.data_pvc_name is required when runner.backend == 'k8s'. "
                "Set OSA_RUNNER__K8S__DATA_PVC_NAME."
            )
        return self


@model_validator(mode="after")
def validate_k8s_required_fields(self) -> Self:
"""Validate that required K8s fields are set when backend is 'k8s'."""
if self.backend == "k8s" and not self.k8s.data_pvc_name:
raise ValueError(
"runner.k8s.data_pvc_name is required when runner.backend == 'k8s'. "
"Set OSA_RUNNER__K8S__DATA_PVC_NAME."
)
return self


# =============================================================================
# Authentication Configuration
# =============================================================================
Expand Down Expand Up @@ -193,6 +221,7 @@ class Config(BaseSettings):
logging: LoggingConfig = LoggingConfig()
worker: WorkerConfig = WorkerConfig() # Background worker settings
auth: AuthConfig # Required - set via OSA_AUTH__JWT__SECRET env var
runner: RunnerConfig = RunnerConfig()
host_data_dir: str | None = None # Host path for OSA_DATA_DIR (sibling container mounts)

model_config = {
Expand Down
1 change: 0 additions & 1 deletion server/osa/domain/shared/model/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class SourceDefinition(ValueObject):

image: str
digest: str
runner: str = "oci"
config: dict[str, Any] | None = None
limits: SourceLimits = Field(default_factory=SourceLimits)
schedule: SourceScheduleConfig | None = None
Expand Down
2 changes: 2 additions & 0 deletions server/osa/domain/source/port/source_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
from typing import Any, Protocol

from osa.domain.shared.model.source import SourceDefinition
from osa.domain.shared.model.srn import ConventionSRN


@dataclass(frozen=True)
class SourceInputs:
"""Inputs for a source container run."""

convention_srn: ConventionSRN
config: dict[str, Any] | None = None
since: datetime | None = None
limit: int | None = None
Expand Down
1 change: 1 addition & 0 deletions server/osa/domain/source/service/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async def run_source(

# Build inputs
inputs = SourceInputs(
convention_srn=convention_srn,
config=source.config,
since=since,
limit=limit,
Expand Down
2 changes: 2 additions & 0 deletions server/osa/domain/validation/port/hook_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Protocol, runtime_checkable

from osa.domain.shared.model.hook import HookDefinition
from osa.domain.shared.model.srn import DepositionSRN
from osa.domain.shared.port import Port
from osa.domain.validation.model.hook_result import HookResult

Expand All @@ -15,6 +16,7 @@ class HookInputs:
"""Inputs to pass to a hook container."""

record_json: dict
deposition_srn: DepositionSRN
files_dir: Path | None = None
config: dict | None = None

Expand Down
1 change: 1 addition & 0 deletions server/osa/domain/validation/service/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ async def validate_deposition(
record_json = {"srn": str(deposition_srn), "metadata": metadata}
inputs = HookInputs(
record_json=record_json,
deposition_srn=deposition_srn,
files_dir=Path(files_dir) if files_dir else None,
)

Expand Down
5 changes: 4 additions & 1 deletion server/osa/infrastructure/auth/role_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from uuid import UUID

from sqlalchemy import delete, insert, select
from sqlalchemy import CursorResult, delete, insert, select
from sqlalchemy.ext.asyncio import AsyncSession

from osa.domain.auth.model.role import Role
from osa.domain.shared.error import InfrastructureError
from osa.domain.auth.model.role_assignment import RoleAssignment, RoleAssignmentId
from osa.domain.auth.model.value import UserId
from osa.domain.auth.port.role_repository import RoleAssignmentRepository
Expand Down Expand Up @@ -61,6 +62,8 @@ async def delete(self, user_id: UserId, role: Role) -> bool:
)
result = await self.session.execute(stmt)
await self.session.flush()
if not isinstance(result, CursorResult):
raise InfrastructureError(f"Expected CursorResult, got {type(result).__name__}")
return result.rowcount > 0

async def get(self, user_id: UserId, role: Role) -> RoleAssignment | None:
Expand Down
6 changes: 6 additions & 0 deletions server/osa/infrastructure/k8s/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Kubernetes runner infrastructure.

kubernetes-asyncio is an optional dependency. Modules that require it
(di.py, runner.py, source_runner.py, health.py) perform lazy imports
and raise ConfigurationError if the package is not installed.
"""
132 changes: 132 additions & 0 deletions server/osa/infrastructure/k8s/di.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Dishka DI provider for runner infrastructure (OCI or Kubernetes).

Uses Dishka's conditional activation (Marker + when=) to register only
the factories needed for the configured backend. When backend is "oci",
only Docker-related factories activate. When "k8s", only K8s factories
activate. No None placeholders, no unused dependencies resolved.
"""

import logging
from typing import AsyncIterable

import aiodocker
from dishka import Marker, activate, provide

from osa.config import Config
from osa.domain.source.port.source_runner import SourceRunner
from osa.domain.validation.port.hook_runner import HookRunner
from osa.infrastructure.oci.runner import OciHookRunner
from osa.infrastructure.oci.source_runner import OciSourceRunner
from osa.util.di.base import Provider
from osa.util.di.scope import Scope

try:
from kubernetes_asyncio.client import ApiClient
except ImportError:
ApiClient = object # type: ignore[misc,assignment]

logger = logging.getLogger(__name__)

K8S = Marker("k8s")


class RunnerProvider(Provider):
"""Config-driven runner provider.

Uses Dishka conditional activation: factories decorated with
``when=K8S`` only activate when the activator returns True
(i.e. ``config.runner.backend == "k8s"``). Undecorated factories
serve as the default OCI path.
"""

@activate(K8S)
def is_k8s(self, config: Config) -> bool:
return config.runner.backend == "k8s"

# ------------------------------------------------------------------
# OCI backend (default — no when= condition)
# ------------------------------------------------------------------

@provide(scope=Scope.APP)
async def get_docker(self, config: Config) -> AsyncIterable[aiodocker.Docker]:
docker = aiodocker.Docker()
yield docker
await docker.close()

@provide(scope=Scope.UOW)
def get_hook_runner_oci(
self,
docker: aiodocker.Docker,
config: Config,
) -> HookRunner:
return OciHookRunner(docker=docker, host_data_dir=config.host_data_dir)

@provide(scope=Scope.UOW)
def get_source_runner_oci(
self,
docker: aiodocker.Docker,
config: Config,
) -> SourceRunner:
return OciSourceRunner(docker=docker, host_data_dir=config.host_data_dir)

# ------------------------------------------------------------------
# K8s backend (activated when config.runner.backend == "k8s")
# ------------------------------------------------------------------

@provide(when=K8S, scope=Scope.APP)
async def get_k8s_api_client(self, config: Config) -> AsyncIterable[ApiClient]:
from osa.domain.shared.error import ConfigurationError

try:
import kubernetes_asyncio # noqa: F401
except ImportError:
raise ConfigurationError(
"kubernetes-asyncio is required for K8s runner. Install with: pip install osa[k8s]"
)

from kubernetes_asyncio import client as k8s_client
from kubernetes_asyncio import config as k8s_config

try:
k8s_config.load_incluster_config()
except k8s_config.ConfigException:
await k8s_config.load_kube_config()

api_client = k8s_client.ApiClient()

# Startup health check
from osa.infrastructure.k8s.health import check_k8s_health

k8s_cfg = config.runner.k8s
batch_api = k8s_client.BatchV1Api(api_client)
core_api = k8s_client.CoreV1Api(api_client)
await check_k8s_health(
batch_api,
core_api,
namespace=k8s_cfg.namespace,
pvc_name=k8s_cfg.data_pvc_name,
)

logger.info("K8s API client initialized (namespace=%s)", k8s_cfg.namespace)
yield api_client
await api_client.close()

@provide(when=K8S, scope=Scope.UOW)
def get_hook_runner_k8s(
self,
k8s_api_client: ApiClient,
config: Config,
) -> HookRunner:
from osa.infrastructure.k8s.runner import K8sHookRunner

return K8sHookRunner(api_client=k8s_api_client, config=config.runner.k8s)

@provide(when=K8S, scope=Scope.UOW)
def get_source_runner_k8s(
self,
k8s_api_client: ApiClient,
config: Config,
) -> SourceRunner:
from osa.infrastructure.k8s.source_runner import K8sSourceRunner

return K8sSourceRunner(api_client=k8s_api_client, config=config.runner.k8s)
29 changes: 29 additions & 0 deletions server/osa/infrastructure/k8s/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""K8s API error classification.

Maps kubernetes-asyncio ApiException status codes to OSA error types.
"""

from osa.domain.shared.error import ConfigurationError, InfrastructureError, OSAError


def classify_api_error(exc: Exception) -> OSAError:
"""Classify a K8s API error by HTTP status code.

- 403 → ConfigurationError (RBAC misconfiguration, not retried)
- 404 → ConfigurationError (namespace/resource missing, not retried)
- 500, 503 → InfrastructureError (transient, retried by outbox)
- Other → InfrastructureError
"""
status = getattr(exc, "status", 0)
reason = getattr(exc, "reason", str(exc))

if status == 403:
return ConfigurationError(
f"K8s RBAC permission denied: {reason}. "
"Check ServiceAccount permissions for the OSA namespace."
)
if status == 404:
return ConfigurationError(
f"K8s resource not found: {reason}. Check that the namespace and resources exist."
)
return InfrastructureError(f"K8s API error ({status}): {reason}")
Loading
Loading