From 9d7ecf4ab8acda2f740486c7c6cf5bddad272739 Mon Sep 17 00:00:00 2001 From: Rory Byrne Date: Wed, 25 Mar 2026 00:40:56 +0000 Subject: [PATCH 1/4] refactor: decouple Record from Deposition with source-agnostic model MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace deposition_srn + indexes on Record with a discriminated union RecordSource (DepositionSource, HarvestSource) + convention_srn. Records can now originate from any pathway without hardwiring to depositions. Key changes: - RecordSource discriminated union in shared kernel (domain/shared/model/source.py) - RecordDraft value object as input to publish_record() - HookDefinition confined to validation boundary — downstream events carry only expected_features: list[str], not OCI runtime specs - Events tightened: files_dir/staging_dir removed from all domain events, storage adapters resolve paths from source identifiers - HookInputs simplified: deposition_srn replaced with run_id + files_dir - FeatureStoragePort.get_hook_output_root() resolves paths from source type + id, handler passes resolved path to FeatureService - IndexRef and find_by_deposition() removed, find_by_source() added - Alembic migration: drop deposition_srn/indexes, add source (JSONB) + convention_srn with functional unique index Closes #103 --- .../versions/source_agnostic_records.py | 90 ++++++++++++ .../osa/application/api/v1/routes/records.py | 3 +- .../curation/event/deposition_approved.py | 6 +- .../curation/handler/auto_approve_curation.py | 4 +- .../osa/domain/deposition/event/submitted.py | 5 +- .../domain/deposition/service/deposition.py | 3 - .../feature/handler/insert_record_features.py | 15 +- server/osa/domain/feature/port/storage.py | 14 +- server/osa/domain/feature/service/feature.py | 29 ++-- .../domain/record/event/record_published.py | 17 ++- .../handler/convert_deposition_to_record.py | 12 +- server/osa/domain/record/model/__init__.py | 3 +- server/osa/domain/record/model/aggregate.py | 8 +- server/osa/domain/record/model/draft.py | 21 +++ server/osa/domain/record/model/value.py | 21 --- server/osa/domain/record/port/repository.py | 4 +- server/osa/domain/record/query/get_record.py | 9 +- server/osa/domain/record/service/record.py | 35 ++--- server/osa/domain/shared/model/source.py | 53 ++++++- .../validation/event/validation_completed.py | 4 +- .../validation/handler/validate_deposition.py | 6 +- .../osa/domain/validation/port/hook_runner.py | 3 +- server/osa/domain/validation/port/storage.py | 5 + .../domain/validation/service/validation.py | 8 +- server/osa/infrastructure/k8s/naming.py | 29 ++-- server/osa/infrastructure/k8s/runner.py | 20 ++- server/osa/infrastructure/oci/runner.py | 2 +- .../persistence/adapter/storage.py | 19 +-- .../persistence/mappers/record.py | 24 ++-- .../persistence/repository/record.py | 11 +- .../osa/infrastructure/persistence/tables.py | 17 ++- server/osa/infrastructure/s3/storage.py | 18 ++- .../deposition/test_deposition_service.py | 9 +- .../test_deposition_submitted_enriched.py | 17 +-- .../domain/deposition/test_event_chain.py | 47 ++++--- .../feature/test_feature_service_decoupled.py | 9 +- .../feature/test_insert_record_features.py | 133 ++++++++++-------- .../unit/domain/index/test_fanout_listener.py | 19 ++- .../domain/record/test_get_record_handler.py | 13 +- .../unit/domain/record/test_record_draft.py | 38 +++++ .../domain/record/test_record_features.py | 15 +- .../record/test_record_published_enriched.py | 57 +++----- .../unit/domain/record/test_record_service.py | 128 ++++++++++------- .../unit/domain/shared/test_record_source.py | 92 ++++++++++++ .../domain/validation/test_hook_runner.py | 11 +- .../validation/test_validation_service.py | 6 +- .../test_validation_service_decoupled.py | 4 +- .../k8s/test_k8s_hook_runner.py | 75 +++++----- .../infrastructure/test_file_storage_hooks.py | 28 ++-- .../infrastructure/test_oci_hook_runner.py | 28 ++-- 50 files changed, 781 insertions(+), 466 deletions(-) create mode 100644 server/migrations/versions/source_agnostic_records.py create mode 100644 server/osa/domain/record/model/draft.py create mode 100644 server/tests/unit/domain/record/test_record_draft.py create mode 100644 server/tests/unit/domain/shared/test_record_source.py diff --git a/server/migrations/versions/source_agnostic_records.py b/server/migrations/versions/source_agnostic_records.py new file mode 100644 index 0000000..d263b14 --- /dev/null +++ b/server/migrations/versions/source_agnostic_records.py @@ -0,0 +1,90 @@ +"""source_agnostic_records + +Replace deposition_srn + indexes with source (JSONB) + convention_srn. +No data migration needed — no production data exists. + +Revision ID: source_agnostic_records +Revises: consumer_group_delivery +Create Date: 2026-03-24 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "source_agnostic_records" +down_revision: Union[str, Sequence[str], None] = "consumer_group_delivery" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Drop old indexes + op.drop_index("idx_records_deposition_srn", table_name="records") + + # Drop old columns + op.drop_column("records", "deposition_srn") + op.drop_column("records", "indexes") + + # Add new columns + op.add_column( + "records", + sa.Column("convention_srn", sa.Text(), nullable=False), + ) + op.add_column( + "records", + sa.Column("source", sa.dialects.postgresql.JSONB(), nullable=False), + ) + + # Add new indexes + op.create_index( + "idx_records_convention_srn", + "records", + ["convention_srn"], + ) + op.create_index( + "uq_records_convention_source", + "records", + [ + "convention_srn", + sa.text("(source->>'type')"), + sa.text("(source->>'id')"), + ], + unique=True, + ) + op.create_index( + "idx_records_source_type", + "records", + [sa.text("(source->>'type')")], + ) + + +def downgrade() -> None: + # Drop new indexes + op.drop_index("idx_records_source_type", table_name="records") + op.drop_index("uq_records_convention_source", table_name="records") + op.drop_index("idx_records_convention_srn", table_name="records") + + # Drop new columns + op.drop_column("records", "source") + op.drop_column("records", "convention_srn") + + # Re-add old columns + op.add_column( + "records", + sa.Column("deposition_srn", sa.String(), nullable=False), + ) + op.add_column( + "records", + sa.Column("indexes", sa.JSON(), nullable=False), + ) + + # Re-add old index + op.create_index( + "idx_records_deposition_srn", + "records", + ["deposition_srn"], + ) diff --git a/server/osa/application/api/v1/routes/records.py b/server/osa/application/api/v1/routes/records.py index 2e6b1b6..4182005 100644 --- a/server/osa/application/api/v1/routes/records.py +++ b/server/osa/application/api/v1/routes/records.py @@ -38,7 +38,8 @@ async def get_record( return RecordResponse( record={ "srn": str(result.srn), - "deposition_srn": str(result.deposition_srn), + "source": result.source.model_dump(), + "convention_srn": str(result.convention_srn), "metadata": result.metadata, "published_at": result.published_at.isoformat(), "features": result.features, diff --git a/server/osa/domain/curation/event/deposition_approved.py b/server/osa/domain/curation/event/deposition_approved.py index 7f6c15b..5ad6ded 100644 --- a/server/osa/domain/curation/event/deposition_approved.py +++ b/server/osa/domain/curation/event/deposition_approved.py @@ -3,7 +3,6 @@ from typing import Any from osa.domain.shared.event import Event, EventId -from osa.domain.shared.model.hook import HookDefinition from osa.domain.shared.model.srn import ConventionSRN, DepositionSRN @@ -17,6 +16,5 @@ class DepositionApproved(Event): id: EventId deposition_srn: DepositionSRN metadata: dict[str, Any] - convention_srn: ConventionSRN | None = None - hooks: list[HookDefinition] = [] - files_dir: str = "" + convention_srn: ConventionSRN + expected_features: list[str] = [] diff --git a/server/osa/domain/curation/handler/auto_approve_curation.py b/server/osa/domain/curation/handler/auto_approve_curation.py index 3e74912..9883b29 100644 --- a/server/osa/domain/curation/handler/auto_approve_curation.py +++ b/server/osa/domain/curation/handler/auto_approve_curation.py @@ -32,14 +32,12 @@ async def handle(self, event: ValidationCompleted) -> None: logger.info(f"Auto-approving deposition: {event.deposition_srn}") - # Emit DepositionApproved approved = DepositionApproved( id=EventId(uuid4()), deposition_srn=event.deposition_srn, metadata=event.metadata, convention_srn=event.convention_srn, - hooks=event.hooks, - files_dir=event.files_dir, + expected_features=event.expected_features, ) await self.outbox.append(approved) diff --git a/server/osa/domain/deposition/event/submitted.py b/server/osa/domain/deposition/event/submitted.py index 610a9e1..7ca6861 100644 --- a/server/osa/domain/deposition/event/submitted.py +++ b/server/osa/domain/deposition/event/submitted.py @@ -8,8 +8,8 @@ class DepositionSubmittedEvent(Event): """Emitted when a deposition is submitted for validation. - Enriched with convention_srn, hooks, and files_dir so the - validation domain can operate without querying deposition repos. + Enriched with convention_srn and hooks so the validation domain + can operate without querying deposition repos. """ id: EventId @@ -17,4 +17,3 @@ class DepositionSubmittedEvent(Event): metadata: dict[str, Any] convention_srn: ConventionSRN hooks: list[HookDefinition] = [] - files_dir: str = "" diff --git a/server/osa/domain/deposition/service/deposition.py b/server/osa/domain/deposition/service/deposition.py index 7235e1a..239d60c 100644 --- a/server/osa/domain/deposition/service/deposition.py +++ b/server/osa/domain/deposition/service/deposition.py @@ -194,15 +194,12 @@ async def submit(self, srn: DepositionSRN) -> Deposition: dep.submit() await self.deposition_repo.save(dep) - files_dir = self.file_storage.get_files_dir(dep.srn) - event = DepositionSubmittedEvent( id=EventId(uuid4()), deposition_id=srn, metadata=dep.metadata, convention_srn=dep.convention_srn, hooks=convention.hooks, - files_dir=str(files_dir), ) await self.outbox.append(event) return dep diff --git a/server/osa/domain/feature/handler/insert_record_features.py b/server/osa/domain/feature/handler/insert_record_features.py index 3c34cb9..d0bd4a5 100644 --- a/server/osa/domain/feature/handler/insert_record_features.py +++ b/server/osa/domain/feature/handler/insert_record_features.py @@ -1,22 +1,27 @@ """InsertRecordFeatures — deferred feature insertion on record publication.""" +from osa.domain.feature.port.storage import FeatureStoragePort from osa.domain.feature.service.feature import FeatureService from osa.domain.record.event.record_published import RecordPublished from osa.domain.shared.event import EventHandler class InsertRecordFeatures(EventHandler[RecordPublished]): - """Reads hook outputs from cold storage and inserts features with record_srn. + """Reads hook outputs from storage and inserts features with record_srn. - Uses enriched RecordPublished event data (hooks list) instead of - looking up the convention. + Resolves the hook output directory from the record's source via the + feature storage port, then delegates to FeatureService for insertion. """ feature_service: FeatureService + feature_storage: FeatureStoragePort async def handle(self, event: RecordPublished) -> None: + hook_output_dir = self.feature_storage.get_hook_output_root( + event.source.type, event.source.id + ) await self.feature_service.insert_features_for_record( - deposition_srn=event.deposition_srn, + hook_output_dir=hook_output_dir, record_srn=str(event.record_srn), - hooks=event.hooks, + expected_features=event.expected_features, ) diff --git a/server/osa/domain/feature/port/storage.py b/server/osa/domain/feature/port/storage.py index 48964eb..4590de5 100644 --- a/server/osa/domain/feature/port/storage.py +++ b/server/osa/domain/feature/port/storage.py @@ -3,21 +3,29 @@ from abc import abstractmethod from typing import Any, Protocol -from osa.domain.shared.model.srn import DepositionSRN from osa.domain.shared.port import Port class FeatureStoragePort(Port, Protocol): """File storage operations used by the feature domain.""" + @abstractmethod + def get_hook_output_root(self, source_type: str, source_id: str) -> str: + """Resolve the root directory containing hook outputs for a source. + + The handler uses this to locate hook outputs, then passes the + resolved path to read_hook_features / hook_features_exist. + """ + ... + @abstractmethod async def read_hook_features( - self, deposition_srn: DepositionSRN, hook_name: str + self, hook_output_dir: str, feature_name: str ) -> list[dict[str, Any]]: """Read features.json from a hook's output directory.""" ... @abstractmethod - async def hook_features_exist(self, deposition_srn: DepositionSRN, hook_name: str) -> bool: + async def hook_features_exist(self, hook_output_dir: str, feature_name: str) -> bool: """Check whether features.json exists in a hook's output directory.""" ... diff --git a/server/osa/domain/feature/service/feature.py b/server/osa/domain/feature/service/feature.py index 31abf76..210e681 100644 --- a/server/osa/domain/feature/service/feature.py +++ b/server/osa/domain/feature/service/feature.py @@ -6,7 +6,6 @@ from osa.domain.feature.port.feature_store import FeatureStore from osa.domain.feature.port.storage import FeatureStoragePort from osa.domain.shared.model.hook import HookDefinition -from osa.domain.shared.model.srn import DepositionSRN from osa.domain.shared.service import Service logger = logging.getLogger(__name__) @@ -33,27 +32,33 @@ async def insert_features( async def insert_features_for_record( self, - deposition_srn: DepositionSRN, + hook_output_dir: str, record_srn: str, - hooks: list[HookDefinition] | None = None, + expected_features: list[str] | None = None, ) -> None: - """Read hook features and insert into feature tables. + """Read hook features from the given directory and insert into feature tables. - If hooks are provided (from enriched event), iterate those. + Warns (does not raise) when an expected feature is missing — the record + is already published, blocking other features would be worse. """ - if not hooks: + if not expected_features: return - for hook in hooks: - hook_name = hook.name - if not await self.feature_storage.hook_features_exist(deposition_srn, hook_name): + for feature_name in expected_features: + if not await self.feature_storage.hook_features_exist(hook_output_dir, feature_name): + logger.warning( + f"Expected feature '{feature_name}' not found in {hook_output_dir} " + f"for record {record_srn}" + ) continue - features = await self.feature_storage.read_hook_features(deposition_srn, hook_name) + features = await self.feature_storage.read_hook_features(hook_output_dir, feature_name) if features: count = await self.insert_features( - hook_name=hook_name, + hook_name=feature_name, record_srn=record_srn, rows=features, ) - logger.info(f"Inserted {count} features for hook={hook_name} record={record_srn}") + logger.info( + f"Inserted {count} features for hook={feature_name} record={record_srn}" + ) diff --git a/server/osa/domain/record/event/record_published.py b/server/osa/domain/record/event/record_published.py index 1a80071..2c6dc62 100644 --- a/server/osa/domain/record/event/record_published.py +++ b/server/osa/domain/record/event/record_published.py @@ -3,22 +3,21 @@ from typing import Any from osa.domain.shared.event import Event, EventId -from osa.domain.shared.model.hook import HookDefinition -from osa.domain.shared.model.srn import ConventionSRN, DepositionSRN, RecordSRN +from osa.domain.shared.model.source import RecordSource +from osa.domain.shared.model.srn import ConventionSRN, RecordSRN class RecordPublished(Event): """Emitted when a record is published and ready for indexing. - Enriched with convention_srn, hooks, and files_dir so downstream - consumers (feature insertion, indexing) can operate without - querying deposition/convention repositories. + Enriched with source, convention_srn, and expected_features so downstream + consumers (feature insertion, indexing) can operate without querying + record/convention repositories. """ id: EventId record_srn: RecordSRN - deposition_srn: DepositionSRN + source: RecordSource + convention_srn: ConventionSRN metadata: dict[str, Any] - convention_srn: ConventionSRN | None = None - hooks: list[HookDefinition] = [] - files_dir: str = "" + expected_features: list[str] = [] diff --git a/server/osa/domain/record/handler/convert_deposition_to_record.py b/server/osa/domain/record/handler/convert_deposition_to_record.py index c58fc48..5f89aa4 100644 --- a/server/osa/domain/record/handler/convert_deposition_to_record.py +++ b/server/osa/domain/record/handler/convert_deposition_to_record.py @@ -1,8 +1,10 @@ """ConvertDepositionToRecord - creates records when depositions are approved.""" from osa.domain.curation.event.deposition_approved import DepositionApproved +from osa.domain.record.model.draft import RecordDraft from osa.domain.record.service import RecordService from osa.domain.shared.event import EventHandler +from osa.domain.shared.model.source import DepositionSource class ConvertDepositionToRecord(EventHandler[DepositionApproved]): @@ -14,11 +16,11 @@ class ConvertDepositionToRecord(EventHandler[DepositionApproved]): service: RecordService async def handle(self, event: DepositionApproved) -> None: - """Delegate to RecordService to create and publish the record.""" - await self.service.publish_record( - deposition_srn=event.deposition_srn, + """Build a RecordDraft from DepositionApproved and publish.""" + draft = RecordDraft( + source=DepositionSource(id=str(event.deposition_srn)), metadata=event.metadata, convention_srn=event.convention_srn, - hooks=event.hooks, - files_dir=event.files_dir, + expected_features=event.expected_features, ) + await self.service.publish_record(draft) diff --git a/server/osa/domain/record/model/__init__.py b/server/osa/domain/record/model/__init__.py index 9558fe3..4d774da 100644 --- a/server/osa/domain/record/model/__init__.py +++ b/server/osa/domain/record/model/__init__.py @@ -1,6 +1,5 @@ """Record domain model.""" from osa.domain.record.model.aggregate import Record -from osa.domain.record.model.value import IndexRef -__all__ = ["Record", "IndexRef"] +__all__ = ["Record"] diff --git a/server/osa/domain/record/model/aggregate.py b/server/osa/domain/record/model/aggregate.py index cbf23fd..69e5575 100644 --- a/server/osa/domain/record/model/aggregate.py +++ b/server/osa/domain/record/model/aggregate.py @@ -3,16 +3,16 @@ from datetime import datetime from typing import Any -from osa.domain.record.model.value import IndexRef from osa.domain.shared.model.aggregate import Aggregate -from osa.domain.shared.model.srn import DepositionSRN, RecordSRN +from osa.domain.shared.model.source import RecordSource +from osa.domain.shared.model.srn import ConventionSRN, RecordSRN class Record(Aggregate): """An immutable, versioned, published record.""" srn: RecordSRN - deposition_srn: DepositionSRN + source: RecordSource + convention_srn: ConventionSRN metadata: dict[str, Any] - indexes: dict[str, IndexRef] = {} published_at: datetime diff --git a/server/osa/domain/record/model/draft.py b/server/osa/domain/record/model/draft.py new file mode 100644 index 0000000..f5e9e6e --- /dev/null +++ b/server/osa/domain/record/model/draft.py @@ -0,0 +1,21 @@ +"""RecordDraft — value object for publishing a record from any source.""" + +from typing import Any + +from osa.domain.shared.model.source import RecordSource +from osa.domain.shared.model.srn import ConventionSRN +from osa.domain.shared.model.value import ValueObject + + +class RecordDraft(ValueObject): + """Input to RecordService.publish_record(). + + Carries everything needed to create a Record from any source type. + ``expected_features`` lists feature table names (not full HookDefinitions) + so compute runtime details don't leak past the validation boundary. + """ + + source: RecordSource + metadata: dict[str, Any] + convention_srn: ConventionSRN + expected_features: list[str] = [] diff --git a/server/osa/domain/record/model/value.py b/server/osa/domain/record/model/value.py index 5d0c23d..a50b539 100644 --- a/server/osa/domain/record/model/value.py +++ b/server/osa/domain/record/model/value.py @@ -1,22 +1 @@ """Record domain value objects.""" - -from datetime import datetime - -from osa.domain.shared.model.value import ValueObject - - -class IndexRef(ValueObject): - """A pointer to a record's data in an external index. - - Each index adapter knows how to use the external_id to retrieve data. - The Record stores a dict mapping index_id -> IndexRef. - - Example: - record.indexes = { - "vector": IndexRef(external_id="abc123", indexed_at=...), - "files": IndexRef(external_id="s3://bucket/prefix/", indexed_at=...), - } - """ - - external_id: str # The ID/path used to find this record's data in the index - indexed_at: datetime | None = None diff --git a/server/osa/domain/record/port/repository.py b/server/osa/domain/record/port/repository.py index 12ca822..f515c95 100644 --- a/server/osa/domain/record/port/repository.py +++ b/server/osa/domain/record/port/repository.py @@ -4,7 +4,7 @@ from typing import Protocol from osa.domain.record.model.aggregate import Record -from osa.domain.shared.model.srn import DepositionSRN, RecordSRN +from osa.domain.shared.model.srn import RecordSRN from osa.domain.shared.port import Port @@ -16,7 +16,7 @@ async def save(self, record: Record) -> None: ... async def get(self, srn: RecordSRN) -> Record | None: ... @abstractmethod - async def find_by_deposition(self, deposition_srn: DepositionSRN) -> Record | None: ... + async def find_by_source(self, source_type: str, source_id: str) -> Record | None: ... @abstractmethod async def count(self) -> int: ... diff --git a/server/osa/domain/record/query/get_record.py b/server/osa/domain/record/query/get_record.py index cb586c7..10993e9 100644 --- a/server/osa/domain/record/query/get_record.py +++ b/server/osa/domain/record/query/get_record.py @@ -5,7 +5,8 @@ from osa.domain.record.service.record import RecordService from osa.domain.shared.authorization.gate import public -from osa.domain.shared.model.srn import DepositionSRN, RecordSRN +from osa.domain.shared.model.source import RecordSource +from osa.domain.shared.model.srn import ConventionSRN, RecordSRN from osa.domain.shared.query import Query, QueryHandler, Result @@ -15,7 +16,8 @@ class GetRecord(Query): class RecordDetail(Result): srn: RecordSRN - deposition_srn: DepositionSRN + source: RecordSource + convention_srn: ConventionSRN metadata: dict[str, Any] published_at: datetime features: dict[str, list[dict[str, Any]]] = {} @@ -30,7 +32,8 @@ async def run(self, cmd: GetRecord) -> RecordDetail: features = await self.record_service.get_features_for_record(cmd.srn) return RecordDetail( srn=record.srn, - deposition_srn=record.deposition_srn, + source=record.source, + convention_srn=record.convention_srn, metadata=record.metadata, published_at=record.published_at, features=features, diff --git a/server/osa/domain/record/service/record.py b/server/osa/domain/record/service/record.py index dc30e14..d6d3cc4 100644 --- a/server/osa/domain/record/service/record.py +++ b/server/osa/domain/record/service/record.py @@ -1,4 +1,4 @@ -"""RecordService - orchestrates record creation from approved depositions.""" +"""RecordService - orchestrates record creation from any source.""" from __future__ import annotations @@ -9,13 +9,11 @@ from osa.domain.record.event.record_published import RecordPublished from osa.domain.record.model.aggregate import Record +from osa.domain.record.model.draft import RecordDraft from osa.domain.record.port.repository import RecordRepository from osa.domain.shared.error import NotFoundError from osa.domain.shared.event import EventId -from osa.domain.shared.model.hook import HookDefinition from osa.domain.shared.model.srn import ( - ConventionSRN, - DepositionSRN, Domain, LocalId, RecordSRN, @@ -31,7 +29,7 @@ class RecordService(Service): - """Creates and persists Record aggregates from approved depositions.""" + """Creates and persists Record aggregates from any source.""" record_repo: RecordRepository outbox: Outbox @@ -51,16 +49,9 @@ async def get(self, srn: RecordSRN) -> Record: raise NotFoundError(f"Record not found: {srn}") return record - async def publish_record( - self, - deposition_srn: DepositionSRN, - metadata: dict[str, Any], - convention_srn: ConventionSRN | None = None, - hooks: list[HookDefinition] | None = None, - files_dir: str = "", - ) -> Record: - """Create and persist a Record from an approved deposition.""" - logger.info(f"Creating record for approved deposition: {deposition_srn}") + async def publish_record(self, draft: RecordDraft) -> Record: + """Create and persist a Record from a draft.""" + logger.info(f"Creating record from {draft.source.type} source: {draft.source.id}") record_srn = RecordSRN( domain=self.node_domain, @@ -70,8 +61,9 @@ async def publish_record( record = Record( srn=record_srn, - deposition_srn=deposition_srn, - metadata=metadata, + source=draft.source, + convention_srn=draft.convention_srn, + metadata=draft.metadata, published_at=datetime.now(UTC), ) @@ -81,11 +73,10 @@ async def publish_record( published = RecordPublished( id=EventId(uuid4()), record_srn=record_srn, - deposition_srn=deposition_srn, - metadata=metadata, - convention_srn=convention_srn, - hooks=hooks or [], - files_dir=files_dir, + source=draft.source, + convention_srn=draft.convention_srn, + metadata=draft.metadata, + expected_features=draft.expected_features, ) await self.outbox.append(published) diff --git a/server/osa/domain/shared/model/source.py b/server/osa/domain/shared/model/source.py index 2b29df8..e61fd8e 100644 --- a/server/osa/domain/shared/model/source.py +++ b/server/osa/domain/shared/model/source.py @@ -1,8 +1,8 @@ """Shared source domain models used across deposition and source domains.""" -from typing import Any +from typing import Annotated, Any, Literal, Union -from pydantic import Field +from pydantic import Discriminator, Field, Tag, field_validator from osa.domain.shared.model.value import ValueObject @@ -28,6 +28,55 @@ class InitialRunConfig(ValueObject): limit: int | None = None +# ── RecordSource discriminated union ── + + +class _RecordSourceBase(ValueObject): + """Base for all record source types.""" + + type: str + id: str + + @field_validator("id") + @classmethod + def id_must_be_non_empty(cls, v: str) -> str: + if not v: + raise ValueError("id must be non-empty") + return v + + +class DepositionSource(_RecordSourceBase): + """Record originated from a user deposition.""" + + type: Literal["deposition"] = "deposition" + + +class HarvestSource(_RecordSourceBase): + """Record originated from an automated harvest run.""" + + type: Literal["harvest"] = "harvest" + harvest_run_srn: str + upstream_source: str + + +def _record_source_discriminator(v: Any) -> str: + if isinstance(v, dict): + return v.get("type", "") + return getattr(v, "type", "") + + +RecordSource = Annotated[ + Union[ + Annotated[DepositionSource, Tag("deposition")], + Annotated[HarvestSource, Tag("harvest")], + ], + Discriminator(_record_source_discriminator), +] + + +# ── Source runner definitions ── + + class SourceDefinition(ValueObject): """Complete specification for a source: image reference + config + limits.""" diff --git a/server/osa/domain/validation/event/validation_completed.py b/server/osa/domain/validation/event/validation_completed.py index 6716320..b65c147 100644 --- a/server/osa/domain/validation/event/validation_completed.py +++ b/server/osa/domain/validation/event/validation_completed.py @@ -1,7 +1,6 @@ from typing import Any from osa.domain.shared.event import Event, EventId -from osa.domain.shared.model.hook import HookDefinition from osa.domain.shared.model.srn import ConventionSRN, DepositionSRN, ValidationRunSRN from osa.domain.validation.model import RunStatus @@ -16,5 +15,4 @@ class ValidationCompleted(Event): status: RunStatus hook_results: list[dict[str, Any]] metadata: dict[str, Any] - hooks: list[HookDefinition] = [] - files_dir: str = "" + expected_features: list[str] = [] diff --git a/server/osa/domain/validation/handler/validate_deposition.py b/server/osa/domain/validation/handler/validate_deposition.py index 0b1f246..3f92a52 100644 --- a/server/osa/domain/validation/handler/validate_deposition.py +++ b/server/osa/domain/validation/handler/validate_deposition.py @@ -30,7 +30,6 @@ async def handle(self, event: DepositionSubmittedEvent) -> None: convention_srn=event.convention_srn, metadata=event.metadata, hooks=event.hooks, - files_dir=event.files_dir, ) except ValueError: logger.error(f"Validation setup failed for: {event.deposition_id}") @@ -52,6 +51,8 @@ async def handle(self, event: DepositionSubmittedEvent) -> None: await self.outbox.append(failed) logger.info(f"Validation failed for: {event.deposition_id}") else: + # Extract expected_features from hooks at the validation boundary + expected_features = [h.name for h in event.hooks] completed = ValidationCompleted( id=EventId(uuid4()), validation_run_srn=run.srn, @@ -60,8 +61,7 @@ async def handle(self, event: DepositionSubmittedEvent) -> None: status=run.status, hook_results=[r.model_dump() for r in hook_results], metadata=event.metadata, - hooks=event.hooks, - files_dir=event.files_dir, + expected_features=expected_features, ) await self.outbox.append(completed) logger.info(f"Validation completed for: {event.deposition_id}") diff --git a/server/osa/domain/validation/port/hook_runner.py b/server/osa/domain/validation/port/hook_runner.py index 23c0583..703596b 100644 --- a/server/osa/domain/validation/port/hook_runner.py +++ b/server/osa/domain/validation/port/hook_runner.py @@ -6,7 +6,6 @@ from typing import Protocol, runtime_checkable from osa.domain.shared.model.hook import HookDefinition -from osa.domain.shared.model.srn import DepositionSRN from osa.domain.shared.port import Port from osa.domain.validation.model.hook_result import HookResult @@ -16,7 +15,7 @@ class HookInputs: """Inputs to pass to a hook container.""" record_json: dict - deposition_srn: DepositionSRN + run_id: str files_dir: Path | None = None config: dict | None = None diff --git a/server/osa/domain/validation/port/storage.py b/server/osa/domain/validation/port/storage.py index b3436f5..353578b 100644 --- a/server/osa/domain/validation/port/storage.py +++ b/server/osa/domain/validation/port/storage.py @@ -15,3 +15,8 @@ class HookStoragePort(Port, Protocol): def get_hook_output_dir(self, deposition_srn: DepositionSRN, hook_name: str) -> Path: """Return the durable output directory for a hook's results.""" ... + + @abstractmethod + def get_files_dir(self, deposition_id: DepositionSRN) -> Path: + """Return the directory containing data files for a deposition.""" + ... diff --git a/server/osa/domain/validation/service/validation.py b/server/osa/domain/validation/service/validation.py index 07efb88..0783d9b 100644 --- a/server/osa/domain/validation/service/validation.py +++ b/server/osa/domain/validation/service/validation.py @@ -3,7 +3,6 @@ import logging import uuid from datetime import datetime, timezone -from pathlib import Path from typing import Any from osa.domain.shared.model.hook import HookDefinition @@ -101,14 +100,15 @@ async def validate_deposition( convention_srn: ConventionSRN, metadata: dict[str, Any], hooks: list[HookDefinition], - files_dir: str, ) -> tuple[ValidationRun, list[HookResult]]: """Full validation workflow using enriched event data.""" record_json = {"srn": str(deposition_srn), "metadata": metadata} + run_id = f"{deposition_srn.domain.root}_{deposition_srn.id.root}" + files_dir = self.hook_storage.get_files_dir(deposition_srn) inputs = HookInputs( record_json=record_json, - deposition_srn=deposition_srn, - files_dir=Path(files_dir) if files_dir else None, + run_id=run_id, + files_dir=files_dir, ) run = await self.create_run(inputs=inputs) diff --git a/server/osa/infrastructure/k8s/naming.py b/server/osa/infrastructure/k8s/naming.py index 149da9c..c349fe4 100644 --- a/server/osa/infrastructure/k8s/naming.py +++ b/server/osa/infrastructure/k8s/naming.py @@ -1,9 +1,13 @@ """K8s naming utilities: Job names (DNS-1035) and label values.""" +from __future__ import annotations + import re import secrets +from typing import TYPE_CHECKING -from osa.domain.shared.model.srn import SRN +if TYPE_CHECKING: + from osa.domain.shared.model.srn import SRN def sanitize_label(raw: str) -> str: @@ -17,22 +21,25 @@ def sanitize_label(raw: str) -> str: return sanitized[:63].strip("-._") -def label_value(srn: SRN) -> str: - """Convert an SRN to a K8s-safe label value. +def label_value(value: str | SRN) -> str: + """Convert a string or SRN to a K8s-safe label value. - Strips the constant ``urn:osa:`` prefix to save space within the - 63-char K8s label limit, then sanitizes for label compliance. + For SRN objects, strips the constant ``urn:osa:`` prefix to save space + within the 63-char K8s label limit. For plain strings, sanitizes directly. Examples: + label_value("run-abc123") → "run-abc123" label_value(DepositionSRN.parse("urn:osa:localhost:dep:abc123")) → "localhost.dep.abc123" """ - # Format: urn:osa:{domain}:{type}:{id}[@version] - # Strip "urn:osa:" prefix — it's constant and wastes label budget - compact = f"{srn.domain.root}.{srn.type.value}.{srn.id.root}" - if srn.version is not None: - compact += f".{srn.version}" - return sanitize_label(compact) + from osa.domain.shared.model.srn import SRN + + if isinstance(value, SRN): + compact = f"{value.domain.root}.{value.type.value}.{value.id.root}" + if value.version is not None: + compact += f".{value.version}" + return sanitize_label(compact) + return sanitize_label(value) def job_name(prefix: str, hook_name: str, deposition_srn: str) -> str: diff --git a/server/osa/infrastructure/k8s/runner.py b/server/osa/infrastructure/k8s/runner.py index dfc3536..457c799 100644 --- a/server/osa/infrastructure/k8s/runner.py +++ b/server/osa/infrastructure/k8s/runner.py @@ -12,7 +12,6 @@ from osa.config import K8sConfig from osa.domain.shared.error import InfrastructureError from osa.domain.shared.model.hook import HookDefinition -from osa.domain.shared.model.srn import DepositionSRN from osa.domain.validation.model.hook_result import HookResult, HookStatus from osa.domain.validation.port.hook_runner import HookInputs, HookRunner from osa.infrastructure.k8s.errors import classify_api_error @@ -83,7 +82,6 @@ async def run( hook, inputs, work_dir, - deposition_srn=inputs.deposition_srn, ) async def _run_job( @@ -93,8 +91,6 @@ async def _run_job( hook: HookDefinition, inputs: HookInputs, work_dir: Path, - *, - deposition_srn: DepositionSRN, ) -> HookResult: """Core Job lifecycle: check orphans → create → schedule → execute → parse → cleanup.""" namespace = self._config.namespace @@ -105,7 +101,7 @@ async def _run_job( try: existing = await self._check_existing_job( - batch_api, namespace, hook.name, deposition_srn + batch_api, namespace, hook.name, inputs.run_id ) if existing == "succeeded": @@ -120,7 +116,7 @@ async def _run_job( spec = self._build_job_spec( hook, work_dir, - deposition_srn=deposition_srn, + run_id=inputs.run_id, files_dir=inputs.files_dir, ) job_name_to_watch = spec.metadata.name @@ -133,7 +129,7 @@ async def _run_job( "namespace": namespace, "image": f"{hook.runtime.image}@{hook.runtime.digest}", "hook_name": hook.name, - "deposition_srn": deposition_srn, + "run_id": inputs.run_id, }, ) @@ -192,7 +188,7 @@ async def _check_existing_job( batch_api: BatchV1Api, namespace: str, hook_name: str, - deposition_srn: DepositionSRN, + run_id: str, ) -> str | None: """Check for existing Jobs with matching labels. @@ -201,7 +197,7 @@ async def _check_existing_job( "active:{job_name}" if a running Job exists None if no Job or only failed Jobs exist """ - label_selector = f"osa.io/hook={hook_name},osa.io/deposition={label_value(deposition_srn)}" + label_selector = f"osa.io/hook={hook_name},osa.io/run-id={label_value(run_id)}" try: job_list = await batch_api.list_namespaced_job(namespace, label_selector=label_selector) except Exception as exc: @@ -220,7 +216,7 @@ def _build_job_spec( hook: HookDefinition, work_dir: Path, *, - deposition_srn: DepositionSRN, + run_id: str, files_dir: Path | None = None, ) -> V1Job: """Build a K8s Job manifest for a hook execution.""" @@ -245,7 +241,7 @@ def _build_job_spec( V1VolumeMount, ) - name = job_name("hook", hook.name, str(deposition_srn)) + name = job_name("hook", hook.name, run_id) relative_work = self._relative_path(work_dir) input_subpath = f"{relative_work}/input" output_subpath = f"{relative_work}/output" @@ -253,7 +249,7 @@ def _build_job_spec( labels = { "osa.io/role": "hook", "osa.io/hook": hook.name, - "osa.io/deposition": label_value(deposition_srn), + "osa.io/run-id": label_value(run_id), } mounts = [ diff --git a/server/osa/infrastructure/oci/runner.py b/server/osa/infrastructure/oci/runner.py index d51ac68..7878991 100644 --- a/server/osa/infrastructure/oci/runner.py +++ b/server/osa/infrastructure/oci/runner.py @@ -90,7 +90,7 @@ async def _resolve_and_run(): logfire.error( "Hook timed out", hook=hook.name, - deposition_srn=inputs.deposition_srn, + run_id=inputs.run_id, timeout=timeout, ) return HookResult( diff --git a/server/osa/infrastructure/persistence/adapter/storage.py b/server/osa/infrastructure/persistence/adapter/storage.py index acb3e50..04c01da 100644 --- a/server/osa/infrastructure/persistence/adapter/storage.py +++ b/server/osa/infrastructure/persistence/adapter/storage.py @@ -53,12 +53,17 @@ def get_hook_output_dir(self, deposition_id: DepositionSRN, hook_name: str) -> P output_dir.mkdir(parents=True, exist_ok=True) return output_dir + def get_hook_output_root(self, source_type: str, source_id: str) -> str: + """Resolve the root directory for a given source type and id.""" + if source_type == "deposition": + srn = DepositionSRN.parse(source_id) + return str(self._dep_dir(srn)) + raise ValueError(f"Unknown source type: {source_type}") + async def read_hook_features( - self, deposition_id: DepositionSRN, hook_name: str + self, hook_output_dir: str, feature_name: str ) -> list[dict[str, Any]]: - features_file = ( - self._dep_dir(deposition_id) / "hooks" / hook_name / "output" / "features.json" - ) + features_file = Path(hook_output_dir) / "hooks" / feature_name / "output" / "features.json" if not features_file.exists(): return [] data = json.loads(features_file.read_text()) @@ -68,10 +73,8 @@ async def read_hook_features( return [data] return [] - async def hook_features_exist(self, deposition_id: DepositionSRN, hook_name: str) -> bool: - features_file = ( - self._dep_dir(deposition_id) / "hooks" / hook_name / "output" / "features.json" - ) + async def hook_features_exist(self, hook_output_dir: str, feature_name: str) -> bool: + features_file = Path(hook_output_dir) / "hooks" / feature_name / "output" / "features.json" return features_file.exists() async def save_file( diff --git a/server/osa/infrastructure/persistence/mappers/record.py b/server/osa/infrastructure/persistence/mappers/record.py index 237ec54..97ad7db 100644 --- a/server/osa/infrastructure/persistence/mappers/record.py +++ b/server/osa/infrastructure/persistence/mappers/record.py @@ -3,9 +3,13 @@ from datetime import datetime from typing import Any +from pydantic import TypeAdapter + from osa.domain.record.model.aggregate import Record -from osa.domain.record.model.value import IndexRef -from osa.domain.shared.model.srn import DepositionSRN, RecordSRN +from osa.domain.shared.model.source import RecordSource +from osa.domain.shared.model.srn import ConventionSRN, RecordSRN + +_source_adapter = TypeAdapter(RecordSource) def row_to_record(row: dict[str, Any]) -> Record: @@ -14,29 +18,23 @@ def row_to_record(row: dict[str, Any]) -> Record: if isinstance(published_at, str): published_at = datetime.fromisoformat(published_at) - # Deserialize indexes - raw_indexes = row.get("indexes", {}) or {} - indexes: dict[str, IndexRef] = { - key: IndexRef.model_validate(value) for key, value in raw_indexes.items() - } + source = _source_adapter.validate_python(row["source"]) return Record( srn=RecordSRN.parse(row["srn"]), - deposition_srn=DepositionSRN.parse(row["deposition_srn"]), + source=source, + convention_srn=ConventionSRN.parse(row["convention_srn"]), metadata=row.get("metadata", {}), - indexes=indexes, published_at=published_at, ) def record_to_dict(record: Record) -> dict[str, Any]: """Convert Record aggregate to database dict.""" - indexes_dict = {key: ref.model_dump(mode="json") for key, ref in record.indexes.items()} - return { "srn": str(record.srn), - "deposition_srn": str(record.deposition_srn), + "convention_srn": str(record.convention_srn), + "source": _source_adapter.dump_python(record.source, mode="json"), "metadata": record.metadata, - "indexes": indexes_dict, "published_at": record.published_at, } diff --git a/server/osa/infrastructure/persistence/repository/record.py b/server/osa/infrastructure/persistence/repository/record.py index ddb6dc7..cb40645 100644 --- a/server/osa/infrastructure/persistence/repository/record.py +++ b/server/osa/infrastructure/persistence/repository/record.py @@ -5,7 +5,7 @@ from osa.domain.record.model.aggregate import Record from osa.domain.record.port.repository import RecordRepository -from osa.domain.shared.model.srn import DepositionSRN, RecordSRN +from osa.domain.shared.model.srn import RecordSRN from osa.infrastructure.persistence.mappers.record import record_to_dict, row_to_record from osa.infrastructure.persistence.tables import records_table @@ -30,9 +30,12 @@ async def get(self, srn: RecordSRN) -> Record | None: row = result.mappings().first() return row_to_record(dict(row)) if row else None - async def find_by_deposition(self, deposition_srn: DepositionSRN) -> Record | None: - """Find the record created from a deposition.""" - stmt = select(records_table).where(records_table.c.deposition_srn == str(deposition_srn)) + async def find_by_source(self, source_type: str, source_id: str) -> Record | None: + """Find a record by source type and id.""" + stmt = select(records_table).where( + records_table.c.source["type"].as_string() == source_type, + records_table.c.source["id"].as_string() == source_id, + ) result = await self.session.execute(stmt) row = result.mappings().first() return row_to_record(dict(row)) if row else None diff --git a/server/osa/infrastructure/persistence/tables.py b/server/osa/infrastructure/persistence/tables.py index 0132112..150f951 100644 --- a/server/osa/infrastructure/persistence/tables.py +++ b/server/osa/infrastructure/persistence/tables.py @@ -65,13 +65,24 @@ "records", metadata, Column("srn", String, primary_key=True), - Column("deposition_srn", String, nullable=False), + Column("convention_srn", Text, nullable=False), + Column("source", JSONB, nullable=False), Column("metadata", JSONB, nullable=False), - Column("indexes", JSON, nullable=False), Column("published_at", DateTime(timezone=True), nullable=False), ) -Index("idx_records_deposition_srn", records_table.c.deposition_srn) +Index("idx_records_convention_srn", records_table.c.convention_srn) +Index( + "uq_records_convention_source", + records_table.c.convention_srn, + records_table.c.source["type"].as_string(), + records_table.c.source["id"].as_string(), + unique=True, +) +Index( + "idx_records_source_type", + records_table.c.source["type"].as_string(), +) Index("idx_records_published_at", records_table.c.published_at) Index( "idx_records_metadata_gin", diff --git a/server/osa/infrastructure/s3/storage.py b/server/osa/infrastructure/s3/storage.py index c154127..2b37eda 100644 --- a/server/osa/infrastructure/s3/storage.py +++ b/server/osa/infrastructure/s3/storage.py @@ -172,10 +172,19 @@ def get_hook_output_dir(self, deposition_id: DepositionSRN, hook_name: str) -> P # ── FeatureStoragePort ─────────────────────────────────────────── + def get_hook_output_root(self, source_type: str, source_id: str) -> str: + """Resolve the hook output root path for a given source type and id.""" + if source_type == "deposition": + srn = DepositionSRN.parse(source_id) + safe_id = self._safe_id(srn) + return f"{self._data_mount_path}/depositions/{safe_id}" + raise ValueError(f"Unknown source type: {source_type}") + async def read_hook_features( - self, deposition_id: DepositionSRN, hook_name: str + self, hook_output_dir: str, feature_name: str ) -> list[dict[str, Any]]: - key = f"{self._dep_prefix(deposition_id)}/hooks/{hook_name}/output/features.json" + prefix = relative_path(Path(hook_output_dir), self._data_mount_path) + key = f"{prefix}/hooks/{feature_name}/output/features.json" try: data_bytes = await self._s3.get_object(key) except Exception: @@ -187,6 +196,7 @@ async def read_hook_features( return [data] return [] - async def hook_features_exist(self, deposition_id: DepositionSRN, hook_name: str) -> bool: - key = f"{self._dep_prefix(deposition_id)}/hooks/{hook_name}/output/features.json" + async def hook_features_exist(self, hook_output_dir: str, feature_name: str) -> bool: + prefix = relative_path(Path(hook_output_dir), self._data_mount_path) + key = f"{prefix}/hooks/{feature_name}/output/features.json" return await self._s3.head_object(key) diff --git a/server/tests/unit/domain/deposition/test_deposition_service.py b/server/tests/unit/domain/deposition/test_deposition_service.py index 22b0213..69a5494 100644 --- a/server/tests/unit/domain/deposition/test_deposition_service.py +++ b/server/tests/unit/domain/deposition/test_deposition_service.py @@ -343,6 +343,9 @@ async def test_delete_file_emits_event(self): class TestDepositionServiceSubmit: @pytest.mark.asyncio async def test_submit_with_enough_files(self): + from pathlib import Path + from unittest.mock import MagicMock + dep = _make_deposition( files=[ DepositionFile(name="a.csv", size=10, checksum="x", uploaded_at=datetime.now(UTC)) @@ -354,8 +357,12 @@ async def test_submit_with_enough_files(self): conv_repo = AsyncMock() conv_repo.get.return_value = conv outbox = AsyncMock() + file_storage = MagicMock() + file_storage.get_files_dir.return_value = Path("/data/depositions/test-dep/files") - service = _make_service(dep_repo=dep_repo, conv_repo=conv_repo, outbox=outbox) + service = _make_service( + dep_repo=dep_repo, conv_repo=conv_repo, outbox=outbox, file_storage=file_storage + ) result = await service.submit(dep.srn) assert result.status == DepositionStatus.IN_VALIDATION outbox.append.assert_called_once() diff --git a/server/tests/unit/domain/deposition/test_deposition_submitted_enriched.py b/server/tests/unit/domain/deposition/test_deposition_submitted_enriched.py index d96224a..2ec5ee8 100644 --- a/server/tests/unit/domain/deposition/test_deposition_submitted_enriched.py +++ b/server/tests/unit/domain/deposition/test_deposition_submitted_enriched.py @@ -1,7 +1,6 @@ """Unit tests for enriched DepositionSubmittedEvent. -Tests for User Story 3: Cross-domain decoupling. -Verifies the event carries convention_srn, hooks, and files_dir. +Verifies the event carries convention_srn and hooks. """ from uuid import uuid4 @@ -49,7 +48,6 @@ def test_carries_convention_srn(self): metadata={"title": "Test"}, convention_srn=_make_conv_srn(), hooks=[_make_hook_definition()], - files_dir="/data/files/test-dep", ) assert event.convention_srn == _make_conv_srn() @@ -62,20 +60,7 @@ def test_carries_hooks(self): metadata={"title": "Test"}, convention_srn=_make_conv_srn(), hooks=[hook], - files_dir="/data/files/test-dep", ) assert len(event.hooks) == 1 assert event.hooks[0].name == "pocketeer" assert event.hooks[0].runtime.digest == "sha256:abc123" - - def test_carries_files_dir(self): - """Event has files_dir field.""" - event = DepositionSubmittedEvent( - id=EventId(uuid4()), - deposition_id=_make_dep_srn(), - metadata={"title": "Test"}, - convention_srn=_make_conv_srn(), - hooks=[], - files_dir="/data/files/test-dep", - ) - assert event.files_dir == "/data/files/test-dep" diff --git a/server/tests/unit/domain/deposition/test_event_chain.py b/server/tests/unit/domain/deposition/test_event_chain.py index 5d06d63..3a6243a 100644 --- a/server/tests/unit/domain/deposition/test_event_chain.py +++ b/server/tests/unit/domain/deposition/test_event_chain.py @@ -1,7 +1,7 @@ -"""Unit tests verifying the event chain: DepositionSubmitted → Validate → Approve → Record.""" +"""Unit tests verifying the event chain: DepositionSubmitted -> Validate -> Approve -> Record.""" from datetime import UTC, datetime -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, Mock from uuid import uuid4 import pytest @@ -21,6 +21,7 @@ OciConfig, TableFeatureSpec, ) +from osa.domain.shared.model.source import DepositionSource from osa.domain.shared.model.srn import ( ConventionSRN, DepositionSRN, @@ -79,7 +80,6 @@ def _make_submitted_event( metadata={"title": "Test"}, convention_srn=_make_conv_srn(), hooks=hooks or [], - files_dir="/data/files/test-dep", ) @@ -136,7 +136,6 @@ async def test_handler_delegates_with_event_data(self): convention_srn=_make_conv_srn(), metadata={"title": "Test"}, hooks=hooks, - files_dir="/data/files/test-dep", ) @@ -220,29 +219,29 @@ async def test_skips_approve_on_failed(self): class TestConvertDepositionToRecord: @pytest.mark.asyncio async def test_publishes_record(self): + from osa.domain.curation.event.deposition_approved import DepositionApproved + from osa.domain.record.model.draft import RecordDraft + service = AsyncMock() handler = ConvertDepositionToRecord(service=service) - from osa.domain.curation.event.deposition_approved import DepositionApproved - - hooks = [_make_hook_definition()] event = DepositionApproved( id=EventId(uuid4()), deposition_srn=_make_dep_srn(), metadata={"title": "Test"}, convention_srn=_make_conv_srn(), - hooks=hooks, - files_dir="/data/files/test-dep", + expected_features=["pocket_detect"], ) await handler.handle(event) - service.publish_record.assert_called_once_with( - deposition_srn=_make_dep_srn(), - metadata={"title": "Test"}, - convention_srn=_make_conv_srn(), - hooks=hooks, - files_dir="/data/files/test-dep", - ) + service.publish_record.assert_called_once() + draft = service.publish_record.call_args[0][0] + assert isinstance(draft, RecordDraft) + assert isinstance(draft.source, DepositionSource) + assert draft.source.id == str(_make_dep_srn()) + assert draft.convention_srn == _make_conv_srn() + assert draft.expected_features == ["pocket_detect"] + assert draft.metadata == {"title": "Test"} class TestInsertRecordFeatures: @@ -250,23 +249,29 @@ class TestInsertRecordFeatures: async def test_delegates_to_feature_service(self): """InsertRecordFeatures delegates to feature_service.insert_features_for_record.""" feature_service = AsyncMock() + feature_storage = Mock() + feature_storage.get_hook_output_root.return_value = "/fake/output/dir" handler = InsertRecordFeatures( feature_service=feature_service, + feature_storage=feature_storage, ) - hooks = [_make_hook_definition()] event = RecordPublished( id=EventId(uuid4()), record_srn=_make_record_srn(), - deposition_srn=_make_dep_srn(), + source=DepositionSource(id=str(_make_dep_srn())), metadata={"title": "Test"}, - hooks=hooks, + convention_srn=_make_conv_srn(), + expected_features=["pocket_detect"], ) await handler.handle(event) + feature_storage.get_hook_output_root.assert_called_once_with( + "deposition", str(_make_dep_srn()) + ) feature_service.insert_features_for_record.assert_called_once_with( - deposition_srn=_make_dep_srn(), + hook_output_dir="/fake/output/dir", record_srn=str(_make_record_srn()), - hooks=hooks, + expected_features=["pocket_detect"], ) diff --git a/server/tests/unit/domain/feature/test_feature_service_decoupled.py b/server/tests/unit/domain/feature/test_feature_service_decoupled.py index 71dab65..b8984d9 100644 --- a/server/tests/unit/domain/feature/test_feature_service_decoupled.py +++ b/server/tests/unit/domain/feature/test_feature_service_decoupled.py @@ -58,7 +58,7 @@ def test_no_file_storage_dependency(self): @pytest.mark.asyncio async def test_insert_features_for_record_uses_event_data(self): - """insert_features_for_record accepts hooks directly.""" + """insert_features_for_record accepts hook_output_dir + expected_features directly.""" feature_store = AsyncMock() feature_store.insert_features.return_value = 3 feature_storage = AsyncMock() @@ -74,12 +74,11 @@ async def test_insert_features_for_record_uses_event_data(self): feature_storage=feature_storage, ) - hook = _make_hook_definition() await service.insert_features_for_record( - deposition_srn=_make_dep_srn(), + hook_output_dir="/fake/output/dir", record_srn="urn:osa:localhost:rec:test@1", - hooks=[hook], + expected_features=["pocketeer"], ) - feature_storage.hook_features_exist.assert_called_once_with(_make_dep_srn(), "pocketeer") + feature_storage.hook_features_exist.assert_called_once_with("/fake/output/dir", "pocketeer") feature_store.insert_features.assert_called_once() diff --git a/server/tests/unit/domain/feature/test_insert_record_features.py b/server/tests/unit/domain/feature/test_insert_record_features.py index 26629b8..114404a 100644 --- a/server/tests/unit/domain/feature/test_insert_record_features.py +++ b/server/tests/unit/domain/feature/test_insert_record_features.py @@ -1,6 +1,6 @@ """Unit tests for InsertRecordFeatures event handler and FeatureService.insert_features_for_record.""" -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, MagicMock from uuid import uuid4 import pytest @@ -9,47 +9,31 @@ from osa.domain.feature.service.feature import FeatureService from osa.domain.record.event.record_published import RecordPublished from osa.domain.shared.event import EventId -from osa.domain.shared.model.hook import ( - ColumnDef, - HookDefinition, - OciConfig, - TableFeatureSpec, -) +from osa.domain.shared.model.source import DepositionSource, HarvestSource from osa.domain.shared.model.srn import ( - DepositionSRN, + ConventionSRN, RecordSRN, ) -def _make_dep_srn() -> DepositionSRN: - return DepositionSRN.parse("urn:osa:localhost:dep:test-dep") - - def _make_record_srn() -> RecordSRN: return RecordSRN.parse("urn:osa:localhost:rec:test-rec@1") -def _make_hook_definition(name: str = "pocket_detect") -> HookDefinition: - return HookDefinition( - name=name, - runtime=OciConfig( - image="ghcr.io/example/hook", - digest="sha256:abc123", - ), - feature=TableFeatureSpec( - cardinality="many", - columns=[ColumnDef(name="score", json_type="number", required=True)], - ), - ) +def _make_conv_srn() -> ConventionSRN: + return ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0") -def _make_event(hooks: list[HookDefinition] | None = None) -> RecordPublished: +def _make_event( + expected_features: list[str] | None = None, +) -> RecordPublished: return RecordPublished( id=EventId(uuid4()), record_srn=_make_record_srn(), - deposition_srn=_make_dep_srn(), + source=DepositionSource(id="urn:osa:localhost:dep:test-dep"), metadata={"title": "Test"}, - hooks=hooks or [], + convention_srn=_make_conv_srn(), + expected_features=expected_features or [], ) @@ -65,9 +49,14 @@ def _make_feature_service( def _make_handler( feature_service: FeatureService | AsyncMock | None = None, + feature_storage: MagicMock | None = None, ) -> InsertRecordFeatures: + storage = feature_storage or MagicMock() + if not feature_storage: + storage.get_hook_output_root = MagicMock(return_value="/fake/output/dir") return InsertRecordFeatures( feature_service=feature_service or AsyncMock(), + feature_storage=storage, ) @@ -78,14 +67,15 @@ async def test_delegates_to_feature_service(self): feature_service = AsyncMock() handler = _make_handler(feature_service=feature_service) - hooks = [_make_hook_definition()] - event = _make_event(hooks=hooks) + event = _make_event( + expected_features=["pocket_detect"], + ) await handler.handle(event) feature_service.insert_features_for_record.assert_called_once_with( - deposition_srn=event.deposition_srn, + hook_output_dir="/fake/output/dir", record_srn=str(event.record_srn), - hooks=event.hooks, + expected_features=["pocket_detect"], ) @@ -105,9 +95,10 @@ async def test_inserts_features_from_cold_storage(self): feature_storage=feature_storage, ) - hooks = [_make_hook_definition()] await service.insert_features_for_record( - _make_dep_srn(), str(_make_record_srn()), hooks=hooks + hook_output_dir="/fake/output/dir", + record_srn=str(_make_record_srn()), + expected_features=["pocket_detect"], ) feature_store.insert_features.assert_called_once_with( @@ -117,8 +108,8 @@ async def test_inserts_features_from_cold_storage(self): ) @pytest.mark.asyncio - async def test_skips_hooks_without_features_file(self): - """Hooks that didn't produce features.json are skipped.""" + async def test_skips_features_without_features_file(self): + """Features that didn't produce features.json are skipped with a warning.""" feature_storage = AsyncMock() feature_storage.hook_features_exist.return_value = False @@ -129,9 +120,10 @@ async def test_skips_hooks_without_features_file(self): feature_storage=feature_storage, ) - hooks = [_make_hook_definition()] await service.insert_features_for_record( - _make_dep_srn(), str(_make_record_srn()), hooks=hooks + hook_output_dir="/fake/output/dir", + record_srn=str(_make_record_srn()), + expected_features=["pocket_detect"], ) feature_storage.read_hook_features.assert_not_called() @@ -139,7 +131,7 @@ async def test_skips_hooks_without_features_file(self): @pytest.mark.asyncio async def test_skips_empty_feature_list(self): - """Hooks that produced empty features.json are skipped.""" + """Features that produced empty features.json are skipped.""" feature_storage = AsyncMock() feature_storage.hook_features_exist.return_value = True feature_storage.read_hook_features.return_value = [] @@ -151,16 +143,17 @@ async def test_skips_empty_feature_list(self): feature_storage=feature_storage, ) - hooks = [_make_hook_definition()] await service.insert_features_for_record( - _make_dep_srn(), str(_make_record_srn()), hooks=hooks + hook_output_dir="/fake/output/dir", + record_srn=str(_make_record_srn()), + expected_features=["pocket_detect"], ) feature_store.insert_features.assert_not_called() @pytest.mark.asyncio - async def test_handles_multiple_hooks(self): - """Processes all hooks in the event payload.""" + async def test_handles_multiple_features(self): + """Processes all expected features.""" feature_storage = AsyncMock() feature_storage.hook_features_exist.return_value = True feature_storage.read_hook_features.side_effect = [ @@ -176,16 +169,17 @@ async def test_handles_multiple_hooks(self): feature_storage=feature_storage, ) - hooks = [_make_hook_definition("hook_a"), _make_hook_definition("hook_b")] await service.insert_features_for_record( - _make_dep_srn(), str(_make_record_srn()), hooks=hooks + hook_output_dir="/fake/output/dir", + record_srn=str(_make_record_srn()), + expected_features=["hook_a", "hook_b"], ) assert feature_store.insert_features.call_count == 2 @pytest.mark.asyncio - async def test_no_hooks_is_noop(self): - """No-op when hooks list is empty.""" + async def test_no_features_is_noop(self): + """No-op when expected_features list is empty.""" feature_store = AsyncMock() feature_storage = AsyncMock() @@ -194,25 +188,44 @@ async def test_no_hooks_is_noop(self): feature_storage=feature_storage, ) - await service.insert_features_for_record(_make_dep_srn(), str(_make_record_srn()), hooks=[]) + await service.insert_features_for_record( + hook_output_dir="/fake/output/dir", + record_srn=str(_make_record_srn()), + expected_features=[], + ) feature_storage.hook_features_exist.assert_not_called() feature_store.insert_features.assert_not_called() - @pytest.mark.asyncio - async def test_none_hooks_is_noop(self): - """No-op when hooks is None.""" - feature_store = AsyncMock() - feature_storage = AsyncMock() - service = _make_feature_service( - feature_store=feature_store, - feature_storage=feature_storage, - ) +class TestInsertRecordFeaturesHarvestSource: + """US2: InsertRecordFeatures works identically for harvest-sourced records.""" - await service.insert_features_for_record( - _make_dep_srn(), str(_make_record_srn()), hooks=None + @pytest.mark.asyncio + async def test_harvest_source_uses_source_fields(self): + """Handler uses source type and id from event regardless of source type.""" + feature_service = AsyncMock() + storage = MagicMock() + storage.get_hook_output_root.return_value = "/fake/harvest/dir" + handler = _make_handler(feature_service=feature_service, feature_storage=storage) + + event = RecordPublished( + id=EventId(uuid4()), + record_srn=_make_record_srn(), + source=HarvestSource( + id="run-123-pdb-456", + harvest_run_srn="urn:osa:localhost:val:run123", + upstream_source="pdb", + ), + metadata={"title": "Harvested"}, + convention_srn=_make_conv_srn(), + expected_features=["pocket_detect"], ) + await handler.handle(event) - feature_storage.hook_features_exist.assert_not_called() - feature_store.insert_features.assert_not_called() + storage.get_hook_output_root.assert_called_once_with("harvest", "run-123-pdb-456") + feature_service.insert_features_for_record.assert_called_once_with( + hook_output_dir="/fake/harvest/dir", + record_srn=str(_make_record_srn()), + expected_features=["pocket_detect"], + ) diff --git a/server/tests/unit/domain/index/test_fanout_listener.py b/server/tests/unit/domain/index/test_fanout_listener.py index 7355fe9..329cf08 100644 --- a/server/tests/unit/domain/index/test_fanout_listener.py +++ b/server/tests/unit/domain/index/test_fanout_listener.py @@ -11,7 +11,15 @@ from osa.domain.index.model.registry import IndexRegistry from osa.domain.record.event.record_published import RecordPublished from osa.domain.shared.event import EventId -from osa.domain.shared.model.srn import DepositionSRN, Domain, LocalId, RecordSRN, RecordVersion +from osa.domain.shared.model.source import DepositionSource +from osa.domain.shared.model.srn import ( + ConventionSRN, + DepositionSRN, + Domain, + LocalId, + RecordSRN, + RecordVersion, +) class FakeBackend: @@ -87,7 +95,8 @@ async def test_creates_index_record_per_backend( event = RecordPublished( id=EventId(uuid4()), record_srn=sample_record_srn, - deposition_srn=sample_deposition_srn, + source=DepositionSource(id=str(sample_deposition_srn)), + convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), metadata=sample_metadata, ) @@ -126,7 +135,8 @@ async def test_creates_unique_event_ids( event = RecordPublished( id=EventId(uuid4()), record_srn=sample_record_srn, - deposition_srn=sample_deposition_srn, + source=DepositionSource(id=str(sample_deposition_srn)), + convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), metadata=sample_metadata, ) @@ -154,7 +164,8 @@ async def test_empty_registry_creates_no_events( event = RecordPublished( id=EventId(uuid4()), record_srn=sample_record_srn, - deposition_srn=sample_deposition_srn, + source=DepositionSource(id=str(sample_deposition_srn)), + convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), metadata=sample_metadata, ) diff --git a/server/tests/unit/domain/record/test_get_record_handler.py b/server/tests/unit/domain/record/test_get_record_handler.py index a1e175b..8fea888 100644 --- a/server/tests/unit/domain/record/test_get_record_handler.py +++ b/server/tests/unit/domain/record/test_get_record_handler.py @@ -7,21 +7,23 @@ from osa.domain.record.model.aggregate import Record from osa.domain.shared.error import NotFoundError -from osa.domain.shared.model.srn import DepositionSRN, RecordSRN +from osa.domain.shared.model.source import DepositionSource +from osa.domain.shared.model.srn import ConventionSRN, RecordSRN def _make_record_srn() -> RecordSRN: return RecordSRN.parse("urn:osa:localhost:rec:test-rec@1") -def _make_dep_srn() -> DepositionSRN: - return DepositionSRN.parse("urn:osa:localhost:dep:test-dep") +def _make_conv_srn() -> ConventionSRN: + return ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0") def _make_record() -> Record: return Record( srn=_make_record_srn(), - deposition_srn=_make_dep_srn(), + source=DepositionSource(id="urn:osa:localhost:dep:test-dep"), + convention_srn=_make_conv_srn(), metadata={"title": "Test Protein"}, published_at=datetime.now(UTC), ) @@ -41,7 +43,8 @@ async def test_returns_record_detail(self): result = await handler.run(GetRecord(srn=record.srn)) assert result.srn == record.srn - assert result.deposition_srn == record.deposition_srn + assert result.source == record.source + assert result.convention_srn == record.convention_srn assert result.metadata == record.metadata service.get.assert_called_once_with(record.srn) diff --git a/server/tests/unit/domain/record/test_record_draft.py b/server/tests/unit/domain/record/test_record_draft.py new file mode 100644 index 0000000..68ebef2 --- /dev/null +++ b/server/tests/unit/domain/record/test_record_draft.py @@ -0,0 +1,38 @@ +"""Unit tests for RecordDraft value object.""" + +from osa.domain.record.model.draft import RecordDraft +from osa.domain.shared.model.source import DepositionSource +from osa.domain.shared.model.srn import ConventionSRN + + +def _make_conv_srn() -> ConventionSRN: + return ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0") + + +class TestRecordDraft: + def test_construction(self): + draft = RecordDraft( + source=DepositionSource(id="urn:osa:localhost:dep:abc"), + metadata={"title": "Test"}, + convention_srn=_make_conv_srn(), + ) + assert draft.source.type == "deposition" + assert draft.metadata == {"title": "Test"} + assert draft.convention_srn == _make_conv_srn() + + def test_expected_features_defaults_empty(self): + draft = RecordDraft( + source=DepositionSource(id="dep-1"), + metadata={}, + convention_srn=_make_conv_srn(), + ) + assert draft.expected_features == [] + + def test_expected_features_can_be_set(self): + draft = RecordDraft( + source=DepositionSource(id="dep-1"), + metadata={}, + convention_srn=_make_conv_srn(), + expected_features=["pocket_detect", "qc_check"], + ) + assert draft.expected_features == ["pocket_detect", "qc_check"] diff --git a/server/tests/unit/domain/record/test_record_features.py b/server/tests/unit/domain/record/test_record_features.py index 700f967..e1143f4 100644 --- a/server/tests/unit/domain/record/test_record_features.py +++ b/server/tests/unit/domain/record/test_record_features.py @@ -8,7 +8,8 @@ from osa.domain.record.model.aggregate import Record from osa.domain.record.query.get_record import GetRecord, GetRecordHandler, RecordDetail from osa.domain.record.service.record import RecordService -from osa.domain.shared.model.srn import DepositionSRN, Domain, RecordSRN +from osa.domain.shared.model.source import DepositionSource +from osa.domain.shared.model.srn import ConventionSRN, Domain, RecordSRN from osa.infrastructure.persistence.adapter.feature_reader import PostgresFeatureReader @@ -34,7 +35,6 @@ async def test_returns_dict_keyed_by_hook_name( ) -> None: srn = RecordSRN.parse("urn:osa:localhost:rec:abc@1") - # First call: catalog query catalog_result = MagicMock() catalog_result.mappings.return_value.all.return_value = [ _make_catalog_row( @@ -47,8 +47,6 @@ async def test_returns_dict_keyed_by_hook_name( ) ] - # Second call: UNION ALL query returning {hook_name, row_data} mappings - # row_data now excludes auto columns (jsonb_build_object only includes data cols) feature_result = MagicMock() feature_result.mappings.return_value = [ { @@ -129,7 +127,6 @@ async def test_returns_empty_when_record_has_no_data( ) ] - # UNION ALL returns no rows when record has no feature data feature_result = MagicMock() feature_result.mappings.return_value = [] @@ -153,8 +150,6 @@ async def test_includes_data_from_multiple_tables( ), ] - # Single UNION ALL result containing rows from both tables - # row_data now excludes auto columns feature_result = MagicMock() feature_result.mappings.return_value = [ { @@ -180,7 +175,8 @@ async def test_includes_data_from_multiple_tables( def _make_record() -> Record: return Record( srn=RecordSRN.parse("urn:osa:localhost:rec:abc@1"), - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:dep1"), + source=DepositionSource(id="urn:osa:localhost:dep:dep1"), + convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), metadata={"title": "Test"}, published_at=datetime.now(UTC), ) @@ -240,6 +236,7 @@ async def test_existing_behavior_preserved(self) -> None: result: RecordDetail = await handler.run(GetRecord(srn=record.srn)) assert result.srn == record.srn - assert result.deposition_srn == record.deposition_srn + assert result.source == record.source + assert result.convention_srn == record.convention_srn assert result.metadata == record.metadata mock_service.get.assert_called_once_with(record.srn) diff --git a/server/tests/unit/domain/record/test_record_published_enriched.py b/server/tests/unit/domain/record/test_record_published_enriched.py index a9ef039..bee3f0e 100644 --- a/server/tests/unit/domain/record/test_record_published_enriched.py +++ b/server/tests/unit/domain/record/test_record_published_enriched.py @@ -1,71 +1,48 @@ """Unit tests for enriched RecordPublished event. -Tests for User Story 3: Cross-domain decoupling. -Verifies the event carries convention_srn, hooks, and files_dir. +Verifies the event carries source, convention_srn, and expected_features. """ from uuid import uuid4 from osa.domain.record.event.record_published import RecordPublished from osa.domain.shared.event import EventId -from osa.domain.shared.model.hook import ( - ColumnDef, - HookDefinition, - OciConfig, - TableFeatureSpec, -) -from osa.domain.shared.model.srn import ConventionSRN, DepositionSRN, RecordSRN - - -def _make_hook_definition() -> HookDefinition: - return HookDefinition( - name="pocketeer", - runtime=OciConfig( - image="osa-hooks/pocketeer:latest", - digest="sha256:abc123", - ), - feature=TableFeatureSpec( - cardinality="many", - columns=[ColumnDef(name="score", json_type="number", required=True)], - ), - ) +from osa.domain.shared.model.source import DepositionSource +from osa.domain.shared.model.srn import ConventionSRN, RecordSRN class TestRecordPublishedEnriched: - def test_carries_convention_srn(self): + def test_carries_source(self): + source = DepositionSource(id="urn:osa:localhost:dep:test") event = RecordPublished( id=EventId(uuid4()), record_srn=RecordSRN.parse("urn:osa:localhost:rec:test@1"), - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test"), + source=source, metadata={"title": "Test"}, convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), - hooks=[_make_hook_definition()], - files_dir="/data/files/test-dep", + expected_features=["pocketeer"], ) - assert event.convention_srn == ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0") + assert event.source.type == "deposition" + assert event.source.id == "urn:osa:localhost:dep:test" - def test_carries_hooks(self): - hook = _make_hook_definition() + def test_carries_convention_srn(self): event = RecordPublished( id=EventId(uuid4()), record_srn=RecordSRN.parse("urn:osa:localhost:rec:test@1"), - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test"), + source=DepositionSource(id="urn:osa:localhost:dep:test"), metadata={"title": "Test"}, convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), - hooks=[hook], - files_dir="/data/files/test-dep", + expected_features=[], ) - assert len(event.hooks) == 1 - assert event.hooks[0].name == "pocketeer" + assert event.convention_srn == ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0") - def test_carries_files_dir(self): + def test_carries_expected_features(self): event = RecordPublished( id=EventId(uuid4()), record_srn=RecordSRN.parse("urn:osa:localhost:rec:test@1"), - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test"), + source=DepositionSource(id="urn:osa:localhost:dep:test"), metadata={"title": "Test"}, convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), - hooks=[], - files_dir="/data/files/test-dep", + expected_features=["pocketeer", "qc_check"], ) - assert event.files_dir == "/data/files/test-dep" + assert event.expected_features == ["pocketeer", "qc_check"] diff --git a/server/tests/unit/domain/record/test_record_service.py b/server/tests/unit/domain/record/test_record_service.py index c78b778..2400d82 100644 --- a/server/tests/unit/domain/record/test_record_service.py +++ b/server/tests/unit/domain/record/test_record_service.py @@ -6,15 +6,23 @@ import pytest from osa.domain.record.event.record_published import RecordPublished +from osa.domain.record.model.draft import RecordDraft from osa.domain.record.port.repository import RecordRepository from osa.domain.record.service.record import RecordService -from osa.domain.shared.model.srn import DepositionSRN, Domain, LocalId +from osa.domain.shared.model.source import ( + DepositionSource, + HarvestSource, +) +from osa.domain.shared.model.srn import ConventionSRN, DepositionSRN, Domain, LocalId from osa.domain.shared.outbox import Outbox +def _make_conv_srn() -> ConventionSRN: + return ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0") + + @pytest.fixture def mock_record_repo() -> RecordRepository: - """Create a mock RecordRepository.""" repo = MagicMock(spec=RecordRepository) repo.save = AsyncMock() return repo @@ -22,7 +30,6 @@ def mock_record_repo() -> RecordRepository: @pytest.fixture def mock_outbox() -> Outbox: - """Create a mock Outbox.""" outbox = MagicMock(spec=Outbox) outbox.append = AsyncMock() return outbox @@ -30,43 +37,30 @@ def mock_outbox() -> Outbox: @pytest.fixture def node_domain() -> Domain: - """Create test node domain.""" return Domain("test.example.com") @pytest.fixture -def sample_deposition_srn(node_domain: Domain) -> DepositionSRN: - """Create a sample deposition SRN.""" - return DepositionSRN( - domain=node_domain, - id=LocalId(str(uuid4())), +def sample_draft(node_domain: Domain) -> RecordDraft: + dep_srn = DepositionSRN(domain=node_domain, id=LocalId(str(uuid4()))) + return RecordDraft( + source=DepositionSource(id=str(dep_srn)), + metadata={"title": "Test Record", "organism": "human", "platform": "GPL570"}, + convention_srn=_make_conv_srn(), + expected_features=["pocket_detect"], ) -@pytest.fixture -def sample_metadata() -> dict: - """Create sample metadata for testing.""" - return { - "title": "Test Record", - "organism": "human", - "platform": "GPL570", - } - - class TestRecordService: - """Tests for RecordService.""" - @pytest.mark.asyncio async def test_publish_record_creates_record( self, mock_record_repo: RecordRepository, mock_outbox: Outbox, node_domain: Domain, - sample_deposition_srn: DepositionSRN, - sample_metadata: dict, + sample_draft: RecordDraft, ): - """Service should create and persist a Record.""" - # Arrange + """Service should create and persist a Record from a draft.""" service = RecordService( record_repo=mock_record_repo, outbox=mock_outbox, @@ -74,16 +68,12 @@ async def test_publish_record_creates_record( feature_reader=AsyncMock(), ) - # Act - record = await service.publish_record( - deposition_srn=sample_deposition_srn, - metadata=sample_metadata, - ) + record = await service.publish_record(sample_draft) - # Assert assert record is not None - assert record.deposition_srn == sample_deposition_srn - assert record.metadata == sample_metadata + assert record.source == sample_draft.source + assert record.convention_srn == sample_draft.convention_srn + assert record.metadata == sample_draft.metadata mock_record_repo.save.assert_called_once() @pytest.mark.asyncio @@ -92,11 +82,9 @@ async def test_publish_record_emits_record_published_event( mock_record_repo: RecordRepository, mock_outbox: Outbox, node_domain: Domain, - sample_deposition_srn: DepositionSRN, - sample_metadata: dict, + sample_draft: RecordDraft, ): - """Service should emit RecordPublished event.""" - # Arrange + """Service should emit RecordPublished event with source-agnostic fields.""" service = RecordService( record_repo=mock_record_repo, outbox=mock_outbox, @@ -104,19 +92,16 @@ async def test_publish_record_emits_record_published_event( feature_reader=AsyncMock(), ) - # Act - record = await service.publish_record( - deposition_srn=sample_deposition_srn, - metadata=sample_metadata, - ) + record = await service.publish_record(sample_draft) - # Assert mock_outbox.append.assert_called_once() event = mock_outbox.append.call_args[0][0] assert isinstance(event, RecordPublished) assert event.record_srn == record.srn - assert event.deposition_srn == sample_deposition_srn - assert event.metadata == sample_metadata + assert event.source == sample_draft.source + assert event.convention_srn == sample_draft.convention_srn + assert event.expected_features == sample_draft.expected_features + assert event.metadata == sample_draft.metadata @pytest.mark.asyncio async def test_publish_record_creates_version_1( @@ -124,11 +109,9 @@ async def test_publish_record_creates_version_1( mock_record_repo: RecordRepository, mock_outbox: Outbox, node_domain: Domain, - sample_deposition_srn: DepositionSRN, - sample_metadata: dict, + sample_draft: RecordDraft, ): """New records should be version 1.""" - # Arrange service = RecordService( record_repo=mock_record_repo, outbox=mock_outbox, @@ -136,11 +119,48 @@ async def test_publish_record_creates_version_1( feature_reader=AsyncMock(), ) - # Act - record = await service.publish_record( - deposition_srn=sample_deposition_srn, - metadata=sample_metadata, - ) + record = await service.publish_record(sample_draft) - # Assert assert record.srn.version.root == 1 + + +class TestRecordServiceHarvestSource: + """US2: Verify harvest-sourced records publish correctly.""" + + @pytest.mark.asyncio + async def test_publish_with_harvest_source( + self, + mock_record_repo: RecordRepository, + mock_outbox: Outbox, + node_domain: Domain, + ): + """HarvestSource draft produces correct Record + RecordPublished event.""" + draft = RecordDraft( + source=HarvestSource( + id="run-123-pdb-456", + harvest_run_srn="urn:osa:localhost:val:run123", + upstream_source="pdb", + ), + metadata={"title": "Harvested Protein"}, + convention_srn=_make_conv_srn(), + expected_features=["pocket_detect"], + ) + + service = RecordService( + record_repo=mock_record_repo, + outbox=mock_outbox, + node_domain=node_domain, + feature_reader=AsyncMock(), + ) + + record = await service.publish_record(draft) + + assert record.source.type == "harvest" + assert record.source.upstream_source == "pdb" + assert record.convention_srn == _make_conv_srn() + mock_record_repo.save.assert_called_once() + + event = mock_outbox.append.call_args[0][0] + assert isinstance(event, RecordPublished) + assert event.source.type == "harvest" + assert event.expected_features == ["pocket_detect"] diff --git a/server/tests/unit/domain/shared/test_record_source.py b/server/tests/unit/domain/shared/test_record_source.py new file mode 100644 index 0000000..ff71708 --- /dev/null +++ b/server/tests/unit/domain/shared/test_record_source.py @@ -0,0 +1,92 @@ +"""Unit tests for RecordSource discriminated union.""" + +import pytest +from pydantic import TypeAdapter, ValidationError + +from osa.domain.shared.model.source import ( + DepositionSource, + HarvestSource, + RecordSource, +) + + +class TestDepositionSource: + def test_type_is_deposition(self): + src = DepositionSource(id="urn:osa:localhost:dep:abc") + assert src.type == "deposition" + + def test_id_required(self): + with pytest.raises(ValidationError): + DepositionSource(id="") + + def test_serialization_roundtrip(self): + src = DepositionSource(id="urn:osa:localhost:dep:abc") + data = src.model_dump() + assert data == {"type": "deposition", "id": "urn:osa:localhost:dep:abc"} + restored = DepositionSource.model_validate(data) + assert restored == src + + +class TestHarvestSource: + def test_type_is_harvest(self): + src = HarvestSource( + id="run-123-source-456", + harvest_run_srn="urn:osa:localhost:val:run123", + upstream_source="pdb", + ) + assert src.type == "harvest" + + def test_requires_harvest_run_srn(self): + with pytest.raises(ValidationError): + HarvestSource(id="run-123", upstream_source="pdb") + + def test_requires_upstream_source(self): + with pytest.raises(ValidationError): + HarvestSource(id="run-123", harvest_run_srn="urn:osa:localhost:val:run123") + + def test_serialization_roundtrip(self): + src = HarvestSource( + id="run-123-source-456", + harvest_run_srn="urn:osa:localhost:val:run123", + upstream_source="pdb", + ) + data = src.model_dump() + restored = HarvestSource.model_validate(data) + assert restored == src + + +class TestRecordSourceDiscriminator: + def test_deserializes_deposition(self): + adapter = TypeAdapter(RecordSource) + src = adapter.validate_python({"type": "deposition", "id": "dep-abc"}) + assert isinstance(src, DepositionSource) + + def test_deserializes_harvest(self): + data = { + "type": "harvest", + "id": "run-123", + "harvest_run_srn": "urn:osa:localhost:val:run1", + "upstream_source": "geo", + } + adapter = TypeAdapter(RecordSource) + src = adapter.validate_python(data) + assert isinstance(src, HarvestSource) + assert src.upstream_source == "geo" + + def test_rejects_unknown_type(self): + adapter = TypeAdapter(RecordSource) + with pytest.raises(ValidationError): + adapter.validate_python({"type": "unknown", "id": "abc"}) + + def test_json_roundtrip(self): + """Serialize to JSON and back via the union type.""" + adapter = TypeAdapter(RecordSource) + src = HarvestSource( + id="run-1", + harvest_run_srn="urn:osa:localhost:val:run1", + upstream_source="pdb", + ) + json_str = adapter.dump_json(src) + restored = adapter.validate_json(json_str) + assert isinstance(restored, HarvestSource) + assert restored == src diff --git a/server/tests/unit/domain/validation/test_hook_runner.py b/server/tests/unit/domain/validation/test_hook_runner.py index 16d5f26..fa1e1fa 100644 --- a/server/tests/unit/domain/validation/test_hook_runner.py +++ b/server/tests/unit/domain/validation/test_hook_runner.py @@ -5,7 +5,6 @@ import pytest from osa.domain.shared.model.hook import HookDefinition -from osa.domain.shared.model.srn import DepositionSRN from osa.domain.validation.model.hook_result import HookResult, HookStatus from osa.domain.validation.port.hook_runner import HookInputs, HookRunner @@ -14,7 +13,7 @@ class TestHookInputs: def test_minimal_construction(self): inputs = HookInputs( record_json={"srn": "urn:osa:localhost:rec:123"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="localhost_test123", ) assert inputs.record_json == {"srn": "urn:osa:localhost:rec:123"} assert inputs.files_dir is None @@ -24,7 +23,7 @@ def test_with_files_dir(self): files = Path("/tmp/files") inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="localhost_test123", files_dir=files, ) assert inputs.files_dir == files @@ -32,7 +31,7 @@ def test_with_files_dir(self): def test_with_config(self): inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="localhost_test123", config={"r_min": 3.0, "threshold": 0.5}, ) assert inputs.config == {"r_min": 3.0, "threshold": 0.5} @@ -41,7 +40,7 @@ def test_full_construction(self): files = Path("/tmp/data/files") inputs = HookInputs( record_json={"srn": "urn:osa:localhost:rec:456", "name": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test456"), + run_id="localhost_test456", files_dir=files, config={"key": "value"}, ) @@ -52,7 +51,7 @@ def test_full_construction(self): def test_is_frozen(self): inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="localhost_test123", ) with pytest.raises(AttributeError): inputs.record_json = {} # type: ignore[misc] diff --git a/server/tests/unit/domain/validation/test_validation_service.py b/server/tests/unit/domain/validation/test_validation_service.py index d8b598e..1ce570e 100644 --- a/server/tests/unit/domain/validation/test_validation_service.py +++ b/server/tests/unit/domain/validation/test_validation_service.py @@ -51,6 +51,8 @@ def _make_service( hs = hook_storage or MagicMock() if not hasattr(hs, "get_hook_output_dir") or not callable(hs.get_hook_output_dir): hs.get_hook_output_dir = MagicMock(return_value=Path("/tmp/hooks/test")) + if not hasattr(hs, "get_files_dir") or not callable(hs.get_files_dir): + hs.get_files_dir = MagicMock(return_value=Path("/tmp/files/test")) return ValidationService( run_repo=run_repo or AsyncMock(), hook_runner=hook_runner or AsyncMock(), @@ -62,7 +64,8 @@ def _make_service( def _make_inputs() -> HookInputs: return HookInputs( record_json={"srn": "urn:osa:localhost:dep:test123", "metadata": {"name": "test"}}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="localhost_test123", + files_dir=Path("/tmp/staging/files"), ) @@ -149,6 +152,7 @@ async def test_output_dir_from_hook_storage(self): hook_runner.run.return_value = _make_hook_result() hook_storage = MagicMock() hook_storage.get_hook_output_dir.return_value = Path("/cold/hooks/pocket_detect") + hook_storage.get_files_dir = MagicMock(return_value=Path("/tmp/files/test")) service = _make_service(hook_runner=hook_runner, hook_storage=hook_storage) run = await service.create_run(inputs=_make_inputs()) diff --git a/server/tests/unit/domain/validation/test_validation_service_decoupled.py b/server/tests/unit/domain/validation/test_validation_service_decoupled.py index edd636c..31a03f7 100644 --- a/server/tests/unit/domain/validation/test_validation_service_decoupled.py +++ b/server/tests/unit/domain/validation/test_validation_service_decoupled.py @@ -67,7 +67,7 @@ def test_no_file_storage_dependency(self): @pytest.mark.asyncio async def test_validate_deposition_uses_event_data(self): - """validate_deposition accepts hooks/files_dir/metadata directly.""" + """validate_deposition accepts hooks/metadata directly.""" run_repo = AsyncMock() run_repo.save = AsyncMock() hook_runner = AsyncMock() @@ -78,6 +78,7 @@ async def test_validate_deposition_uses_event_data(self): ) hook_storage = MagicMock() hook_storage.get_hook_output_dir.return_value = Path("/tmp/hooks/pocketeer") + hook_storage.get_files_dir.return_value = Path("/data/files/test-dep") service = ValidationService( run_repo=run_repo, @@ -92,7 +93,6 @@ async def test_validate_deposition_uses_event_data(self): convention_srn=_make_conv_srn(), metadata={"pdb_id": "4HHB"}, hooks=[hook], - files_dir="/data/files/test-dep", ) assert run.status == RunStatus.COMPLETED diff --git a/server/tests/unit/infrastructure/k8s/test_k8s_hook_runner.py b/server/tests/unit/infrastructure/k8s/test_k8s_hook_runner.py index a8a0474..14a9b47 100644 --- a/server/tests/unit/infrastructure/k8s/test_k8s_hook_runner.py +++ b/server/tests/unit/infrastructure/k8s/test_k8s_hook_runner.py @@ -14,12 +14,11 @@ OciLimits, TableFeatureSpec, ) -from osa.domain.shared.model.srn import DepositionSRN from osa.domain.validation.model.hook_result import HookStatus from osa.domain.validation.port.hook_runner import HookInputs from osa.infrastructure.k8s.runner import K8sHookRunner -_DEP_SRN = DepositionSRN.parse("urn:osa:localhost:dep:abc123") +_RUN_ID = "run-abc123" def _make_hook( @@ -85,7 +84,7 @@ def test_correct_image(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) container = spec.spec.template.spec.containers[0] @@ -97,7 +96,7 @@ def test_security_context(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) pod_spec = spec.spec.template.spec @@ -119,7 +118,7 @@ def test_resource_limits(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) resources = spec.spec.template.spec.containers[0].resources @@ -130,7 +129,7 @@ def test_volume_mounts(self): runner = _make_runner() hook = _make_hook() work_dir = Path("/data/depositions/localhost_abc/hooks/validate_dna") - spec = runner._build_job_spec(hook, work_dir, deposition_srn=_DEP_SRN) + spec = runner._build_job_spec(hook, work_dir, run_id=_RUN_ID) volumes = spec.spec.template.spec.volumes pvc_vol = next(v for v in volumes if v.name == "data") @@ -151,7 +150,7 @@ def test_env_vars(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/pocket_detect"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) env = spec.spec.template.spec.containers[0].env @@ -166,7 +165,7 @@ def test_backoff_limit_zero(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) assert spec.spec.backoff_limit == 0 @@ -177,7 +176,7 @@ def test_active_deadline_seconds(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) # scheduling_timeout (120) + hook timeout (300) @@ -189,7 +188,7 @@ def test_dns_policy_none(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) pod_spec = spec.spec.template.spec @@ -202,13 +201,13 @@ def test_labels(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) labels = spec.spec.template.metadata.labels assert labels["osa.io/role"] == "hook" assert labels["osa.io/hook"] == "validate_dna" - assert labels["osa.io/deposition"] == "localhost.dep.abc123" + assert labels["osa.io/run-id"] == "run-abc123" def test_human_readable_job_name(self): runner = _make_runner() @@ -216,7 +215,7 @@ def test_human_readable_job_name(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) name = spec.metadata.name @@ -229,7 +228,7 @@ def test_empty_dir_at_tmp(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) volumes = spec.spec.template.spec.volumes @@ -242,7 +241,7 @@ def test_automount_service_account_false(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) pod_spec = spec.spec.template.spec @@ -254,7 +253,7 @@ def test_ttl_seconds_after_finished(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) assert spec.spec.ttl_seconds_after_finished == 600 @@ -265,7 +264,7 @@ def test_files_mount_when_files_dir_provided(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, files_dir=Path("/data/depositions/localhost_abc/files"), ) @@ -280,7 +279,7 @@ def test_image_pull_secrets(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) secrets = spec.spec.template.spec.image_pull_secrets @@ -293,7 +292,7 @@ def test_service_account(self): spec = runner._build_job_spec( hook, Path("/data/depositions/localhost_abc/hooks/validate_dna"), - deposition_srn=_DEP_SRN, + run_id=_RUN_ID, ) assert spec.spec.template.spec.service_account_name == "osa-runner" @@ -461,14 +460,13 @@ async def test_successful_run(self, tmp_path: Path): b'{"step":"Check","status":"completed","message":"OK"}\n' ) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, core_api, hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.PASSED @@ -511,7 +509,7 @@ async def test_timeout_deadline_exceeded(self, tmp_path: Path): hook = _make_hook() work_dir = tmp_path / "depositions" / "localhost_abc" / "hooks" / "validate_dna" work_dir.mkdir(parents=True) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, @@ -519,7 +517,6 @@ async def test_timeout_deadline_exceeded(self, tmp_path: Path): hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.FAILED @@ -577,7 +574,7 @@ async def test_oom_exit_137(self, tmp_path: Path): hook = _make_hook() work_dir = tmp_path / "depositions" / "localhost_abc" / "hooks" / "validate_dna" work_dir.mkdir(parents=True) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, @@ -585,7 +582,6 @@ async def test_oom_exit_137(self, tmp_path: Path): hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.FAILED @@ -637,7 +633,7 @@ async def test_nonzero_exit(self, tmp_path: Path): hook = _make_hook() work_dir = tmp_path / "depositions" / "localhost_abc" / "hooks" / "validate_dna" work_dir.mkdir(parents=True) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, @@ -645,7 +641,6 @@ async def test_nonzero_exit(self, tmp_path: Path): hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.FAILED @@ -692,7 +687,7 @@ async def test_orphan_running_job_attaches(self, tmp_path: Path): work_dir = tmp_path / "depositions" / "localhost_abc" / "hooks" / "validate_dna" output_dir = work_dir / "output" output_dir.mkdir(parents=True) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, @@ -700,7 +695,6 @@ async def test_orphan_running_job_attaches(self, tmp_path: Path): hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.PASSED @@ -729,7 +723,7 @@ async def test_orphan_completed_job_reads_output(self, tmp_path: Path): work_dir = tmp_path / "depositions" / "localhost_abc" / "hooks" / "validate_dna" output_dir = work_dir / "output" output_dir.mkdir(parents=True) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, @@ -737,7 +731,6 @@ async def test_orphan_completed_job_reads_output(self, tmp_path: Path): hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.PASSED @@ -785,7 +778,7 @@ async def test_orphan_failed_job_creates_new(self, tmp_path: Path): work_dir = tmp_path / "depositions" / "localhost_abc" / "hooks" / "validate_dna" output_dir = work_dir / "output" output_dir.mkdir(parents=True) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, @@ -793,7 +786,6 @@ async def test_orphan_failed_job_creates_new(self, tmp_path: Path): hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.PASSED @@ -851,7 +843,7 @@ async def test_rejection_via_progress(self, tmp_path: Path): runner._s3.get_object.return_value = ( b'{"step":"Validate","status":"rejected","message":"Missing atoms"}\n' ) - inputs = HookInputs(record_json={"srn": "test"}, deposition_srn=_DEP_SRN) + inputs = HookInputs(record_json={"srn": "test"}, run_id=_RUN_ID) result = await runner._run_job( batch_api, @@ -859,7 +851,6 @@ async def test_rejection_via_progress(self, tmp_path: Path): hook, inputs, work_dir, - deposition_srn=_DEP_SRN, ) assert result.status == HookStatus.REJECTED @@ -871,12 +862,12 @@ async def test_rejection_via_progress(self, tmp_path: Path): # --------------------------------------------------------------------------- -class TestDepositionSrnFromInputs: - """Verify run() uses inputs.deposition_srn for Job labels, not path parsing.""" +class TestRunIdFromInputs: + """Verify run() uses inputs.run_id for Job labels, not path parsing.""" @pytest.mark.asyncio - async def test_run_uses_deposition_srn_from_inputs(self, tmp_path: Path): - """The deposition SRN in Job labels comes from inputs, not the work_dir path.""" + async def test_run_uses_run_id_from_inputs(self, tmp_path: Path): + """The run_id in Job labels comes from inputs, not the work_dir path.""" from unittest.mock import patch config = _make_config(data_mount_path=str(tmp_path)) @@ -914,7 +905,7 @@ async def test_run_uses_deposition_srn_from_inputs(self, tmp_path: Path): hook = _make_hook() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:my-real-srn"), + run_id="my-real-run-id", ) with ( @@ -923,8 +914,8 @@ async def test_run_uses_deposition_srn_from_inputs(self, tmp_path: Path): ): await runner.run(hook, inputs, work_dir) - # Verify the Job was created with the SRN from inputs + # Verify the Job was created with the run_id from inputs call_args = batch_api.create_namespaced_job.call_args spec = call_args[0][1] # positional arg: (namespace, spec) labels = spec.metadata.labels - assert labels["osa.io/deposition"] == "localhost.dep.my-real-srn" + assert labels["osa.io/run-id"] == "my-real-run-id" diff --git a/server/tests/unit/infrastructure/test_file_storage_hooks.py b/server/tests/unit/infrastructure/test_file_storage_hooks.py index 62b04e4..3fb186c 100644 --- a/server/tests/unit/infrastructure/test_file_storage_hooks.py +++ b/server/tests/unit/infrastructure/test_file_storage_hooks.py @@ -46,14 +46,14 @@ class TestReadHookFeatures: @pytest.mark.asyncio async def test_reads_features_list(self, tmp_path: Path): adapter = FilesystemStorageAdapter(base_path=str(tmp_path)) - dep_srn = _make_dep_srn() - # Write features.json in the output/ subdirectory - output_dir = tmp_path / "depositions" / "localhost_test-dep" / "hooks" / "detect" / "output" + # The hook_output_dir is the deposition root directory + dep_root = tmp_path / "depositions" / "localhost_test-dep" + output_dir = dep_root / "hooks" / "detect" / "output" output_dir.mkdir(parents=True) (output_dir / "features.json").write_text(json.dumps([{"score": 0.95}, {"score": 0.82}])) - features = await adapter.read_hook_features(dep_srn, "detect") + features = await adapter.read_hook_features(str(dep_root), "detect") assert len(features) == 2 assert features[0]["score"] == 0.95 @@ -61,13 +61,13 @@ async def test_reads_features_list(self, tmp_path: Path): @pytest.mark.asyncio async def test_reads_features_dict(self, tmp_path: Path): adapter = FilesystemStorageAdapter(base_path=str(tmp_path)) - dep_srn = _make_dep_srn() - output_dir = tmp_path / "depositions" / "localhost_test-dep" / "hooks" / "detect" / "output" + dep_root = tmp_path / "depositions" / "localhost_test-dep" + output_dir = dep_root / "hooks" / "detect" / "output" output_dir.mkdir(parents=True) (output_dir / "features.json").write_text(json.dumps({"score": 0.95})) - features = await adapter.read_hook_features(dep_srn, "detect") + features = await adapter.read_hook_features(str(dep_root), "detect") assert len(features) == 1 assert features[0]["score"] == 0.95 @@ -75,9 +75,9 @@ async def test_reads_features_dict(self, tmp_path: Path): @pytest.mark.asyncio async def test_returns_empty_when_missing(self, tmp_path: Path): adapter = FilesystemStorageAdapter(base_path=str(tmp_path)) - dep_srn = _make_dep_srn() - features = await adapter.read_hook_features(dep_srn, "nonexistent") + dep_root = tmp_path / "depositions" / "localhost_test-dep" + features = await adapter.read_hook_features(str(dep_root), "nonexistent") assert features == [] @@ -86,20 +86,20 @@ class TestHookFeaturesExist: @pytest.mark.asyncio async def test_true_when_file_exists(self, tmp_path: Path): adapter = FilesystemStorageAdapter(base_path=str(tmp_path)) - dep_srn = _make_dep_srn() - output_dir = tmp_path / "depositions" / "localhost_test-dep" / "hooks" / "detect" / "output" + dep_root = tmp_path / "depositions" / "localhost_test-dep" + output_dir = dep_root / "hooks" / "detect" / "output" output_dir.mkdir(parents=True) (output_dir / "features.json").write_text("[]") - assert await adapter.hook_features_exist(dep_srn, "detect") is True + assert await adapter.hook_features_exist(str(dep_root), "detect") is True @pytest.mark.asyncio async def test_false_when_missing(self, tmp_path: Path): adapter = FilesystemStorageAdapter(base_path=str(tmp_path)) - dep_srn = _make_dep_srn() - assert await adapter.hook_features_exist(dep_srn, "nonexistent") is False + dep_root = tmp_path / "depositions" / "localhost_test-dep" + assert await adapter.hook_features_exist(str(dep_root), "nonexistent") is False class TestDeleteCleansHookOutputs: diff --git a/server/tests/unit/infrastructure/test_oci_hook_runner.py b/server/tests/unit/infrastructure/test_oci_hook_runner.py index 268b3a2..ae43290 100644 --- a/server/tests/unit/infrastructure/test_oci_hook_runner.py +++ b/server/tests/unit/infrastructure/test_oci_hook_runner.py @@ -5,7 +5,6 @@ import pytest -from osa.domain.shared.model.srn import DepositionSRN from osa.domain.shared.model.hook import ( ColumnDef, HookDefinition, @@ -201,7 +200,7 @@ async def test_successful_hook_returns_passed(self, tmp_path: Path): hook = _make_hook() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", ) output_dir = tmp_path / "output" @@ -227,7 +226,7 @@ async def test_nonzero_exit_returns_failed(self, tmp_path: Path): hook = _make_hook() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", ) output_dir = tmp_path / "output" @@ -250,7 +249,7 @@ async def test_oom_killed_returns_failed(self, tmp_path: Path): hook = _make_hook() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", ) output_dir = tmp_path / "output" @@ -280,7 +279,7 @@ async def hang(): hook = _make_hook(timeout=1) # 1 second timeout inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", ) output_dir = tmp_path / "output" @@ -301,14 +300,15 @@ async def test_rejection_via_progress(self, tmp_path: Path): runner = OciHookRunner(docker=docker) hook = _make_hook() - inputs = HookInputs( - record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), - ) work_dir = tmp_path / "hook_work" work_dir.mkdir() + inputs = HookInputs( + record_json={"srn": "test"}, + run_id="test-run", + ) + # Pre-create rejection progress in the output subdir (where the runner reads from) container_output = work_dir / "output" container_output.mkdir() @@ -336,7 +336,7 @@ async def test_security_hardening(self, tmp_path: Path): hook = _make_hook(memory="4g", cpu="4.0") inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", ) output_dir = tmp_path / "output" @@ -370,7 +370,7 @@ async def test_env_vars_set(self, tmp_path: Path): hook = _make_hook() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", ) output_dir = tmp_path / "output" @@ -400,7 +400,7 @@ async def test_nested_bind_mounts(self, tmp_path: Path): files_dir.mkdir() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", files_dir=files_dir, ) @@ -436,7 +436,7 @@ async def test_no_files_bind_when_no_files_dir(self, tmp_path: Path): hook = _make_hook() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", files_dir=None, ) @@ -465,7 +465,7 @@ async def test_container_deleted_on_failure(self, tmp_path: Path): hook = _make_hook() inputs = HookInputs( record_json={"srn": "test"}, - deposition_srn=DepositionSRN.parse("urn:osa:localhost:dep:test123"), + run_id="test-run", ) output_dir = tmp_path / "output" From a1da59da1766461c0d73c8985f2fd9a1e2c6852d Mon Sep 17 00:00:00 2001 From: Rory Byrne Date: Wed, 25 Mar 2026 11:22:39 +0000 Subject: [PATCH 2/4] refactor: simplify records table indexes and remove unused methods Remove convention_srn from unique constraint and drop source_type index to simplify database schema. Remove unused find_by_source method from repository interface and implementation. Delete empty value objects file. --- .../migrations/versions/source_agnostic_records.py | 13 +++---------- server/osa/domain/record/model/value.py | 1 - server/osa/domain/record/port/repository.py | 3 --- .../infrastructure/persistence/repository/record.py | 10 ---------- server/osa/infrastructure/persistence/tables.py | 7 +------ 5 files changed, 4 insertions(+), 30 deletions(-) delete mode 100644 server/osa/domain/record/model/value.py diff --git a/server/migrations/versions/source_agnostic_records.py b/server/migrations/versions/source_agnostic_records.py index d263b14..8f6cc8e 100644 --- a/server/migrations/versions/source_agnostic_records.py +++ b/server/migrations/versions/source_agnostic_records.py @@ -16,7 +16,7 @@ # revision identifiers, used by Alembic. revision: str = "source_agnostic_records" -down_revision: Union[str, Sequence[str], None] = "consumer_group_delivery" +down_revision: Union[str, Sequence[str], None] = "add_device_authorizations" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -46,26 +46,19 @@ def upgrade() -> None: ["convention_srn"], ) op.create_index( - "uq_records_convention_source", + "uq_records_source", "records", [ - "convention_srn", sa.text("(source->>'type')"), sa.text("(source->>'id')"), ], unique=True, ) - op.create_index( - "idx_records_source_type", - "records", - [sa.text("(source->>'type')")], - ) def downgrade() -> None: # Drop new indexes - op.drop_index("idx_records_source_type", table_name="records") - op.drop_index("uq_records_convention_source", table_name="records") + op.drop_index("uq_records_source", table_name="records") op.drop_index("idx_records_convention_srn", table_name="records") # Drop new columns diff --git a/server/osa/domain/record/model/value.py b/server/osa/domain/record/model/value.py deleted file mode 100644 index a50b539..0000000 --- a/server/osa/domain/record/model/value.py +++ /dev/null @@ -1 +0,0 @@ -"""Record domain value objects.""" diff --git a/server/osa/domain/record/port/repository.py b/server/osa/domain/record/port/repository.py index f515c95..34a1724 100644 --- a/server/osa/domain/record/port/repository.py +++ b/server/osa/domain/record/port/repository.py @@ -15,8 +15,5 @@ async def save(self, record: Record) -> None: ... @abstractmethod async def get(self, srn: RecordSRN) -> Record | None: ... - @abstractmethod - async def find_by_source(self, source_type: str, source_id: str) -> Record | None: ... - @abstractmethod async def count(self) -> int: ... diff --git a/server/osa/infrastructure/persistence/repository/record.py b/server/osa/infrastructure/persistence/repository/record.py index cb40645..22763b9 100644 --- a/server/osa/infrastructure/persistence/repository/record.py +++ b/server/osa/infrastructure/persistence/repository/record.py @@ -30,16 +30,6 @@ async def get(self, srn: RecordSRN) -> Record | None: row = result.mappings().first() return row_to_record(dict(row)) if row else None - async def find_by_source(self, source_type: str, source_id: str) -> Record | None: - """Find a record by source type and id.""" - stmt = select(records_table).where( - records_table.c.source["type"].as_string() == source_type, - records_table.c.source["id"].as_string() == source_id, - ) - result = await self.session.execute(stmt) - row = result.mappings().first() - return row_to_record(dict(row)) if row else None - async def count(self) -> int: """Count total records in the database.""" stmt = select(func.count()).select_from(records_table) diff --git a/server/osa/infrastructure/persistence/tables.py b/server/osa/infrastructure/persistence/tables.py index 150f951..de6e670 100644 --- a/server/osa/infrastructure/persistence/tables.py +++ b/server/osa/infrastructure/persistence/tables.py @@ -73,16 +73,11 @@ Index("idx_records_convention_srn", records_table.c.convention_srn) Index( - "uq_records_convention_source", - records_table.c.convention_srn, + "uq_records_source", records_table.c.source["type"].as_string(), records_table.c.source["id"].as_string(), unique=True, ) -Index( - "idx_records_source_type", - records_table.c.source["type"].as_string(), -) Index("idx_records_published_at", records_table.c.published_at) Index( "idx_records_metadata_gin", From 086aae5ac6c5355ce46a9b37382fb15e5d350567 Mon Sep 17 00:00:00 2001 From: Rory Byrne Date: Wed, 25 Mar 2026 11:27:48 +0000 Subject: [PATCH 3/4] refactor: update RecordPublished event creation to use new source and convention fields Add ConventionSRN import and replace deposition_srn parameter with source and convention_srn fields in make_record_published helper function to align with updated event structure --- .../test_event_batch_processing.py | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/server/tests/integration/test_event_batch_processing.py b/server/tests/integration/test_event_batch_processing.py index 39f1450..41b026f 100644 --- a/server/tests/integration/test_event_batch_processing.py +++ b/server/tests/integration/test_event_batch_processing.py @@ -15,7 +15,14 @@ from osa.domain.index.model.registry import IndexRegistry from osa.domain.record.event.record_published import RecordPublished from osa.domain.shared.event import EventId -from osa.domain.shared.model.srn import DepositionSRN, Domain, LocalId, RecordSRN, RecordVersion +from osa.domain.shared.model.srn import ( + ConventionSRN, + DepositionSRN, + Domain, + LocalId, + RecordSRN, + RecordVersion, +) class FakeBackend: @@ -51,6 +58,12 @@ def make_record_published( metadata: dict | None = None, ) -> RecordPublished: """Create a RecordPublished event for testing.""" + from osa.domain.shared.model.source import DepositionSource + + dep_srn = DepositionSRN( + domain=Domain("test.example.com"), + id=LocalId(str(uuid4())), + ) return RecordPublished( id=EventId(uuid4()), record_srn=RecordSRN( @@ -58,10 +71,8 @@ def make_record_published( id=LocalId(record_id or str(uuid4())), version=RecordVersion(1), ), - deposition_srn=DepositionSRN( - domain=Domain("test.example.com"), - id=LocalId(str(uuid4())), - ), + source=DepositionSource(id=str(dep_srn)), + convention_srn=ConventionSRN.parse("urn:osa:localhost:conv:test@1.0.0"), metadata=metadata or {"title": "Test Record"}, ) From 7ca800e154ffa6d4c8b5b65e21712dcd55f3c8fd Mon Sep 17 00:00:00 2001 From: Rory Byrne Date: Wed, 25 Mar 2026 11:38:55 +0000 Subject: [PATCH 4/4] fix: update migration revision dependency to add_device_authorizations --- server/migrations/versions/source_agnostic_records.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/migrations/versions/source_agnostic_records.py b/server/migrations/versions/source_agnostic_records.py index 8f6cc8e..2153bca 100644 --- a/server/migrations/versions/source_agnostic_records.py +++ b/server/migrations/versions/source_agnostic_records.py @@ -4,7 +4,7 @@ No data migration needed — no production data exists. Revision ID: source_agnostic_records -Revises: consumer_group_delivery +Revises: add_device_authorizations Create Date: 2026-03-24 """