Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions server/migrations/versions/source_agnostic_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""source_agnostic_records

Replace deposition_srn + indexes with source (JSONB) + convention_srn.
No data migration needed — no production data exists.

Revision ID: source_agnostic_records
Revises: add_device_authorizations
Create Date: 2026-03-24

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "source_agnostic_records"
down_revision: Union[str, Sequence[str], None] = "add_device_authorizations"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# Drop old indexes
op.drop_index("idx_records_deposition_srn", table_name="records")

# Drop old columns
op.drop_column("records", "deposition_srn")
op.drop_column("records", "indexes")

# Add new columns
op.add_column(
"records",
sa.Column("convention_srn", sa.Text(), nullable=False),
)
op.add_column(
"records",
sa.Column("source", sa.dialects.postgresql.JSONB(), nullable=False),
)

# Add new indexes
op.create_index(
"idx_records_convention_srn",
"records",
["convention_srn"],
)
op.create_index(
"uq_records_source",
"records",
[
sa.text("(source->>'type')"),
sa.text("(source->>'id')"),
],
unique=True,
)


def downgrade() -> None:
# Drop new indexes
op.drop_index("uq_records_source", table_name="records")
op.drop_index("idx_records_convention_srn", table_name="records")

# Drop new columns
op.drop_column("records", "source")
op.drop_column("records", "convention_srn")

# Re-add old columns
op.add_column(
"records",
sa.Column("deposition_srn", sa.String(), nullable=False),
)
op.add_column(
"records",
sa.Column("indexes", sa.JSON(), nullable=False),
)

# Re-add old index
op.create_index(
"idx_records_deposition_srn",
"records",
["deposition_srn"],
)
3 changes: 2 additions & 1 deletion server/osa/application/api/v1/routes/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ async def get_record(
return RecordResponse(
record={
"srn": str(result.srn),
"deposition_srn": str(result.deposition_srn),
"source": result.source.model_dump(),
"convention_srn": str(result.convention_srn),
"metadata": result.metadata,
"published_at": result.published_at.isoformat(),
"features": result.features,
Expand Down
6 changes: 2 additions & 4 deletions server/osa/domain/curation/event/deposition_approved.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Any

from osa.domain.shared.event import Event, EventId
from osa.domain.shared.model.hook import HookDefinition
from osa.domain.shared.model.srn import ConventionSRN, DepositionSRN


Expand All @@ -17,6 +16,5 @@ class DepositionApproved(Event):
id: EventId
deposition_srn: DepositionSRN
metadata: dict[str, Any]
convention_srn: ConventionSRN | None = None
hooks: list[HookDefinition] = []
files_dir: str = ""
convention_srn: ConventionSRN
expected_features: list[str] = []
4 changes: 1 addition & 3 deletions server/osa/domain/curation/handler/auto_approve_curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ async def handle(self, event: ValidationCompleted) -> None:

logger.info(f"Auto-approving deposition: {event.deposition_srn}")

# Emit DepositionApproved
approved = DepositionApproved(
id=EventId(uuid4()),
deposition_srn=event.deposition_srn,
metadata=event.metadata,
convention_srn=event.convention_srn,
hooks=event.hooks,
files_dir=event.files_dir,
expected_features=event.expected_features,
)

await self.outbox.append(approved)
Expand Down
5 changes: 2 additions & 3 deletions server/osa/domain/deposition/event/submitted.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
class DepositionSubmittedEvent(Event):
"""Emitted when a deposition is submitted for validation.

Enriched with convention_srn, hooks, and files_dir so the
validation domain can operate without querying deposition repos.
Enriched with convention_srn and hooks so the validation domain
can operate without querying deposition repos.
"""

id: EventId
deposition_id: DepositionSRN
metadata: dict[str, Any]
convention_srn: ConventionSRN
hooks: list[HookDefinition] = []
files_dir: str = ""
3 changes: 0 additions & 3 deletions server/osa/domain/deposition/service/deposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,12 @@ async def submit(self, srn: DepositionSRN) -> Deposition:
dep.submit()
await self.deposition_repo.save(dep)

files_dir = self.file_storage.get_files_dir(dep.srn)

event = DepositionSubmittedEvent(
id=EventId(uuid4()),
deposition_id=srn,
metadata=dep.metadata,
convention_srn=dep.convention_srn,
hooks=convention.hooks,
files_dir=str(files_dir),
)
await self.outbox.append(event)
return dep
Expand Down
15 changes: 10 additions & 5 deletions server/osa/domain/feature/handler/insert_record_features.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
"""InsertRecordFeatures — deferred feature insertion on record publication."""

from osa.domain.feature.port.storage import FeatureStoragePort
from osa.domain.feature.service.feature import FeatureService
from osa.domain.record.event.record_published import RecordPublished
from osa.domain.shared.event import EventHandler


class InsertRecordFeatures(EventHandler[RecordPublished]):
"""Reads hook outputs from cold storage and inserts features with record_srn.
"""Reads hook outputs from storage and inserts features with record_srn.

Uses enriched RecordPublished event data (hooks list) instead of
looking up the convention.
Resolves the hook output directory from the record's source via the
feature storage port, then delegates to FeatureService for insertion.
"""

feature_service: FeatureService
feature_storage: FeatureStoragePort

async def handle(self, event: RecordPublished) -> None:
hook_output_dir = self.feature_storage.get_hook_output_root(
event.source.type, event.source.id
)
await self.feature_service.insert_features_for_record(
deposition_srn=event.deposition_srn,
hook_output_dir=hook_output_dir,
record_srn=str(event.record_srn),
hooks=event.hooks,
expected_features=event.expected_features,
)
14 changes: 11 additions & 3 deletions server/osa/domain/feature/port/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@
from abc import abstractmethod
from typing import Any, Protocol

from osa.domain.shared.model.srn import DepositionSRN
from osa.domain.shared.port import Port


class FeatureStoragePort(Port, Protocol):
"""File storage operations used by the feature domain."""

@abstractmethod
def get_hook_output_root(self, source_type: str, source_id: str) -> str:
"""Resolve the root directory containing hook outputs for a source.

The handler uses this to locate hook outputs, then passes the
resolved path to read_hook_features / hook_features_exist.
"""
...

@abstractmethod
async def read_hook_features(
self, deposition_srn: DepositionSRN, hook_name: str
self, hook_output_dir: str, feature_name: str
) -> list[dict[str, Any]]:
"""Read features.json from a hook's output directory."""
...

@abstractmethod
async def hook_features_exist(self, deposition_srn: DepositionSRN, hook_name: str) -> bool:
async def hook_features_exist(self, hook_output_dir: str, feature_name: str) -> bool:
"""Check whether features.json exists in a hook's output directory."""
...
29 changes: 17 additions & 12 deletions server/osa/domain/feature/service/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from osa.domain.feature.port.feature_store import FeatureStore
from osa.domain.feature.port.storage import FeatureStoragePort
from osa.domain.shared.model.hook import HookDefinition
from osa.domain.shared.model.srn import DepositionSRN
from osa.domain.shared.service import Service

logger = logging.getLogger(__name__)
Expand All @@ -33,27 +32,33 @@ async def insert_features(

async def insert_features_for_record(
self,
deposition_srn: DepositionSRN,
hook_output_dir: str,
record_srn: str,
hooks: list[HookDefinition] | None = None,
expected_features: list[str] | None = None,
) -> None:
"""Read hook features and insert into feature tables.
"""Read hook features from the given directory and insert into feature tables.

If hooks are provided (from enriched event), iterate those.
Warns (does not raise) when an expected feature is missing — the record
is already published, blocking other features would be worse.
"""
if not hooks:
if not expected_features:
return

for hook in hooks:
hook_name = hook.name
if not await self.feature_storage.hook_features_exist(deposition_srn, hook_name):
for feature_name in expected_features:
if not await self.feature_storage.hook_features_exist(hook_output_dir, feature_name):
logger.warning(
f"Expected feature '{feature_name}' not found in {hook_output_dir} "
f"for record {record_srn}"
)
continue

features = await self.feature_storage.read_hook_features(deposition_srn, hook_name)
features = await self.feature_storage.read_hook_features(hook_output_dir, feature_name)
if features:
count = await self.insert_features(
hook_name=hook_name,
hook_name=feature_name,
record_srn=record_srn,
rows=features,
)
logger.info(f"Inserted {count} features for hook={hook_name} record={record_srn}")
logger.info(
f"Inserted {count} features for hook={feature_name} record={record_srn}"
)
17 changes: 8 additions & 9 deletions server/osa/domain/record/event/record_published.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@
from typing import Any

from osa.domain.shared.event import Event, EventId
from osa.domain.shared.model.hook import HookDefinition
from osa.domain.shared.model.srn import ConventionSRN, DepositionSRN, RecordSRN
from osa.domain.shared.model.source import RecordSource
from osa.domain.shared.model.srn import ConventionSRN, RecordSRN


class RecordPublished(Event):
"""Emitted when a record is published and ready for indexing.

Enriched with convention_srn, hooks, and files_dir so downstream
consumers (feature insertion, indexing) can operate without
querying deposition/convention repositories.
Enriched with source, convention_srn, and expected_features so downstream
consumers (feature insertion, indexing) can operate without querying
record/convention repositories.
"""

id: EventId
record_srn: RecordSRN
deposition_srn: DepositionSRN
source: RecordSource
convention_srn: ConventionSRN
metadata: dict[str, Any]
convention_srn: ConventionSRN | None = None
hooks: list[HookDefinition] = []
files_dir: str = ""
expected_features: list[str] = []
12 changes: 7 additions & 5 deletions server/osa/domain/record/handler/convert_deposition_to_record.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""ConvertDepositionToRecord - creates records when depositions are approved."""

from osa.domain.curation.event.deposition_approved import DepositionApproved
from osa.domain.record.model.draft import RecordDraft
from osa.domain.record.service import RecordService
from osa.domain.shared.event import EventHandler
from osa.domain.shared.model.source import DepositionSource


class ConvertDepositionToRecord(EventHandler[DepositionApproved]):
Expand All @@ -14,11 +16,11 @@ class ConvertDepositionToRecord(EventHandler[DepositionApproved]):
service: RecordService

async def handle(self, event: DepositionApproved) -> None:
"""Delegate to RecordService to create and publish the record."""
await self.service.publish_record(
deposition_srn=event.deposition_srn,
"""Build a RecordDraft from DepositionApproved and publish."""
draft = RecordDraft(
source=DepositionSource(id=str(event.deposition_srn)),
metadata=event.metadata,
convention_srn=event.convention_srn,
hooks=event.hooks,
files_dir=event.files_dir,
expected_features=event.expected_features,
)
await self.service.publish_record(draft)
3 changes: 1 addition & 2 deletions server/osa/domain/record/model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Record domain model."""

from osa.domain.record.model.aggregate import Record
from osa.domain.record.model.value import IndexRef

__all__ = ["Record", "IndexRef"]
__all__ = ["Record"]
8 changes: 4 additions & 4 deletions server/osa/domain/record/model/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
from datetime import datetime
from typing import Any

from osa.domain.record.model.value import IndexRef
from osa.domain.shared.model.aggregate import Aggregate
from osa.domain.shared.model.srn import DepositionSRN, RecordSRN
from osa.domain.shared.model.source import RecordSource
from osa.domain.shared.model.srn import ConventionSRN, RecordSRN


class Record(Aggregate):
"""An immutable, versioned, published record."""

srn: RecordSRN
deposition_srn: DepositionSRN
source: RecordSource
convention_srn: ConventionSRN
metadata: dict[str, Any]
indexes: dict[str, IndexRef] = {}
published_at: datetime
21 changes: 21 additions & 0 deletions server/osa/domain/record/model/draft.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""RecordDraft — value object for publishing a record from any source."""

from typing import Any

from osa.domain.shared.model.source import RecordSource
from osa.domain.shared.model.srn import ConventionSRN
from osa.domain.shared.model.value import ValueObject


class RecordDraft(ValueObject):
"""Input to RecordService.publish_record().

Carries everything needed to create a Record from any source type.
``expected_features`` lists feature table names (not full HookDefinitions)
so compute runtime details don't leak past the validation boundary.
"""

source: RecordSource
metadata: dict[str, Any]
convention_srn: ConventionSRN
expected_features: list[str] = []
Loading
Loading