Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [main]
pull_request:
branches: [main]
# Run on PRs to any branch (not just main)

env:
PYTHON_VERSION: "3.13"
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,4 @@ web/.turbo/
# Environment
.env
!.env.example
server/osa.yaml
4 changes: 4 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ dev-detached:
dev-down:
docker compose -f deploy/docker-compose.yml -f deploy/docker-compose.dev.yml down

# Open the web UI in browser
open-ui:
open http://localhost:8080

# === Individual Service Development ===

# Run server independently (requires database)
Expand Down
8 changes: 7 additions & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ COPY osa.yaml ./config.yaml
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-dev

# Pre-download embedding model to bake into image (avoids runtime download)
RUN /app/.venv/bin/python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('all-MiniLM-L6-v2')"

# Stage 2: Runtime
FROM python:3.13-slim-bookworm AS runtime

Expand All @@ -43,6 +46,9 @@ WORKDIR /app

# Copy the virtual environment from builder
COPY --from=builder --chown=appuser:appuser /app/.venv /app/.venv

# Copy pre-downloaded embedding model cache (avoids runtime download)
COPY --from=builder --chown=appuser:appuser /root/.cache/huggingface /home/appuser/.cache/huggingface
COPY --from=builder --chown=appuser:appuser /app/osa /app/osa
COPY --from=builder --chown=appuser:appuser /app/sources /app/sources
COPY --from=builder --chown=appuser:appuser /app/migrations /app/migrations
Expand All @@ -65,7 +71,7 @@ USER appuser
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
HEALTHCHECK --interval=5s --timeout=10s --start-period=5s --retries=3 \
CMD curl --fail http://localhost:8000/api/v1/health || exit 1

ENTRYPOINT ["/app/entrypoint.sh"]
Expand Down
72 changes: 72 additions & 0 deletions server/migrations/versions/add_worker_columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""add_worker_columns

Add columns and indexes to events table for pull-based worker architecture.

Revision ID: add_worker_columns
Revises: 0d9fbacf8e58
Create Date: 2026-02-02

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
# `revision` identifies this migration; `down_revision` is the migration it
# revises (the same value as the "Revises:" line in the module docstring).
revision: str = "add_worker_columns"
down_revision: Union[str, Sequence[str], None] = "0d9fbacf8e58"
# No branch labels and no cross-migration dependencies are declared here.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Extend the events table with worker-claiming columns and partial indexes.

    Adds ``routing_key``, ``retry_count``, ``claimed_at`` and ``updated_at``,
    then creates three partial indexes filtered on ``delivery_status``.
    """
    table = "events"

    # Columns go in first: the indexes below reference routing_key and claimed_at.
    worker_columns = (
        sa.Column("routing_key", sa.String(255), nullable=True),
        # NOT NULL columns carry a server default so existing rows stay valid.
        sa.Column("retry_count", sa.Integer(), nullable=False, server_default="0"),
        sa.Column("claimed_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "updated_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.func.now(),
        ),
    )
    for column in worker_columns:
        op.add_column(table, column)

    # Each entry: (index name, indexed columns, partial-index predicate).
    partial_indexes = (
        # Claiming query: delivery_status, event_type, routing_key, created_at,
        # restricted to rows that are still pending or claimed.
        (
            "idx_events_claim",
            ["delivery_status", "event_type", "routing_key", "created_at"],
            "delivery_status IN ('pending', 'claimed')",
        ),
        # Stale claim detection: only rows currently claimed.
        (
            "idx_events_stale_claims",
            ["claimed_at"],
            "delivery_status = 'claimed'",
        ),
        # Failed event queries: only rows marked failed.
        (
            "idx_events_failed",
            ["event_type", "created_at"],
            "delivery_status = 'failed'",
        ),
    )
    for index_name, indexed_columns, predicate in partial_indexes:
        op.create_index(
            index_name,
            table,
            indexed_columns,
            postgresql_where=sa.text(predicate),
        )


def downgrade() -> None:
    """Undo ``upgrade``: drop the worker indexes, then the worker columns.

    Both groups are removed in the reverse of the order they were created.
    """
    table = "events"

    # Indexes must go before the columns they are built on.
    for index_name in (
        "idx_events_failed",
        "idx_events_stale_claims",
        "idx_events_claim",
    ):
        op.drop_index(index_name, table_name=table)

    for column_name in ("updated_at", "claimed_at", "retry_count", "routing_key"):
        op.drop_column(table, column_name)
17 changes: 5 additions & 12 deletions server/osa/application/api/rest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
from osa.application.api.v1.routes import events, health, records, search, stats, validation
from osa.application.di import create_container
from osa.config import Config, configure_logging
from osa.domain.index.service import IndexService
from osa.domain.shared.error import OSAError
from osa.infrastructure.event.worker import BackgroundWorker
from osa.infrastructure.event.worker import WorkerPool
from osa.infrastructure.source.discovery import validate_sources_at_startup
from osa.util.di.fastapi import setup_dishka
from osa.util.di.scope import Scope

logger = logging.getLogger(__name__)

Expand All @@ -23,16 +21,11 @@
async def lifespan(app: FastAPI):
container = app.state.dishka_container

# Run background worker (emits ServerStarted internally)
worker = await container.get(BackgroundWorker)
async with worker:
yield
# Run unified worker pool (pull-based event handlers + scheduled tasks)
worker_pool = await container.get(WorkerPool)

# Flush all index backends on shutdown to ensure buffered records are persisted
logger.info("Flushing index backends on shutdown...")
async with container(scope=Scope.UOW) as scope:
index_service = await scope.get(IndexService)
await index_service.flush_all()
async with worker_pool:
yield

await container.close()

Expand Down
5 changes: 5 additions & 0 deletions server/osa/domain/curation/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Curation domain event handlers."""

from osa.domain.curation.handler.auto_approve_curation import AutoApproveCuration

__all__ = ["AutoApproveCuration"]
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
"""AutoApproveCurationTool - auto-approves depositions on validation completion."""
"""AutoApproveCuration - auto-approves depositions on validation completion."""

import logging
from uuid import uuid4

from osa.domain.curation.event.deposition_approved import DepositionApproved
from osa.domain.shared.event import EventId, EventListener
from osa.domain.shared.event import EventHandler, EventId
from osa.domain.shared.outbox import Outbox
from osa.domain.validation.event.validation_completed import ValidationCompleted
from osa.domain.validation.model import RunStatus

logger = logging.getLogger(__name__)


class AutoApproveCurationTool(EventListener[ValidationCompleted]):
class AutoApproveCuration(EventHandler[ValidationCompleted]):
"""Auto-approves validation and emits DepositionApproved. 0 curation = instant approve."""

outbox: Outbox
Expand Down Expand Up @@ -40,6 +40,5 @@ async def handle(self, event: ValidationCompleted) -> None:
)

await self.outbox.append(approved)
# Session commit handled by BackgroundWorker

logger.debug(f"Deposition approved: {event.deposition_srn}")
7 changes: 0 additions & 7 deletions server/osa/domain/curation/listener/__init__.py

This file was deleted.

7 changes: 7 additions & 0 deletions server/osa/domain/index/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Index domain event handlers."""

from osa.domain.index.handler.fanout_to_index_backends import FanOutToIndexBackends
from osa.domain.index.handler.keyword_index_handler import KeywordIndexHandler
from osa.domain.index.handler.vector_index_handler import VectorIndexHandler

__all__ = ["FanOutToIndexBackends", "KeywordIndexHandler", "VectorIndexHandler"]
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
"""FanOutToIndexBackends - creates per-backend IndexRecord events from RecordPublished."""

import logging
from typing import ClassVar
from uuid import uuid4

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.record.event.record_published import RecordPublished
from osa.domain.shared.event import EventId, EventListener
from osa.domain.shared.event import EventHandler, EventId
from osa.domain.shared.outbox import Outbox

logger = logging.getLogger(__name__)


class FanOutToIndexBackends(EventListener[RecordPublished]):
class FanOutToIndexBackends(EventHandler[RecordPublished]):
"""Creates per-backend IndexRecord events from RecordPublished.

When a record is published, this listener creates one IndexRecord event
When a record is published, this handler creates one IndexRecord event
per registered backend. Each IndexRecord is stored in the outbox,
enabling independent retry and failure isolation per backend.

This replaces the previous pattern where a single RecordPublished event
triggered immediate indexing to all backends in a single transaction.

Batch processing is used for efficiency when multiple records are
published in quick succession.
"""

__batch_size__: ClassVar[int] = 10

indexes: IndexRegistry
outbox: Outbox

Expand All @@ -38,4 +44,4 @@ async def handle(self, event: RecordPublished) -> None:
record_srn=event.record_srn,
metadata=event.metadata,
)
await self.outbox.append(index_event)
await self.outbox.append(index_event, routing_key=backend_name)
39 changes: 39 additions & 0 deletions server/osa/domain/index/handler/keyword_index_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""KeywordIndexHandler - processes IndexRecord events for keyword backends."""

import logging
from typing import ClassVar

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.error import SkippedEvents
from osa.domain.shared.event import EventHandler

logger = logging.getLogger(__name__)


class KeywordIndexHandler(EventHandler[IndexRecord]):
    """Event handler that feeds IndexRecord events to the "keyword" backend.

    The handler declares routing key "keyword" and a batch size of 1, so each
    event is ingested on its own; keyword indexing gains nothing from batching.
    """

    # Routing key and batch size declared for this handler.
    __routing_key__: ClassVar[str | None] = "keyword"
    __batch_size__: ClassVar[int] = 1

    # Registry used to look up the keyword backend by name.
    indexes: IndexRegistry

    async def handle(self, event: IndexRecord) -> None:
        """Ingest one IndexRecord into the keyword backend.

        Raises:
            SkippedEvents: if the registry has no "keyword" backend.
        """
        if (backend := self.indexes.get("keyword")) is None:
            raise SkippedEvents(
                event_ids=[event.id],
                reason="Keyword backend not available",
            )

        # The backend only exposes batch ingestion, so send a one-element batch
        # of (srn string, metadata).
        await backend.ingest_batch([(str(event.record_srn), event.metadata)])

        logger.debug(f"KeywordIndexHandler: indexed event {event.id}")
49 changes: 49 additions & 0 deletions server/osa/domain/index/handler/vector_index_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""VectorIndexHandler - processes IndexRecord events for vector backends."""

import logging
from typing import ClassVar

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.error import SkippedEvents
from osa.domain.shared.event import EventHandler

logger = logging.getLogger(__name__)


class VectorIndexHandler(EventHandler[IndexRecord]):
    """Event handler that feeds IndexRecord events to the "vector" backend.

    The handler declares routing key "vector", a batch size of 100 and a batch
    timeout of 5.0, so events are ingested in batches for efficient embedding
    generation.
    """

    # Routing key and batching parameters declared for this handler.
    __routing_key__: ClassVar[str | None] = "vector"
    __batch_size__: ClassVar[int] = 100
    __batch_timeout__: ClassVar[float] = 5.0

    # Registry used to look up the vector backend by name.
    indexes: IndexRegistry

    async def handle_batch(self, events: list[IndexRecord]) -> None:
        """Ingest a batch of IndexRecord events into the vector backend.

        An empty batch is a no-op. Otherwise every event becomes an
        (srn string, metadata) pair and the whole list goes to the backend
        in a single ``ingest_batch`` call.

        Raises:
            SkippedEvents: if the registry has no "vector" backend; carries
                the ids of every event in the batch.
        """
        if events:
            backend = self.indexes.get("vector")
            if backend is None:
                skipped_ids = [e.id for e in events]
                raise SkippedEvents(
                    event_ids=skipped_ids,
                    reason="Vector backend not available",
                )

            # Build the (srn, metadata) pairs the backend ingests.
            records = []
            for e in events:
                records.append((str(e.record_srn), e.metadata))

            logger.debug(f"VectorIndexHandler: ingesting {len(records)} records to backend")

            await backend.ingest_batch(records)

            logger.debug(f"VectorIndexHandler: ingested {len(events)} records")
9 changes: 0 additions & 9 deletions server/osa/domain/index/listener/__init__.py

This file was deleted.

Loading
Loading