Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [main]
pull_request:
branches: [main]
# Run on PRs to any branch (not just main)

env:
PYTHON_VERSION: "3.13"
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,4 @@ web/.turbo/
# Environment
.env
!.env.example
server/osa.yaml
2 changes: 2 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# OSA Monorepo Justfile
# Production deployment and development orchestration commands

mod server

default:
@just --list

Expand Down
3 changes: 2 additions & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ COPY migrations/ ./migrations/
COPY alembic.ini ./
COPY entrypoint.sh ./
COPY README.md ./
COPY config.demo.yaml ./config.yaml
COPY osa.yaml ./config.yaml

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv \
Expand Down Expand Up @@ -55,6 +55,7 @@ ENV PATH="/app/.venv/bin:$PATH"
# Create data directory
RUN mkdir -p /data && chown appuser:appuser /data
ENV OSA_DATA_DIR=/data
ENV OSA_CONFIG_FILE=/app/config.yaml

COPY --from=builder --chown=appuser:appuser /app/entrypoint.sh /app/entrypoint.sh
RUN chmod +x /app/entrypoint.sh
Expand Down
72 changes: 72 additions & 0 deletions server/migrations/versions/add_worker_columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""add_worker_columns

Add columns and indexes to events table for pull-based worker architecture.

Revision ID: add_worker_columns
Revises: 0d9fbacf8e58
Create Date: 2026-02-02

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# Revision identifiers, used by Alembic.
# `revision` names this migration; `down_revision` is the revision it revises
# (see "Revises" in the module docstring).
revision: str = "add_worker_columns"
down_revision: Union[str, Sequence[str], None] = "0d9fbacf8e58"
# No branch labels or cross-revision dependencies are declared here.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add the worker columns and partial indexes to the ``events`` table.

    Columns are added first, in declaration order, followed by three
    partial indexes whose predicates filter on ``delivery_status``.
    """
    # New columns for pull-based claiming. The NOT NULL columns carry a
    # server default so that existing rows receive a value.
    new_columns = (
        sa.Column("routing_key", sa.String(255), nullable=True),
        sa.Column("retry_count", sa.Integer(), nullable=False, server_default="0"),
        sa.Column("claimed_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "updated_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.func.now(),
        ),
    )
    for column in new_columns:
        op.add_column("events", column)

    # (index name, indexed columns, partial-index predicate)
    partial_indexes = (
        # Claiming query: pending/claimed rows by type, routing key and age.
        (
            "idx_events_claim",
            ["delivery_status", "event_type", "routing_key", "created_at"],
            "delivery_status IN ('pending', 'claimed')",
        ),
        # Stale claim detection: only rows currently claimed.
        (
            "idx_events_stale_claims",
            ["claimed_at"],
            "delivery_status = 'claimed'",
        ),
        # Failed event queries.
        (
            "idx_events_failed",
            ["event_type", "created_at"],
            "delivery_status = 'failed'",
        ),
    )
    for index_name, indexed_columns, predicate in partial_indexes:
        op.create_index(
            index_name,
            "events",
            indexed_columns,
            postgresql_where=sa.text(predicate),
        )


def downgrade() -> None:
    """Drop the worker indexes and columns from the ``events`` table.

    Indexes are dropped before the columns they reference, and both are
    removed in the reverse of the order in which ``upgrade`` created them.
    """
    index_names = ("idx_events_failed", "idx_events_stale_claims", "idx_events_claim")
    for index_name in index_names:
        op.drop_index(index_name, table_name="events")

    column_names = ("updated_at", "claimed_at", "retry_count", "routing_key")
    for column_name in column_names:
        op.drop_column("events", column_name)
9 changes: 5 additions & 4 deletions server/osa/application/api/rest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from osa.config import Config, configure_logging
from osa.domain.index.service import IndexService
from osa.domain.shared.error import OSAError
from osa.infrastructure.event.worker import BackgroundWorker
from osa.infrastructure.event.worker import WorkerPool
from osa.infrastructure.source.discovery import validate_sources_at_startup
from osa.util.di.fastapi import setup_dishka
from osa.util.di.scope import Scope
Expand All @@ -23,9 +23,10 @@
async def lifespan(app: FastAPI):
container = app.state.dishka_container

# Run background worker (emits ServerStarted internally)
worker = await container.get(BackgroundWorker)
async with worker:
# Run unified worker pool (pull-based event handlers + scheduled tasks)
worker_pool = await container.get(WorkerPool)

async with worker_pool:
yield

# Flush all index backends on shutdown to ensure buffered records are persisted
Expand Down
11 changes: 11 additions & 0 deletions server/osa/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,23 @@ def file(self) -> str | None:
return os.environ.get("OSA_LOG_FILE")


class WorkerConfig(BaseModel):
    """Background worker configuration (nested in Config, uses env_nested_delimiter).

    Controls the behavior of the outbox polling worker that processes events.
    Every field has a default, so the model can be constructed with no arguments.
    """

    # NOTE(review): neither field is range-validated here; a zero or negative
    # value would be accepted by this model — confirm the consumer guards it.
    poll_interval: float = 0.5  # Seconds between outbox polls
    batch_size: int = 100  # Maximum events to fetch per poll cycle


class Config(BaseSettings):
# These are BaseModel, so env_nested_delimiter handles their env vars
server: Server = Server()
frontend: Frontend = Frontend()
database: DatabaseConfig = DatabaseConfig()
logging: LoggingConfig = LoggingConfig()
worker: WorkerConfig = WorkerConfig() # Background worker settings
indexes: list[IndexConfig] = [] # list of index configs
sources: list[SourceConfig] = [] # list of source configs

Expand Down
5 changes: 5 additions & 0 deletions server/osa/domain/curation/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Curation domain event handlers."""

from osa.domain.curation.handler.auto_approve_curation import AutoApproveCuration

# Public names re-exported by the curation handler package.
__all__ = ["AutoApproveCuration"]
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
"""AutoApproveCurationTool - auto-approves depositions on validation completion."""
"""AutoApproveCuration - auto-approves depositions on validation completion."""

import logging
from uuid import uuid4

from osa.domain.curation.event.deposition_approved import DepositionApproved
from osa.domain.shared.event import EventId, EventListener
from osa.domain.shared.event import EventHandler, EventId
from osa.domain.shared.outbox import Outbox
from osa.domain.validation.event.validation_completed import ValidationCompleted
from osa.domain.validation.model import RunStatus

logger = logging.getLogger(__name__)


class AutoApproveCurationTool(EventListener[ValidationCompleted]):
class AutoApproveCuration(EventHandler[ValidationCompleted]):
"""Auto-approves validation and emits DepositionApproved. 0 curation = instant approve."""

outbox: Outbox
Expand Down Expand Up @@ -40,6 +40,5 @@ async def handle(self, event: ValidationCompleted) -> None:
)

await self.outbox.append(approved)
# Session commit handled by BackgroundWorker

logger.debug(f"Deposition approved: {event.deposition_srn}")
7 changes: 0 additions & 7 deletions server/osa/domain/curation/listener/__init__.py

This file was deleted.

5 changes: 5 additions & 0 deletions server/osa/domain/index/event/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Index domain events."""

from osa.domain.index.event.index_record import IndexRecord

# Public names re-exported by the index event package.
__all__ = ["IndexRecord"]
28 changes: 28 additions & 0 deletions server/osa/domain/index/event/index_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""IndexRecord event - per-backend indexing request for a single record."""

from typing import Any

from osa.domain.shared.event import Event, EventId
from osa.domain.shared.model.srn import RecordSRN


class IndexRecord(Event):
    """Request to index a single record into a specific backend.

    This event is created by FanOutToIndexBackends when a RecordPublished
    event is received. Each backend gets its own IndexRecord event,
    enabling independent retry and failure isolation.

    Attributes:
        id: Unique event identifier (inherited from Event).
        backend_name: Target backend name (e.g., "vector", "keyword").
        record_srn: Structured Resource Name of the record.
        metadata: Record metadata to index.
        routing_key: Optional routing key for worker filtering (typically
            matches backend_name). Defaults to None when not supplied.
    """

    id: EventId
    backend_name: str
    record_srn: RecordSRN
    metadata: dict[str, Any]
    # The only field with a default; all fields above must be provided.
    routing_key: str | None = None
7 changes: 7 additions & 0 deletions server/osa/domain/index/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Index domain event handlers."""

from osa.domain.index.handler.fanout_to_index_backends import FanOutToIndexBackends
from osa.domain.index.handler.keyword_index_handler import KeywordIndexHandler
from osa.domain.index.handler.vector_index_handler import VectorIndexHandler

# Public names re-exported by the index handler package.
__all__ = ["FanOutToIndexBackends", "KeywordIndexHandler", "VectorIndexHandler"]
48 changes: 48 additions & 0 deletions server/osa/domain/index/handler/fanout_to_index_backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""FanOutToIndexBackends - creates per-backend IndexRecord events from RecordPublished."""

import logging
from typing import ClassVar
from uuid import uuid4

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.record.event.record_published import RecordPublished
from osa.domain.shared.event import EventHandler, EventId
from osa.domain.shared.outbox import Outbox

logger = logging.getLogger(__name__)


class FanOutToIndexBackends(EventHandler[RecordPublished]):
    """Fan a RecordPublished event out into one IndexRecord per backend.

    For every backend name yielded by the index registry, a fresh
    IndexRecord is appended to the outbox, so each backend can be retried
    and can fail independently of the others.

    This replaces the earlier pattern in which a single RecordPublished
    event triggered immediate indexing into all backends within a single
    transaction.

    A batch size of 10 is declared so that records published in quick
    succession can be processed together.
    """

    __batch_size__: ClassVar[int] = 10

    indexes: IndexRegistry
    outbox: Outbox

    async def handle(self, event: RecordPublished) -> None:
        """Append one IndexRecord to the outbox for each registered backend."""
        targets = list(self.indexes)
        logger.debug(f"FanOut: {event.record_srn} -> {len(targets)} backends")

        for name in targets:
            # The backend name doubles as the routing key, both on the event
            # itself and on the outbox entry, so a backend-specific worker
            # can pick it up.
            await self.outbox.append(
                IndexRecord(
                    id=EventId(uuid4()),
                    backend_name=name,
                    record_srn=event.record_srn,
                    metadata=event.metadata,
                    routing_key=name,
                ),
                routing_key=name,
            )
36 changes: 36 additions & 0 deletions server/osa/domain/index/handler/keyword_index_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""KeywordIndexHandler - processes IndexRecord events for keyword backends."""

import logging
from typing import ClassVar

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.event import EventHandler

logger = logging.getLogger(__name__)


class KeywordIndexHandler(EventHandler[IndexRecord]):
    """Handle IndexRecord events destined for the keyword backend.

    Declares routing_key="keyword" and batch_size=1: events are processed
    one at a time because keyword indexing doesn't benefit from batching.
    """

    __routing_key__: ClassVar[str | None] = "keyword"
    __batch_size__: ClassVar[int] = 1

    indexes: IndexRegistry

    async def handle(self, event: IndexRecord) -> None:
        """Ingest the event's record into the keyword backend, if one is registered.

        When the registry has no "keyword" backend, a warning is logged and
        the event is skipped without raising.
        """
        keyword_backend = self.indexes.get("keyword")

        if keyword_backend is not None:
            # The backend only exposes batch ingestion here, so wrap the
            # single (srn, metadata) pair in a one-element list.
            entry = (str(event.record_srn), event.metadata)
            await keyword_backend.ingest_batch([entry])
            logger.debug(f"KeywordIndexHandler: indexed event {event.id}")
        else:
            logger.warning("Keyword backend not available, skipping event")
46 changes: 46 additions & 0 deletions server/osa/domain/index/handler/vector_index_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""VectorIndexHandler - processes IndexRecord events for vector backends."""

import logging
from typing import ClassVar

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.event import EventHandler

logger = logging.getLogger(__name__)


class VectorIndexHandler(EventHandler[IndexRecord]):
    """Handle IndexRecord events destined for the vector backend.

    Declares routing_key="vector" and processes events in batches (batch
    size 100, batch timeout 5.0) for efficient embedding generation.
    """

    __routing_key__: ClassVar[str | None] = "vector"
    __batch_size__: ClassVar[int] = 100
    __batch_timeout__: ClassVar[float] = 5.0

    indexes: IndexRegistry

    async def handle_batch(self, events: list[IndexRecord]) -> None:
        """Ingest a batch of IndexRecord events into the vector backend.

        An empty batch is a no-op. When the registry has no "vector"
        backend, a warning is logged and the whole batch is skipped
        without raising.
        """
        if not events:
            return

        vector_backend = self.indexes.get("vector")
        if vector_backend is not None:
            # Convert each event into the (srn, metadata) pair passed to
            # the backend's batch ingestion call.
            records = []
            for item in events:
                records.append((str(item.record_srn), item.metadata))

            logger.debug(f"VectorIndexHandler: ingesting {len(records)} records to backend")
            await vector_backend.ingest_batch(records)
            logger.debug(f"VectorIndexHandler: ingested {len(events)} records")
        else:
            logger.warning("Vector backend not available, skipping batch")
6 changes: 0 additions & 6 deletions server/osa/domain/index/listener/__init__.py

This file was deleted.

Loading