Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ HEALTHCHECK --interval=5s --timeout=10s --start-period=5s --retries=3 \
CMD curl --fail http://localhost:8000/api/v1/health || exit 1

ENTRYPOINT ["/app/entrypoint.sh"]
CMD ["uvicorn", "osa.application.api.rest.app:app", "--host", "0.0.0.0", "--port", "8000"]
CMD ["uvicorn", "--factory", "osa.application.api.rest.app:create_app", "--host", "0.0.0.0", "--port", "8000"]
4 changes: 2 additions & 2 deletions server/Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ default:

# Run development server with hot-reload
dev:
uv run uvicorn osa.application.api.rest.app:app --reload --host 0.0.0.0 --port 8000
uv run uvicorn --factory osa.application.api.rest.app:create_app --reload --host 0.0.0.0 --port 8000

# Run with debug logging
dev-debug:
LOG_LEVEL=DEBUG uv run uvicorn osa.application.api.rest.app:app --reload --host 0.0.0.0 --port 8000
LOG_LEVEL=DEBUG uv run uvicorn --factory osa.application.api.rest.app:create_app --reload --host 0.0.0.0 --port 8000

# === Testing ===

Expand Down
41 changes: 31 additions & 10 deletions server/osa/application/api/rest/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
from contextlib import asynccontextmanager
from typing import Any

import logfire
from dishka import Provider as DishkaProvider
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncEngine
Expand All @@ -26,6 +28,7 @@
from osa.config import Config, configure_logging
from osa.domain.shared.authorization.startup import validate_all_handlers
from osa.domain.shared.error import OSAError
from osa.domain.shared.event import EventHandler
from osa.infrastructure.event.worker import WorkerPool
from osa.infrastructure.persistence.seed import ensure_system_user
from osa.util.di.fastapi import setup_dishka
Expand All @@ -50,8 +53,30 @@ async def lifespan(app: FastAPI):
await container.close()


def create_app() -> FastAPI:
"""Create FastAPI application."""
def create_app(
*,
providers: list[DishkaProvider] | None = None,
extra_handlers: list[type[EventHandler[Any]]] | None = None,
) -> FastAPI:
"""Create FastAPI application.

This is the main entry point for running OSA. External hosts (e.g.
Amacrin) use the keyword arguments to customise the runtime without
duplicating any app wiring::

app = create_app(
providers=[K8sProvider()],
extra_handlers=[MeterUsage, SendNotification],
)

Args:
providers: Extra Dishka providers that override the built-in
bindings. For example, pass a ``K8sProvider`` to replace the
default OCI container runner with a Kubernetes-based one.
extra_handlers: Extra event handler types registered alongside
the core handlers for subscription routing, WorkerPool
registration, and DI resolution.
"""
# Pydantic Settings populates from env vars at runtime
config = Config() # type: ignore[call-arg]

Expand All @@ -74,7 +99,10 @@ def create_app() -> FastAPI:
logfire.instrument_fastapi(app_instance)

# Setup dependency injection
container = create_container()
container = create_container(
*(providers or []),
extra_handlers=extra_handlers,
)
setup_dishka(container, app_instance)

# Register v1 routes with /api/v1 prefix
Expand Down Expand Up @@ -115,10 +143,3 @@ async def unhandled_exception_handler(request: Request, exc: Exception):
)

return app_instance


# Create app instance for uvicorn
# Note: Logfire must be configured before this module is imported
# In production: start_app.py handles this
# In tests: configure in conftest.py
app = create_app()
32 changes: 24 additions & 8 deletions server/osa/application/di.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,51 @@
from typing import Any

from dishka import AsyncContainer, make_async_container
from dishka import Provider as DishkaProvider

from osa.util.paths import OSAPaths
from osa.config import Config
from osa.domain.auth.util.di import AuthProvider
from osa.domain.discovery.util.di import DiscoveryProvider
from osa.domain.deposition.util.di import DepositionProvider
from osa.domain.discovery.util.di import DiscoveryProvider
from osa.domain.feature.util.di import FeatureProvider
from osa.domain.semantics.util.di.provider import SemanticsProvider
from osa.domain.shared.event import EventHandler
from osa.domain.validation.util.di import ValidationProvider
from osa.infrastructure.auth import AuthInfraProvider
from osa.infrastructure.event.di import EventProvider
from osa.infrastructure.http.di import HttpProvider
from osa.infrastructure.index.di import IndexProvider
from osa.infrastructure.source.di import SourceProvider
from osa.infrastructure.oci import OciProvider
from osa.infrastructure.persistence import PersistenceProvider
from osa.infrastructure.source.di import SourceProvider
from osa.util.di.scope import Scope
from osa.util.paths import OSAPaths


def create_container() -> AsyncContainer:
# Pydantic Settings populates from env vars at runtime
config = Config() # type: ignore[call-arg]
def create_container(
*extra_providers: DishkaProvider,
extra_handlers: list[type[EventHandler[Any]]] | None = None,
) -> AsyncContainer:
"""Create the DI container with all default providers.

# OSAPaths reads OSA_DATA_DIR from environment automatically
Args:
extra_providers: Additional Dishka providers appended after defaults.
Later providers override earlier ones for the same type, so these
can replace any built-in binding (e.g. swap OciProvider for a
Kubernetes runner).
extra_handlers: Additional event handler types to register alongside
the core handlers. They will be included in the subscription
registry, WorkerPool, and DI resolution automatically.
"""
config = Config() # type: ignore[call-arg]
paths = OSAPaths()

return make_async_container(
PersistenceProvider(),
OciProvider(),
IndexProvider(),
SourceProvider(),
EventProvider(),
EventProvider(extra_handlers=extra_handlers),
HttpProvider(),
DepositionProvider(),
FeatureProvider(),
Expand All @@ -39,6 +54,7 @@ def create_container() -> AsyncContainer:
AuthProvider(),
AuthInfraProvider(),
DiscoveryProvider(),
*extra_providers,
context={Config: config, OSAPaths: paths},
scopes=Scope, # type: ignore[arg-type] # Custom scope class
)
7 changes: 7 additions & 0 deletions server/osa/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ def base_url(self) -> str:
scheme = "http" if self.domain == "localhost" else "https"
return f"{scheme}://{self.domain}"

@model_validator(mode="after")
def derive_frontend_url(self) -> Self:
"""Derive frontend URL from domain if still the default localhost value."""
if self.frontend.url == "http://localhost:3000":
self.frontend = Frontend(url=self.base_url)
return self
Comment thread
rorybyrne marked this conversation as resolved.

@model_validator(mode="after")
def derive_callback_url(self) -> Self:
"""Derive OAuth callback URL from domain if not explicitly set.
Expand Down
77 changes: 50 additions & 27 deletions server/osa/infrastructure/event/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,28 @@
# Type alias for handler list
HandlerTypes = NewType("HandlerTypes", list[type[EventHandler[Any]]])

# All event handlers for WorkerPool registration
HANDLERS: HandlerTypes = HandlerTypes(
[
# Feature handlers (must run before source triggers)
CreateFeatureTables,
InsertRecordFeatures,
# Source handlers
TriggerInitialSourceRun,
PullFromSource,
# Validation handlers
ValidateDeposition,
# Deposition handlers
CreateDepositionFromSource,
ReturnToDraft,
# Curation handlers
AutoApproveCuration,
# Record handlers
ConvertDepositionToRecord,
]
)
# Core event handlers shipped with OSA
_CORE_HANDLERS: list[type[EventHandler[Any]]] = [
# Feature handlers (must run before source triggers)
CreateFeatureTables,
InsertRecordFeatures,
# Source handlers
TriggerInitialSourceRun,
PullFromSource,
# Validation handlers
ValidateDeposition,
# Deposition handlers
CreateDepositionFromSource,
ReturnToDraft,
# Curation handlers
AutoApproveCuration,
# Record handlers
ConvertDepositionToRecord,
]


def build_subscription_registry(handlers: HandlerTypes) -> SubscriptionRegistry:
"""Build a SubscriptionRegistry from the HANDLERS list.
"""Build a SubscriptionRegistry from handler list.

Maps each handler's __event_type__.__name__ → handler.__name__.
"""
Expand All @@ -68,8 +66,37 @@ class EventProvider(Provider):

Handlers, Schedules, and Outbox are UOW-scoped (fresh per unit of work).
WorkerPool and SubscriptionRegistry are APP-scoped singletons.

To register additional event handlers (e.g. from an external package),
pass them to the constructor::

EventProvider(extra_handlers=[MeterUsage, SendNotification])

Extra handlers are merged with the core handlers for subscription
routing, WorkerPool registration, and DI resolution.
"""

def __init__(
self,
*,
extra_handlers: list[type[EventHandler[Any]]] | None = None,
) -> None:
super().__init__()
self._all_handlers = HandlerTypes([*_CORE_HANDLERS, *(extra_handlers or [])])

# Register DI bindings for every handler (core + extra).
# Each handler becomes a UOW-scoped dependency that Dishka can
# instantiate with its declared fields injected.
seen: set[type] = set()
for handler_type in self._all_handlers:
if handler_type in seen:
raise ValueError(
f"Duplicate event handler registration: {handler_type.__name__!r}. "
"Remove it from extra_handlers — it is already a core handler."
)
seen.add(handler_type)
self.provide(handler_type, scope=Scope.UOW)
Comment on lines +85 to +98

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 No guard against duplicate handler registration

_CORE_HANDLERS and extra_handlers are concatenated without deduplication. If a caller passes a handler that already exists in _CORE_HANDLERS, self.provide() will be called twice for the same concrete type. Depending on the Dishka version this will either silently ignore the second registration or raise a DuplicateFactoryError at container start-up.

Adding a deduplication step (or at least a clear error) keeps the failure surface small:

seen: set[type] = set()
for handler_type in self._all_handlers:
    if handler_type in seen:
        raise ValueError(
            f"Duplicate event handler registration: {handler_type.__name__!r}. "
            "Remove it from extra_handlers — it is already registered as a core handler."
        )
    seen.add(handler_type)
    self.provide(handler_type, scope=Scope.UOW)


# UOW-scoped Outbox (wraps EventRepository + SubscriptionRegistry)
@provide(scope=Scope.UOW)
def get_outbox(self, repo: EventRepository, registry: SubscriptionRegistry) -> Outbox:
Expand All @@ -80,17 +107,13 @@ def get_outbox(self, repo: EventRepository, registry: SubscriptionRegistry) -> O
def get_event_log(self, repo: EventRepository) -> EventLog:
return EventLog(repo)

# UOW-scoped providers for handlers
for _handler_type in HANDLERS:
locals()[_handler_type.__name__] = provide(_handler_type, scope=Scope.UOW)

# UOW-scoped provider for SourceSchedule
source_schedule = provide(SourceSchedule, scope=Scope.UOW)

@provide(scope=Scope.APP)
def get_handler_types(self) -> HandlerTypes:
"""Return the handler types for WorkerPool registration."""
return HANDLERS
"""Return all handler types (core + extra) for WorkerPool registration."""
return self._all_handlers

@provide(scope=Scope.APP)
def get_subscription_registry(self, handler_types: HandlerTypes) -> SubscriptionRegistry:
Expand Down
Loading
Loading