Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [main]
pull_request:
branches: [main]
# Run on PRs to any branch (not just main)

env:
PYTHON_VERSION: "3.13"
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,4 @@ web/.turbo/
# Environment
.env
!.env.example
server/osa.yaml
72 changes: 72 additions & 0 deletions server/migrations/versions/add_worker_columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""add_worker_columns

Add columns and indexes to events table for pull-based worker architecture.

Revision ID: add_worker_columns
Revises: 0d9fbacf8e58
Create Date: 2026-02-02

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
# Identifier of this migration (the "Revision ID" in the module docstring).
revision: str = "add_worker_columns"
# Revision this migration is applied on top of (the "Revises" entry in the module docstring).
down_revision: Union[str, Sequence[str], None] = "0d9fbacf8e58"
# No branch labels and no dependencies on other revisions are declared.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add the worker columns and partial indexes to the ``events`` table.

    Four columns are added (``routing_key``, ``retry_count``, ``claimed_at``,
    ``updated_at``), followed by three partial indexes, each restricted to a
    subset of ``delivery_status`` values.
    """
    table = "events"

    # New columns, listed in the order they are added.
    worker_columns = (
        sa.Column("routing_key", sa.String(255), nullable=True),
        # NOT NULL columns carry a server default so existing rows get a value.
        sa.Column("retry_count", sa.Integer(), nullable=False, server_default="0"),
        sa.Column("claimed_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "updated_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.func.now(),
        ),
    )
    for column in worker_columns:
        op.add_column(table, column)

    # (index name, indexed columns, partial-index predicate), in creation order.
    partial_indexes = (
        # Rows that are still pending or currently claimed.
        (
            "idx_events_claim",
            ["delivery_status", "event_type", "routing_key", "created_at"],
            "delivery_status IN ('pending', 'claimed')",
        ),
        # Claimed rows only, indexed by claim time (stale claim detection).
        (
            "idx_events_stale_claims",
            ["claimed_at"],
            "delivery_status = 'claimed'",
        ),
        # Failed rows only (failed event queries).
        (
            "idx_events_failed",
            ["event_type", "created_at"],
            "delivery_status = 'failed'",
        ),
    )
    for index_name, indexed_columns, predicate in partial_indexes:
        op.create_index(
            index_name,
            table,
            indexed_columns,
            postgresql_where=sa.text(predicate),
        )


def downgrade() -> None:
    """Drop the worker indexes and columns from the ``events`` table.

    Indexes are dropped first, then the columns.
    """
    table = "events"

    for index_name in (
        "idx_events_failed",
        "idx_events_stale_claims",
        "idx_events_claim",
    ):
        op.drop_index(index_name, table_name=table)

    for column_name in ("updated_at", "claimed_at", "retry_count", "routing_key"):
        op.drop_column(table, column_name)
9 changes: 5 additions & 4 deletions server/osa/application/api/rest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from osa.config import Config, configure_logging
from osa.domain.index.service import IndexService
from osa.domain.shared.error import OSAError
from osa.infrastructure.event.worker import BackgroundWorker
from osa.infrastructure.event.worker import WorkerPool
from osa.infrastructure.source.discovery import validate_sources_at_startup
from osa.util.di.fastapi import setup_dishka
from osa.util.di.scope import Scope
Expand All @@ -23,9 +23,10 @@
async def lifespan(app: FastAPI):
container = app.state.dishka_container

# Run background worker (emits ServerStarted internally)
worker = await container.get(BackgroundWorker)
async with worker:
# Run unified worker pool (pull-based event handlers + scheduled tasks)
worker_pool = await container.get(WorkerPool)

async with worker_pool:
yield

# Flush all index backends on shutdown to ensure buffered records are persisted
Expand Down
5 changes: 5 additions & 0 deletions server/osa/domain/curation/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Curation domain event handlers."""

from osa.domain.curation.handler.auto_approve_curation import AutoApproveCuration

__all__ = ["AutoApproveCuration"]
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
"""AutoApproveCurationTool - auto-approves depositions on validation completion."""
"""AutoApproveCuration - auto-approves depositions on validation completion."""

import logging
from uuid import uuid4

from osa.domain.curation.event.deposition_approved import DepositionApproved
from osa.domain.shared.event import EventId, EventListener
from osa.domain.shared.event import EventHandler, EventId
from osa.domain.shared.outbox import Outbox
from osa.domain.validation.event.validation_completed import ValidationCompleted
from osa.domain.validation.model import RunStatus

logger = logging.getLogger(__name__)


class AutoApproveCurationTool(EventListener[ValidationCompleted]):
class AutoApproveCuration(EventHandler[ValidationCompleted]):
"""Auto-approves validation and emits DepositionApproved. 0 curation = instant approve."""

outbox: Outbox
Expand Down Expand Up @@ -40,6 +40,5 @@ async def handle(self, event: ValidationCompleted) -> None:
)

await self.outbox.append(approved)
# Session commit handled by BackgroundWorker

logger.debug(f"Deposition approved: {event.deposition_srn}")
7 changes: 0 additions & 7 deletions server/osa/domain/curation/listener/__init__.py

This file was deleted.

2 changes: 2 additions & 0 deletions server/osa/domain/index/event/index_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ class IndexRecord(Event):
backend_name: Target backend name (e.g., "vector", "keyword").
record_srn: Structured Resource Name of the record.
metadata: Record metadata to index.
routing_key: Optional routing key for worker filtering (typically matches backend_name).
"""

id: EventId
backend_name: str
record_srn: RecordSRN
metadata: dict[str, Any]
routing_key: str | None = None
7 changes: 7 additions & 0 deletions server/osa/domain/index/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Index domain event handlers."""

from osa.domain.index.handler.fanout_to_index_backends import FanOutToIndexBackends
from osa.domain.index.handler.keyword_index_handler import KeywordIndexHandler
from osa.domain.index.handler.vector_index_handler import VectorIndexHandler

__all__ = ["FanOutToIndexBackends", "KeywordIndexHandler", "VectorIndexHandler"]
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
"""FanOutToIndexBackends - creates per-backend IndexRecord events from RecordPublished."""

import logging
from typing import ClassVar
from uuid import uuid4

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.record.event.record_published import RecordPublished
from osa.domain.shared.event import EventId, EventListener
from osa.domain.shared.event import EventHandler, EventId
from osa.domain.shared.outbox import Outbox

logger = logging.getLogger(__name__)


class FanOutToIndexBackends(EventListener[RecordPublished]):
class FanOutToIndexBackends(EventHandler[RecordPublished]):
"""Creates per-backend IndexRecord events from RecordPublished.

When a record is published, this listener creates one IndexRecord event
When a record is published, this handler creates one IndexRecord event
per registered backend. Each IndexRecord is stored in the outbox,
enabling independent retry and failure isolation per backend.

This replaces the previous pattern where a single RecordPublished event
triggered immediate indexing to all backends in a single transaction.

Batch processing is used for efficiency when multiple records are
published in quick succession.
"""

__batch_size__: ClassVar[int] = 10

indexes: IndexRegistry
outbox: Outbox

Expand All @@ -37,5 +43,6 @@ async def handle(self, event: RecordPublished) -> None:
backend_name=backend_name,
record_srn=event.record_srn,
metadata=event.metadata,
routing_key=backend_name, # Route to backend-specific worker
)
await self.outbox.append(index_event)
await self.outbox.append(index_event, routing_key=backend_name)
36 changes: 36 additions & 0 deletions server/osa/domain/index/handler/keyword_index_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""KeywordIndexHandler - processes IndexRecord events for keyword backends."""

import logging
from typing import ClassVar

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.event import EventHandler

logger = logging.getLogger(__name__)


class KeywordIndexHandler(EventHandler[IndexRecord]):
    """Handle IndexRecord events destined for the keyword backend.

    The class declares routing key ``"keyword"`` and a batch size of 1, so
    events are handled one at a time via :meth:`handle`.
    """

    __routing_key__: ClassVar[str | None] = "keyword"
    __batch_size__: ClassVar[int] = 1

    indexes: IndexRegistry

    async def handle(self, event: IndexRecord) -> None:
        """Ingest one IndexRecord into the "keyword" backend.

        The backend is looked up in ``self.indexes``. If it is missing, a
        warning is logged and the event is skipped without raising.
        """
        keyword_backend = self.indexes.get("keyword")
        if keyword_backend is not None:
            # The backend takes (srn-as-string, metadata) pairs, even for a single record.
            entry = (str(event.record_srn), event.metadata)
            await keyword_backend.ingest_batch([entry])
            logger.debug(f"KeywordIndexHandler: indexed event {event.id}")
            return

        logger.warning("Keyword backend not available, skipping event")
46 changes: 46 additions & 0 deletions server/osa/domain/index/handler/vector_index_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""VectorIndexHandler - processes IndexRecord events for vector backends."""

import logging
from typing import ClassVar

from osa.domain.index.event.index_record import IndexRecord
from osa.domain.index.model.registry import IndexRegistry
from osa.domain.shared.event import EventHandler

logger = logging.getLogger(__name__)


class VectorIndexHandler(EventHandler[IndexRecord]):
    """Handle IndexRecord events destined for the vector backend.

    The class declares routing key ``"vector"``, a batch size of 100 and a
    batch timeout of 5.0. Events are handled together via
    :meth:`handle_batch`.
    """

    __routing_key__: ClassVar[str | None] = "vector"
    __batch_size__: ClassVar[int] = 100
    __batch_timeout__: ClassVar[float] = 5.0

    indexes: IndexRegistry

    async def handle_batch(self, events: list[IndexRecord]) -> None:
        """Ingest a batch of IndexRecord events into the "vector" backend.

        An empty batch is a no-op. If the backend is missing from
        ``self.indexes``, a warning is logged and the batch is skipped
        without raising.
        """
        if len(events) == 0:
            return

        vector_backend = self.indexes.get("vector")
        if vector_backend is None:
            logger.warning("Vector backend not available, skipping batch")
            return

        # Build the (srn-as-string, metadata) pairs passed to ingest_batch.
        payload = []
        for item in events:
            payload.append((str(item.record_srn), item.metadata))

        logger.debug(f"VectorIndexHandler: ingesting {len(payload)} records to backend")

        await vector_backend.ingest_batch(payload)

        logger.debug(f"VectorIndexHandler: ingested {len(events)} records")
9 changes: 0 additions & 9 deletions server/osa/domain/index/listener/__init__.py

This file was deleted.

Loading
Loading