diff --git a/Justfile b/Justfile index 1130ac0..ba4f56f 100644 --- a/Justfile +++ b/Justfile @@ -58,6 +58,10 @@ cli *ARGS: init FIELD: uv run osa field init {{FIELD}} +# Wipe all local OSA data (database, vectors, cache) +wipe: + uv run osa local clean --force + # === Complete Workflow === # Set up everything for first time (start DB + run migrations) diff --git a/ingestors/__init__.py b/ingestors/__init__.py deleted file mode 100644 index 4e35d0c..0000000 --- a/ingestors/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""First-party ingestors for OSA. - -Ingestors are pluggable components that pull data from external sources. -Each ingestor is registered via entry points in pyproject.toml. -""" diff --git a/ingestors/geo_entrez/__init__.py b/ingestors/geo_entrez/__init__.py deleted file mode 100644 index 7ed0745..0000000 --- a/ingestors/geo_entrez/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""GEO Entrez ingestor - uses NCBI E-utilities API.""" - -from ingestors.geo_entrez.config import GEOEntrezConfig -from ingestors.geo_entrez.ingestor import GEOEntrezIngestor - -__all__ = ["GEOEntrezIngestor", "GEOEntrezConfig"] diff --git a/osa/application/api/rest/app.py b/osa/application/api/rest/app.py index 9b28490..877cbb8 100644 --- a/osa/application/api/rest/app.py +++ b/osa/application/api/rest/app.py @@ -11,7 +11,7 @@ from osa.config import Config, configure_logging from osa.domain.shared.error import OSAError from osa.infrastructure.event.worker import BackgroundWorker -from osa.infrastructure.ingest.discovery import validate_ingestors_at_startup +from osa.infrastructure.source.discovery import validate_sources_at_startup from osa.util.di.fastapi import setup_dishka logger = logging.getLogger(__name__) @@ -37,8 +37,8 @@ def create_app() -> FastAPI: configure_logging(config.logging) logger.info("Starting OSA server: %s v%s", config.server.name, config.server.version) - # Validate ingestor configs at startup (fail fast with clear errors) - validate_ingestors_at_startup(config.ingestors) + # Validate source configs at startup (fail fast with clear errors) + validate_sources_at_startup(config.sources) app_instance = FastAPI( title=config.server.name, diff --git a/osa/application/di.py b/osa/application/di.py index cdd63b3..9f70b33 100644 --- a/osa/application/di.py +++ b/osa/application/di.py @@ -6,7 +6,7 @@ from osa.domain.validation.util.di import ValidationProvider from osa.infrastructure.event.di import EventProvider from osa.infrastructure.index.di import IndexProvider -from osa.infrastructure.ingest.di import IngestProvider +from osa.infrastructure.source.di import SourceProvider from osa.infrastructure.oci import OciProvider from osa.infrastructure.persistence import PersistenceProvider from osa.util.di.scope import Scope @@ -22,7 +22,7 @@ def create_container() -> AsyncContainer: PersistenceProvider(), OciProvider(), IndexProvider(), - IngestProvider(), + SourceProvider(), EventProvider(), DepositionProvider(), ValidationProvider(), diff --git a/osa/cli/commands/local.py b/osa/cli/commands/local.py index 80dc013..4b14f37 100644 --- a/osa/cli/commands/local.py +++ b/osa/cli/commands/local.py @@ -37,9 +37,9 @@ # logging: # level: "INFO" -# GEO Ingestor - pulls from NCBI Gene Expression Omnibus via Entrez API -ingestors: - - ingestor: geo-entrez +# GEO Source - pulls from NCBI Gene Expression Omnibus via Entrez API +sources: + - source: geo-entrez config: record_type: gse # gse (~250k all) or gds (~5k curated) email: your@email.com # Required by NCBI - please update this @@ -77,9 +77,9 @@ # logging: # level: "INFO" -# Add your ingestors here: -# ingestors: -# - ingestor: geo-entrez +# Add your sources here: +# sources: +# - source: geo-entrez # config: # email: your@email.com diff --git a/osa/config.py b/osa/config.py index 316c23a..7aa1a3f 100644 --- a/osa/config.py +++ b/osa/config.py @@ -34,41 +34,41 @@ class IndexConfig(BaseModel): # ============================================================================= -# Ingest Configuration +# Source Configuration # ============================================================================= -class IngestSchedule(BaseModel): - """Schedule configuration for an ingestor.""" +class SourceSchedule(BaseModel): + """Schedule configuration for a source.""" cron: str # Cron expression (e.g., "0 * * * *" for hourly) limit: int | None = None # Optional limit per scheduled run class InitialRun(BaseModel): - """Initial run configuration for an ingestor.""" + """Initial run configuration for a source.""" enabled: bool = False limit: int | None = 10 # Limit records for initial run since: datetime | None = None # Optional: bootstrap from specific date -class IngestConfig(BaseModel): - """Configuration for an ingestor. +class SourceConfig(BaseModel): + """Configuration for a source. - The `config` field is validated at runtime based on the ingestor type, - allowing external ingestors to define their own config schemas. + The `config` field is validated at runtime based on the source type, + allowing external sources to define their own config schemas. """ - ingestor: str # "geo-entrez", etc. - matches entry point name - config: dict[str, Any] = {} # Validated at runtime by ingestor's config_class - schedule: IngestSchedule | None = None # Optional: if set, runs on schedule + source: str # "geo-entrez", etc. - matches entry point name + config: dict[str, Any] = {} # Validated at runtime by source's config_class + schedule: SourceSchedule | None = None # Optional: if set, runs on schedule initial_run: InitialRun | None = None # Optional: if set, runs on startup @property def name(self) -> str: - """The ingestor name (same as ingestor type for now).""" - return self.ingestor + """The source name (same as source type for now).""" + return self.source # ============================================================================= @@ -147,7 +147,7 @@ class Config(BaseSettings): database: DatabaseConfig = DatabaseConfig() logging: LoggingConfig = LoggingConfig() indexes: list[IndexConfig] = [] # list of index configs - ingestors: list[IngestConfig] = [] # list of ingestor configs + sources: list[SourceConfig] = [] # list of source configs model_config = { "env_prefix": "OSA_", diff --git a/osa/domain/deposition/command/submit.py b/osa/domain/deposition/command/submit.py index 0546849..d7412f4 100644 --- a/osa/domain/deposition/command/submit.py +++ b/osa/domain/deposition/command/submit.py @@ -30,7 +30,7 @@ async def run(self, cmd: SubmitDeposition) -> DepositionSubmitted: event = DepositionSubmittedEvent( id=EventId(uuid4()), deposition_id=cmd.srn, - metadata={}, # Empty metadata for direct submission (not from ingest) + metadata={}, # Empty metadata for direct submission (not from source) ) await self.outbox.append(event) diff --git a/osa/domain/deposition/model/aggregate.py b/osa/domain/deposition/model/aggregate.py index e643c65..1cae2d9 100644 --- a/osa/domain/deposition/model/aggregate.py +++ b/osa/domain/deposition/model/aggregate.py @@ -13,7 +13,7 @@ class Deposition(Aggregate, Generic[T]): metadata: T files: list[DepositionFile] = [] record_srn: RecordSRN | None = None - provenance: dict[str, Any] = {} # Ingestor info, source tracking + provenance: dict[str, Any] = {} # Source info, provenance tracking def remove_all_files(self) -> None: self.files = [] diff --git a/osa/domain/ingest/__init__.py b/osa/domain/ingest/__init__.py deleted file mode 100644 index 003032d..0000000 --- a/osa/domain/ingest/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Ingest domain - configuration and orchestration for data source ingestors.""" diff --git a/osa/domain/ingest/event/__init__.py b/osa/domain/ingest/event/__init__.py deleted file mode 100644 index f43d6fb..0000000 --- a/osa/domain/ingest/event/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Ingest domain events.""" - -from osa.domain.ingest.event.ingest_requested import IngestRequested -from osa.domain.ingest.event.ingestion_run_completed import IngestionRunCompleted - -__all__ = ["IngestRequested", "IngestionRunCompleted"] diff --git a/osa/domain/ingest/listener/__init__.py b/osa/domain/ingest/listener/__init__.py deleted file mode 100644 index f0163c8..0000000 --- a/osa/domain/ingest/listener/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Ingest domain listeners.""" - -from osa.domain.ingest.listener.ingest_listener import IngestFromUpstream -from osa.domain.ingest.listener.initial_ingest_listener import TriggerInitialIngestion - -__all__ = ["IngestFromUpstream", "TriggerInitialIngestion"] diff --git a/osa/domain/ingest/listener/ingest_listener.py b/osa/domain/ingest/listener/ingest_listener.py deleted file mode 100644 index 2607f6b..0000000 --- a/osa/domain/ingest/listener/ingest_listener.py +++ /dev/null @@ -1,22 +0,0 @@ -"""IngestListener - handles IngestRequested events.""" - -from osa.domain.ingest.event.ingest_requested import IngestRequested -from osa.domain.ingest.service import IngestService -from osa.domain.shared.event import EventListener - - -class IngestFromUpstream(EventListener[IngestRequested]): - """Pulls from upstream source and creates depositions. - - This listener delegates to IngestService for all business logic. - """ - - service: IngestService - - async def handle(self, event: IngestRequested) -> None: - """Delegate to IngestService to pull records and emit deposition events.""" - await self.service.run_ingest( - ingestor_name=event.ingestor_name, - since=event.since, - limit=event.limit, - ) diff --git a/osa/domain/ingest/listener/initial_ingest_listener.py b/osa/domain/ingest/listener/initial_ingest_listener.py deleted file mode 100644 index 4141fbf..0000000 --- a/osa/domain/ingest/listener/initial_ingest_listener.py +++ /dev/null @@ -1,65 +0,0 @@ -"""InitialIngestListener - triggers ingestion on server startup if configured.""" - -import logging -from uuid import uuid4 - -from osa.application.event import ServerStarted -from osa.config import Config -from osa.domain.ingest.event.ingest_requested import IngestRequested -from osa.domain.ingest.event.ingestion_run_completed import IngestionRunCompleted -from osa.domain.ingest.model.registry import IngestorRegistry -from osa.domain.shared.event import EventId, EventListener -from osa.domain.shared.outbox import Outbox - -logger = logging.getLogger(__name__) - - -class TriggerInitialIngestion(EventListener[ServerStarted]): - """Emits IngestRequested on server startup for ingestors with initial_run enabled.""" - - config: Config - ingestors: IngestorRegistry - outbox: Outbox - - async def handle(self, event: ServerStarted) -> None: - """Check each ingestor config and emit IngestRequested if initial_run is enabled.""" - for ingest_config in self.config.ingestors: - if ingest_config.initial_run is None: - continue - if not ingest_config.initial_run.enabled: - continue - - ingestor_name = ingest_config.name - - # Verify ingestor exists in registry - if ingestor_name not in self.ingestors: - logger.error( - f"Initial ingest: ingestor '{ingestor_name}' not found in registry. " - f"Available: {self.ingestors.names()}" - ) - continue - - # Check if initial run already completed for this ingestor - last_run = await self.outbox.find_latest(IngestionRunCompleted) - if last_run and last_run.ingestor_name == ingestor_name: - logger.debug( - f"Initial ingest: skipping '{ingestor_name}' - " - f"already completed at {last_run.completed_at}" - ) - continue - - initial_run = ingest_config.initial_run - limit = initial_run.limit - since = initial_run.since - - logger.info(f"Initial ingest: {ingestor_name} (since={since}, limit={limit})") - - await self.outbox.append( - IngestRequested( - id=EventId(uuid4()), - ingestor_name=ingestor_name, - since=since, - limit=limit, - ) - ) - # Session commit handled by BackgroundWorker diff --git a/osa/domain/ingest/model/__init__.py b/osa/domain/ingest/model/__init__.py deleted file mode 100644 index cd4135e..0000000 --- a/osa/domain/ingest/model/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Ingest domain models.""" - -from osa.domain.ingest.model.registry import IngestorRegistry - -__all__ = ["IngestorRegistry"] diff --git a/osa/domain/ingest/model/registry.py b/osa/domain/ingest/model/registry.py deleted file mode 100644 index 0c379f1..0000000 --- a/osa/domain/ingest/model/registry.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Ingestor registry - typed container for available ingestors.""" - -from collections.abc import Iterator - -from osa.sdk.ingest.ingestor import Ingestor - - -class IngestorRegistry: - """Registry of available ingestors.""" - - def __init__(self, ingestors: dict[str, Ingestor]) -> None: - self._ingestors = ingestors - - def get(self, name: str) -> Ingestor | None: - """Get an ingestor by name.""" - return self._ingestors.get(name) - - def __contains__(self, name: str) -> bool: - return name in self._ingestors - - def __iter__(self) -> Iterator[str]: - return iter(self._ingestors) - - def __len__(self) -> int: - return len(self._ingestors) - - def names(self) -> list[str]: - """List all available ingestor names.""" - return list(self._ingestors.keys()) diff --git a/osa/domain/ingest/schedule/__init__.py b/osa/domain/ingest/schedule/__init__.py deleted file mode 100644 index 7c56e09..0000000 --- a/osa/domain/ingest/schedule/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Ingest scheduled tasks.""" - -from osa.domain.ingest.schedule.ingest_schedule import IngestSchedule - -__all__ = ["IngestSchedule"] diff --git a/osa/domain/ingest/schedule/ingest_schedule.py b/osa/domain/ingest/schedule/ingest_schedule.py deleted file mode 100644 index 28f186d..0000000 --- a/osa/domain/ingest/schedule/ingest_schedule.py +++ /dev/null @@ -1,53 +0,0 @@ -"""IngestSchedule - scheduled task that emits IngestRequested events.""" - -import logging -from dataclasses import dataclass -from typing import Any -from uuid import uuid4 - -from osa.domain.ingest.event.ingest_requested import IngestRequested -from osa.domain.ingest.event.ingestion_run_completed import IngestionRunCompleted -from osa.domain.shared.event import EventId, Schedule -from osa.domain.shared.outbox import Outbox - -logger = logging.getLogger(__name__) - - -@dataclass -class IngestSchedule(Schedule): - """Scheduled task that emits IngestRequested events. - - Looks up the last completed ingestion run to determine the `since` timestamp, - then emits an IngestRequested event to trigger a new ingestion. - """ - - outbox: Outbox - - async def run(self, **params: Any) -> None: - """Emit an IngestRequested event for the given ingestor. - - Params: - ingestor_name: Key into config.ingestors dict (e.g., "geo") - limit: Optional limit on records to fetch - """ - ingestor_name: str = params["ingestor_name"] - limit: int | None = params.get("limit") - - # Look up last completed run for this ingestor - last_run = await self.outbox.find_latest(IngestionRunCompleted) - - # Only use last_run if it's for the same ingestor - since = None - if last_run is not None and last_run.ingestor_name == ingestor_name: - since = last_run.completed_at - - logger.info(f"Scheduled ingest: {ingestor_name} (since={since}, limit={limit})") - - await self.outbox.append( - IngestRequested( - id=EventId(uuid4()), - ingestor_name=ingestor_name, - since=since, - limit=limit, - ) - ) diff --git a/osa/domain/ingest/service/__init__.py b/osa/domain/ingest/service/__init__.py deleted file mode 100644 index cd67af0..0000000 --- a/osa/domain/ingest/service/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Ingest service module.""" - -from osa.domain.ingest.service.ingest import IngestResult, IngestService - -__all__ = ["IngestService", "IngestResult"] diff --git a/osa/domain/shared/event.py b/osa/domain/shared/event.py index e3bd183..af0d439 100644 --- a/osa/domain/shared/event.py +++ b/osa/domain/shared/event.py @@ -82,14 +82,14 @@ class EventListener(Generic[E], metaclass=_EventListenerMeta): based on their generic parameter. Example: - class IngestListener(EventListener[IngestRequested]): + class SourceListener(EventListener[SourceRequested]): outbox: Outbox config: Config - async def handle(self, event: IngestRequested) -> None: + async def handle(self, event: SourceRequested) -> None: ... - # IngestListener.__event_type__ == IngestRequested + # SourceListener.__event_type__ == SourceRequested """ __event_type__: ClassVar[type[Event]] @@ -112,12 +112,12 @@ class Schedule(ABC): Example: @dataclass - class IngestSchedule(Schedule): + class SourceSchedule(Schedule): outbox: Outbox - async def run(self, ingestor_name: str, limit: int | None = None) -> None: - last_run = await self.outbox.find_latest(IngestionRunCompleted) - await self.outbox.append(IngestRequested(...)) + async def run(self, source_name: str, limit: int | None = None) -> None: + last_run = await self.outbox.find_latest(SourceRunCompleted) + await self.outbox.append(SourceRequested(...)) """ @abstractmethod diff --git a/osa/domain/source/__init__.py b/osa/domain/source/__init__.py new file mode 100644 index 0000000..a001339 --- /dev/null +++ b/osa/domain/source/__init__.py @@ -0,0 +1 @@ +"""Source domain - configuration and orchestration for data sources.""" diff --git a/osa/domain/source/event/__init__.py b/osa/domain/source/event/__init__.py new file mode 100644 index 0000000..5ebc989 --- /dev/null +++ b/osa/domain/source/event/__init__.py @@ -0,0 +1,6 @@ +"""Source domain events.""" + +from osa.domain.source.event.source_requested import SourceRequested +from osa.domain.source.event.source_run_completed import SourceRunCompleted + +__all__ = ["SourceRequested", "SourceRunCompleted"] diff --git a/osa/domain/ingest/event/ingest_requested.py b/osa/domain/source/event/source_requested.py similarity index 53% rename from osa/domain/ingest/event/ingest_requested.py rename to osa/domain/source/event/source_requested.py index edfb3fa..6594d59 100644 --- a/osa/domain/ingest/event/ingest_requested.py +++ b/osa/domain/source/event/source_requested.py @@ -1,18 +1,18 @@ -"""IngestRequested event - triggers ingestion from an upstream source.""" +"""SourceRequested event - triggers pulling from a data source.""" from datetime import datetime from osa.domain.shared.event import Event, EventId -class IngestRequested(Event): - """Emitted when ingestion should start for a source. +class SourceRequested(Event): + """Emitted when pulling should start for a source. The `since` field is always set by the emitter (scheduler or initial run listener) - based on the last IngestionRunCompleted event, or None for first run. + based on the last SourceRunCompleted event, or None for first run. """ id: EventId - ingestor_name: str # Key into config.ingestors dict (e.g., "geo") + source_name: str # Key into config.sources list (e.g., "geo-entrez") since: datetime | None # Fetch records updated after this time (None = all time) limit: int | None = None # Optional limit on records to fetch diff --git a/osa/domain/ingest/event/ingestion_run_completed.py b/osa/domain/source/event/source_run_completed.py similarity index 52% rename from osa/domain/ingest/event/ingestion_run_completed.py rename to osa/domain/source/event/source_run_completed.py index e872008..95e6ff0 100644 --- a/osa/domain/ingest/event/ingestion_run_completed.py +++ b/osa/domain/source/event/source_run_completed.py @@ -1,16 +1,16 @@ -"""IngestionRunCompleted event - emitted after an ingestion run finishes.""" +"""SourceRunCompleted event - emitted after a source run finishes.""" from datetime import datetime from osa.domain.shared.event import Event, EventId -class IngestionRunCompleted(Event): - """Emitted after an ingestion run completes successfully.""" +class SourceRunCompleted(Event): + """Emitted after a source run completes successfully.""" id: EventId - ingestor_name: str - source_type: str # "geo", "ena", etc. + source_name: str + source_type: str # "geo-entrez", "ena", etc. started_at: datetime completed_at: datetime record_count: int diff --git a/osa/domain/source/listener/__init__.py b/osa/domain/source/listener/__init__.py new file mode 100644 index 0000000..7435bc6 --- /dev/null +++ b/osa/domain/source/listener/__init__.py @@ -0,0 +1,6 @@ +"""Source domain listeners.""" + +from osa.domain.source.listener.initial_source_listener import TriggerInitialSourceRun +from osa.domain.source.listener.source_listener import PullFromSource + +__all__ = ["PullFromSource", "TriggerInitialSourceRun"] diff --git a/osa/domain/source/listener/initial_source_listener.py b/osa/domain/source/listener/initial_source_listener.py new file mode 100644 index 0000000..6a60985 --- /dev/null +++ b/osa/domain/source/listener/initial_source_listener.py @@ -0,0 +1,65 @@ +"""InitialSourceListener - triggers source pull on server startup if configured.""" + +import logging +from uuid import uuid4 + +from osa.application.event import ServerStarted +from osa.config import Config +from osa.domain.shared.event import EventId, EventListener +from osa.domain.shared.outbox import Outbox +from osa.domain.source.event.source_requested import SourceRequested +from osa.domain.source.event.source_run_completed import SourceRunCompleted +from osa.domain.source.model.registry import SourceRegistry + +logger = logging.getLogger(__name__) + + +class TriggerInitialSourceRun(EventListener[ServerStarted]): + """Emits SourceRequested on server startup for sources with initial_run enabled.""" + + config: Config + sources: SourceRegistry + outbox: Outbox + + async def handle(self, event: ServerStarted) -> None: + """Check each source config and emit SourceRequested if initial_run is enabled.""" + for source_config in self.config.sources: + if source_config.initial_run is None: + continue + if not source_config.initial_run.enabled: + continue + + source_name = source_config.name + + # Verify source exists in registry + if source_name not in self.sources: + logger.error( + f"Initial source run: source '{source_name}' not found in registry. " + f"Available: {self.sources.names()}" + ) + continue + + # Check if initial run already completed for this source + last_run = await self.outbox.find_latest(SourceRunCompleted) + if last_run and last_run.source_name == source_name: + logger.debug( + f"Initial source run: skipping '{source_name}' - " + f"already completed at {last_run.completed_at}" + ) + continue + + initial_run = source_config.initial_run + limit = initial_run.limit + since = initial_run.since + + logger.info(f"Initial source run: {source_name} (since={since}, limit={limit})") + + await self.outbox.append( + SourceRequested( + id=EventId(uuid4()), + source_name=source_name, + since=since, + limit=limit, + ) + ) + # Session commit handled by BackgroundWorker diff --git a/osa/domain/source/listener/source_listener.py b/osa/domain/source/listener/source_listener.py new file mode 100644 index 0000000..9e300d5 --- /dev/null +++ b/osa/domain/source/listener/source_listener.py @@ -0,0 +1,22 @@ +"""SourceListener - handles SourceRequested events.""" + +from osa.domain.shared.event import EventListener +from osa.domain.source.event.source_requested import SourceRequested +from osa.domain.source.service import SourceService + + +class PullFromSource(EventListener[SourceRequested]): + """Pulls from a data source and creates depositions. + + This listener delegates to SourceService for all business logic. + """ + + service: SourceService + + async def handle(self, event: SourceRequested) -> None: + """Delegate to SourceService to pull records and emit deposition events.""" + await self.service.run_source( + source_name=event.source_name, + since=event.since, + limit=event.limit, + ) diff --git a/osa/domain/source/model/__init__.py b/osa/domain/source/model/__init__.py new file mode 100644 index 0000000..ace8022 --- /dev/null +++ b/osa/domain/source/model/__init__.py @@ -0,0 +1,5 @@ +"""Source domain models.""" + +from osa.domain.source.model.registry import SourceRegistry + +__all__ = ["SourceRegistry"] diff --git a/osa/domain/source/model/registry.py b/osa/domain/source/model/registry.py new file mode 100644 index 0000000..8b04d81 --- /dev/null +++ b/osa/domain/source/model/registry.py @@ -0,0 +1,29 @@ +"""Source registry - typed container for available sources.""" + +from collections.abc import Iterator + +from osa.sdk.source.source import Source + + +class SourceRegistry: + """Registry of available sources.""" + + def __init__(self, sources: dict[str, Source]) -> None: + self._sources = sources + + def get(self, name: str) -> Source | None: + """Get a source by name.""" + return self._sources.get(name) + + def __contains__(self, name: str) -> bool: + return name in self._sources + + def __iter__(self) -> Iterator[str]: + return iter(self._sources) + + def __len__(self) -> int: + return len(self._sources) + + def names(self) -> list[str]: + """List all available source names.""" + return list(self._sources.keys()) diff --git a/osa/domain/source/schedule/__init__.py b/osa/domain/source/schedule/__init__.py new file mode 100644 index 0000000..d566437 --- /dev/null +++ b/osa/domain/source/schedule/__init__.py @@ -0,0 +1,5 @@ +"""Source scheduled tasks.""" + +from osa.domain.source.schedule.source_schedule import SourceSchedule + +__all__ = ["SourceSchedule"] diff --git a/osa/domain/source/schedule/source_schedule.py b/osa/domain/source/schedule/source_schedule.py new file mode 100644 index 0000000..2ab91ea --- /dev/null +++ b/osa/domain/source/schedule/source_schedule.py @@ -0,0 +1,53 @@ +"""SourceSchedule - scheduled task that emits SourceRequested events.""" + +import logging +from dataclasses import dataclass +from typing import Any +from uuid import uuid4 + +from osa.domain.shared.event import EventId, Schedule +from osa.domain.shared.outbox import Outbox +from osa.domain.source.event.source_requested import SourceRequested +from osa.domain.source.event.source_run_completed import SourceRunCompleted + +logger = logging.getLogger(__name__) + + +@dataclass +class SourceSchedule(Schedule): + """Scheduled task that emits SourceRequested events. + + Looks up the last completed source run to determine the `since` timestamp, + then emits a SourceRequested event to trigger a new pull. + """ + + outbox: Outbox + + async def run(self, **params: Any) -> None: + """Emit a SourceRequested event for the given source. + + Params: + source_name: Key into config.sources list (e.g., "geo-entrez") + limit: Optional limit on records to fetch + """ + source_name: str = params["source_name"] + limit: int | None = params.get("limit") + + # Look up last completed run for this source + last_run = await self.outbox.find_latest(SourceRunCompleted) + + # Only use last_run if it's for the same source + since = None + if last_run is not None and last_run.source_name == source_name: + since = last_run.completed_at + + logger.info(f"Scheduled source run: {source_name} (since={since}, limit={limit})") + + await self.outbox.append( + SourceRequested( + id=EventId(uuid4()), + source_name=source_name, + since=since, + limit=limit, + ) + ) diff --git a/osa/domain/source/service/__init__.py b/osa/domain/source/service/__init__.py new file mode 100644 index 0000000..ecefd99 --- /dev/null +++ b/osa/domain/source/service/__init__.py @@ -0,0 +1,5 @@ +"""Source service module.""" + +from osa.domain.source.service.source import SourceResult, SourceService + +__all__ = ["SourceService", "SourceResult"] diff --git a/osa/domain/ingest/service/ingest.py b/osa/domain/source/service/source.py similarity index 58% rename from osa/domain/ingest/service/ingest.py rename to osa/domain/source/service/source.py index 36c8197..ee9cbbf 100644 --- a/osa/domain/ingest/service/ingest.py +++ b/osa/domain/source/service/source.py @@ -1,4 +1,4 @@ -"""IngestService - orchestrates ingestion from upstream sources.""" +"""SourceService - orchestrates pulling from data sources.""" import logging from dataclasses import dataclass @@ -6,66 +6,66 @@ from uuid import uuid4 from osa.domain.deposition.event.submitted import DepositionSubmittedEvent -from osa.domain.ingest.event.ingestion_run_completed import IngestionRunCompleted -from osa.domain.ingest.model.registry import IngestorRegistry from osa.domain.shared.event import EventId from osa.domain.shared.model.srn import DepositionSRN, Domain, LocalId from osa.domain.shared.outbox import Outbox from osa.domain.shared.service import Service +from osa.domain.source.event.source_run_completed import SourceRunCompleted +from osa.domain.source.model.registry import SourceRegistry logger = logging.getLogger(__name__) @dataclass -class IngestResult: - """Result of an ingestion run.""" +class SourceResult: + """Result of a source run.""" - ingestor_name: str + source_name: str record_count: int started_at: datetime completed_at: datetime -class IngestService(Service): - """Orchestrates pulling records from upstream ingestors and emitting deposition events. +class SourceService(Service): + """Orchestrates pulling records from sources and emitting deposition events. - This service encapsulates the business logic for ingestion that was previously - embedded in the IngestFromUpstream listener. It can be called from multiple + This service encapsulates the business logic for pulling from sources that was + previously embedded in the PullFromSource listener. It can be called from multiple entry points (event listeners, CLI commands, scheduled jobs). """ - ingestors: IngestorRegistry + sources: SourceRegistry outbox: Outbox node_domain: Domain - async def run_ingest( + async def run_source( self, - ingestor_name: str, + source_name: str, since: datetime | None = None, limit: int | None = None, - ) -> IngestResult: - """Pull records from an ingestor and emit DepositionSubmitted events. + ) -> SourceResult: + """Pull records from a source and emit DepositionSubmitted events. Args: - ingestor_name: Name of the ingestor to use. + source_name: Name of the source to use. since: Only fetch records updated after this time. limit: Maximum number of records to fetch. Returns: - IngestResult with ingestion statistics. + SourceResult with run statistics. Raises: - ValueError: If the ingestor is not found. + ValueError: If the source is not found. """ - ingestor = self.ingestors.get(ingestor_name) - if not ingestor: - raise ValueError(f"Unknown ingestor: {ingestor_name}") + source = self.sources.get(source_name) + if not source: + raise ValueError(f"Unknown source: {source_name}") started_at = datetime.now(UTC) - logger.info(f"Starting ingest from {ingestor_name}, since={since}, limit={limit}") + logger.info(f"Starting pull from {source_name}, since={since}, limit={limit}") count = 0 - async for record in ingestor.pull(since=since, limit=limit): + async for record in source.pull(since=since, limit=limit): # Create a deposition SRN for this record dep_srn = DepositionSRN( domain=self.node_domain, @@ -86,14 +86,14 @@ async def run_ingest( logger.info(f" [{record.source_id}] {title}...") completed_at = datetime.now(UTC) - logger.info(f"Ingest completed: {count} records from {ingestor_name}") + logger.info(f"Pull completed: {count} records from {source_name}") # Emit completion event for tracking await self.outbox.append( - IngestionRunCompleted( + SourceRunCompleted( id=EventId(uuid4()), - ingestor_name=ingestor_name, - source_type=ingestor.name, + source_name=source_name, + source_type=source.name, started_at=started_at, completed_at=completed_at, record_count=count, @@ -102,8 +102,8 @@ async def run_ingest( ) ) - return IngestResult( - ingestor_name=ingestor_name, + return SourceResult( + source_name=source_name, record_count=count, started_at=started_at, completed_at=completed_at, diff --git a/osa/infrastructure/event/di.py b/osa/infrastructure/event/di.py index dbe9298..925d62c 100644 --- a/osa/infrastructure/event/di.py +++ b/osa/infrastructure/event/di.py @@ -7,8 +7,8 @@ from osa.config import Config from osa.domain.curation.listener import AutoApproveCurationTool from osa.domain.index.listener import ProjectNewRecordToIndexes -from osa.domain.ingest.listener import IngestFromUpstream, TriggerInitialIngestion -from osa.domain.ingest.schedule import IngestSchedule +from osa.domain.source.listener import PullFromSource, TriggerInitialSourceRun +from osa.domain.source.schedule import SourceSchedule from osa.domain.record.listener import ConvertDepositionToRecord from osa.domain.shared.event_log import EventLog from osa.domain.shared.outbox import Outbox @@ -29,8 +29,8 @@ # All event listeners - single source of truth LISTENER_TYPES: Subscriptions = Subscriptions( [ - TriggerInitialIngestion, - IngestFromUpstream, + TriggerInitialSourceRun, + PullFromSource, ValidateNewDeposition, AutoApproveCurationTool, ConvertDepositionToRecord, @@ -60,8 +60,8 @@ def get_event_log(self, repo: EventRepository) -> EventLog: for _listener_type in LISTENER_TYPES: locals()[_listener_type.__name__] = provide(_listener_type, scope=Scope.UOW) - # UOW-scoped provider for IngestSchedule - ingest_schedule = provide(IngestSchedule, scope=Scope.UOW) + # UOW-scoped provider for SourceSchedule + source_schedule = provide(SourceSchedule, scope=Scope.UOW) @provide(scope=Scope.APP) def get_subscriptions(self) -> Subscriptions: @@ -73,18 +73,18 @@ def get_schedule_configs(self, config: Config) -> ScheduleConfigs: """Build schedule configs from application config.""" configs: list[ScheduleConfig] = [] - for ingestor in config.ingestors: - if ingestor.schedule is None: + for source in config.sources: + if source.schedule is None: continue configs.append( ScheduleConfig( - schedule_type=IngestSchedule, - cron=ingestor.schedule.cron, - id=f"ingest-{ingestor.name}", + schedule_type=SourceSchedule, + cron=source.schedule.cron, + id=f"source-{source.name}", params={ - "ingestor_name": ingestor.name, - "limit": ingestor.schedule.limit, + "source_name": source.name, + "limit": source.schedule.limit, }, ) ) diff --git a/osa/infrastructure/ingest/__init__.py b/osa/infrastructure/ingest/__init__.py deleted file mode 100644 index 77713a3..0000000 --- a/osa/infrastructure/ingest/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Ingest infrastructure - ingestor implementations.""" diff --git a/osa/infrastructure/ingest/di.py b/osa/infrastructure/ingest/di.py deleted file mode 100644 index 30b4de6..0000000 --- a/osa/infrastructure/ingest/di.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Dependency injection provider for ingestors.""" - -from dishka import Provider, provide - -from osa.config import Config -from osa.domain.ingest.model.registry import IngestorRegistry -from osa.domain.ingest.service import IngestService -from osa.domain.shared.model.srn import Domain -from osa.domain.shared.outbox import Outbox -from osa.infrastructure.ingest.discovery import ( - discover_ingestors, - validate_all_ingestor_configs, -) -from osa.sdk.ingest.ingestor import Ingestor -from osa.util.di.scope import Scope - - -class IngestProvider(Provider): - """Provides configured ingestors.""" - - @provide(scope=Scope.APP) - def get_ingestors(self, config: Config) -> IngestorRegistry: - """Build all configured ingestors. - - Discovers available ingestors via entry points, validates - configuration, and instantiates each configured ingestor. - - Returns: - Registry of ingestors. - """ - # Discover available ingestor classes - available = discover_ingestors() - - # Validate all configs and get (class, validated_config) pairs - validated = validate_all_ingestor_configs(config.ingestors, available) - - # Instantiate ingestors - ingestors: dict[str, Ingestor] = {} - for name, (ingestor_cls, validated_config) in validated.items(): - ingestors[name] = ingestor_cls(validated_config) - - return IngestorRegistry(ingestors) - - @provide(scope=Scope.UOW) - def get_ingest_service( - self, - ingestors: IngestorRegistry, - outbox: Outbox, - config: Config, - ) -> IngestService: - """Provide IngestService for UOW scope. - - IngestService is UOW-scoped because it needs fresh Outbox per unit of work. - """ - return IngestService( - ingestors=ingestors, - outbox=outbox, - node_domain=Domain(config.server.domain), - ) diff --git a/osa/infrastructure/ingest/discovery.py b/osa/infrastructure/ingest/discovery.py deleted file mode 100644 index 202f64b..0000000 --- a/osa/infrastructure/ingest/discovery.py +++ /dev/null @@ -1,174 +0,0 @@ -"""Ingestor discovery via entry points.""" - -from __future__ import annotations - -import logging -from importlib.metadata import entry_points -from typing import TYPE_CHECKING, Any - -from pydantic import BaseModel, ValidationError - -from osa.sdk.ingest.ingestor import Ingestor - -if TYPE_CHECKING: - from osa.config import IngestConfig - -logger = logging.getLogger(__name__) - -ENTRY_POINT_GROUP = "osa.ingestors" - - -def discover_ingestors() -> dict[str, type[Ingestor]]: - """Discover available ingestors via entry points. - - Scans for entry points in the 'osa.ingestors' group and loads - the ingestor classes. Each entry point should point to a class - that implements the Ingestor protocol. - - Returns: - Dict mapping ingestor type names to their classes. - - Example pyproject.toml entry: - [project.entry-points."osa.ingestors"] - geo-entrez = "ingestors.geo_entrez:GEOEntrezIngestor" - """ - ingestors: dict[str, type[Ingestor]] = {} - eps = entry_points(group=ENTRY_POINT_GROUP) - - for ep in eps: - try: - cls = ep.load() - _validate_ingestor_class(cls, ep.name) - ingestors[ep.name] = cls - logger.debug("Discovered ingestor: %s -> %s", ep.name, cls.__name__) - except Exception as e: - logger.warning("Failed to load ingestor '%s': %s", ep.name, e) - - return ingestors - - -def _validate_ingestor_class(cls: Any, name: str) -> None: - """Validate that a class conforms to the Ingestor protocol. - - Args: - cls: The class to validate (loaded from entry point). - name: The entry point name for error messages. - - Raises: - TypeError: If the class doesn't conform to the Ingestor protocol. - """ - if not isinstance(cls, type): - raise TypeError(f"Ingestor {name} must be a class, got {type(cls).__name__}") - if not hasattr(cls, "name"): - raise TypeError(f"Ingestor {name} missing 'name' class attribute") - if not hasattr(cls, "config_class"): - raise TypeError(f"Ingestor {name} missing 'config_class' class attribute") - if not issubclass(cls.config_class, BaseModel): - raise TypeError(f"Ingestor {name} config_class must be a Pydantic BaseModel") - - -def validate_ingestor_config( - ingestor_cls: type[Ingestor], - config_data: dict[str, Any], -) -> BaseModel: - """Validate configuration data against an ingestor's config class. - - Args: - ingestor_cls: The ingestor class with a config_class attribute. - config_data: Raw configuration dictionary from user config. - - Returns: - Validated config instance. - - Raises: - ValidationError: If config data doesn't match the schema. - """ - return ingestor_cls.config_class.model_validate(config_data) - - -class IngestorConfigError(Exception): - """Raised when ingestor configuration validation fails.""" - - def __init__( - self, - ingestor_name: str, - name: str, - validation_error: ValidationError, - ) -> None: - self.ingestor_name = ingestor_name - self.name = name - self.validation_error = validation_error - super().__init__(self._format_message()) - - def _format_message(self) -> str: - """Format a human-readable error message.""" - lines = [f"Invalid config for ingestor '{self.ingestor_name}' ({self.name}):"] - for err in self.validation_error.errors(): - loc = ".".join(str(x) for x in err.get("loc", [])) - msg = err.get("msg", "Unknown error") - lines.append(f" - {loc}: {msg}") - return "\n".join(lines) - - -def validate_ingestors_at_startup(ingestors_config: list[IngestConfig]) -> None: - """Validate all ingestor configurations at application startup. - - Call this early in startup to fail fast with clear error messages - if any ingestor configuration is invalid. - - Args: - ingestors_config: List of IngestConfig from user config. - - Raises: - IngestorConfigError: If any configuration is invalid. - ValueError: If an unknown ingestor type is specified or duplicates exist. - """ - available = discover_ingestors() - validate_all_ingestor_configs(ingestors_config, available) - - -def validate_all_ingestor_configs( - ingestors_config: list[IngestConfig], - available_ingestors: dict[str, type[Ingestor]], -) -> dict[str, tuple[type[Ingestor], BaseModel]]: - """Validate all ingestor configurations at startup. - - Args: - ingestors_config: List of IngestConfig from user config. - available_ingestors: Dict of ingestor type -> class from discovery. - - Returns: - Dict of ingestor name -> (class, validated_config). - - Raises: - IngestorConfigError: If any configuration is invalid. - ValueError: If an unknown ingestor type is specified or duplicates exist. - """ - validated: dict[str, tuple[type[Ingestor], BaseModel]] = {} - - for ing_config in ingestors_config: - name = ing_config.ingestor - - # Check for duplicates - if name in validated: - raise ValueError( - f"Duplicate ingestor '{name}'. Each ingestor type can only be configured once." - ) - - if name not in available_ingestors: - available = ", ".join(sorted(available_ingestors.keys())) or "(none)" - raise ValueError(f"Unknown ingestor type '{name}'. Available: {available}") - - ingestor_cls = available_ingestors[name] - - try: - config = validate_ingestor_config(ingestor_cls, ing_config.config) - validated[name] = (ingestor_cls, config) - except ValidationError as e: - raise IngestorConfigError( - ingestor_name=name, - name=name, - validation_error=e, - ) from e - - return validated diff --git a/osa/infrastructure/source/__init__.py b/osa/infrastructure/source/__init__.py new file mode 100644 index 0000000..701b44d --- /dev/null +++ b/osa/infrastructure/source/__init__.py @@ -0,0 +1 @@ +"""Source infrastructure - source implementations.""" diff --git a/osa/infrastructure/source/di.py b/osa/infrastructure/source/di.py new file mode 100644 index 0000000..bdeb469 --- /dev/null +++ b/osa/infrastructure/source/di.py @@ -0,0 +1,59 @@ +"""Dependency injection provider for sources.""" + +from dishka import Provider, provide + +from osa.config import Config +from osa.domain.shared.model.srn import Domain +from osa.domain.shared.outbox import Outbox +from osa.domain.source.model.registry import SourceRegistry +from osa.domain.source.service import SourceService +from osa.infrastructure.source.discovery import ( + discover_sources, + validate_all_source_configs, +) +from osa.sdk.source.source import Source +from osa.util.di.scope import Scope + + +class SourceProvider(Provider): + """Provides configured sources.""" + + @provide(scope=Scope.APP) + def get_sources(self, config: Config) -> SourceRegistry: + """Build all configured sources. + + Discovers available sources via entry points, validates + configuration, and instantiates each configured source. + + Returns: + Registry of sources. + """ + # Discover available source classes + available = discover_sources() + + # Validate all configs and get (class, validated_config) pairs + validated = validate_all_source_configs(config.sources, available) + + # Instantiate sources + sources: dict[str, Source] = {} + for name, (source_cls, validated_config) in validated.items(): + sources[name] = source_cls(validated_config) + + return SourceRegistry(sources) + + @provide(scope=Scope.UOW) + def get_source_service( + self, + sources: SourceRegistry, + outbox: Outbox, + config: Config, + ) -> SourceService: + """Provide SourceService for UOW scope. + + SourceService is UOW-scoped because it needs fresh Outbox per unit of work. + """ + return SourceService( + sources=sources, + outbox=outbox, + node_domain=Domain(config.server.domain), + ) diff --git a/osa/infrastructure/source/discovery.py b/osa/infrastructure/source/discovery.py new file mode 100644 index 0000000..7a209e6 --- /dev/null +++ b/osa/infrastructure/source/discovery.py @@ -0,0 +1,174 @@ +"""Source discovery via entry points.""" + +from __future__ import annotations + +import logging +from importlib.metadata import entry_points +from typing import TYPE_CHECKING, Any + +from pydantic import BaseModel, ValidationError + +from osa.sdk.source.source import Source + +if TYPE_CHECKING: + from osa.config import SourceConfig + +logger = logging.getLogger(__name__) + +ENTRY_POINT_GROUP = "osa.sources" + + +def discover_sources() -> dict[str, type[Source]]: + """Discover available sources via entry points. + + Scans for entry points in the 'osa.sources' group and loads + the source classes. Each entry point should point to a class + that implements the Source protocol. + + Returns: + Dict mapping source type names to their classes. + + Example pyproject.toml entry: + [project.entry-points."osa.sources"] + geo-entrez = "sources.geo_entrez:GEOEntrezSource" + """ + sources: dict[str, type[Source]] = {} + eps = entry_points(group=ENTRY_POINT_GROUP) + + for ep in eps: + try: + cls = ep.load() + _validate_source_class(cls, ep.name) + sources[ep.name] = cls + logger.debug("Discovered source: %s -> %s", ep.name, cls.__name__) + except Exception as e: + logger.warning("Failed to load source '%s': %s", ep.name, e) + + return sources + + +def _validate_source_class(cls: Any, name: str) -> None: + """Validate that a class conforms to the Source protocol. + + Args: + cls: The class to validate (loaded from entry point). + name: The entry point name for error messages. + + Raises: + TypeError: If the class doesn't conform to the Source protocol. + """ + if not isinstance(cls, type): + raise TypeError(f"Source {name} must be a class, got {type(cls).__name__}") + if not hasattr(cls, "name"): + raise TypeError(f"Source {name} missing 'name' class attribute") + if not hasattr(cls, "config_class"): + raise TypeError(f"Source {name} missing 'config_class' class attribute") + if not issubclass(cls.config_class, BaseModel): + raise TypeError(f"Source {name} config_class must be a Pydantic BaseModel") + + +def validate_source_config( + source_cls: type[Source], + config_data: dict[str, Any], +) -> BaseModel: + """Validate configuration data against a source's config class. + + Args: + source_cls: The source class with a config_class attribute. + config_data: Raw configuration dictionary from user config. + + Returns: + Validated config instance. + + Raises: + ValidationError: If config data doesn't match the schema. + """ + return source_cls.config_class.model_validate(config_data) + + +class SourceConfigError(Exception): + """Raised when source configuration validation fails.""" + + def __init__( + self, + source_name: str, + name: str, + validation_error: ValidationError, + ) -> None: + self.source_name = source_name + self.name = name + self.validation_error = validation_error + super().__init__(self._format_message()) + + def _format_message(self) -> str: + """Format a human-readable error message.""" + lines = [f"Invalid config for source '{self.source_name}' ({self.name}):"] + for err in self.validation_error.errors(): + loc = ".".join(str(x) for x in err.get("loc", [])) + msg = err.get("msg", "Unknown error") + lines.append(f" - {loc}: {msg}") + return "\n".join(lines) + + +def validate_sources_at_startup(sources_config: list[SourceConfig]) -> None: + """Validate all source configurations at application startup. + + Call this early in startup to fail fast with clear error messages + if any source configuration is invalid. + + Args: + sources_config: List of SourceConfig from user config. + + Raises: + SourceConfigError: If any configuration is invalid. + ValueError: If an unknown source type is specified or duplicates exist. + """ + available = discover_sources() + validate_all_source_configs(sources_config, available) + + +def validate_all_source_configs( + sources_config: list[SourceConfig], + available_sources: dict[str, type[Source]], +) -> dict[str, tuple[type[Source], BaseModel]]: + """Validate all source configurations at startup. + + Args: + sources_config: List of SourceConfig from user config. + available_sources: Dict of source type -> class from discovery. + + Returns: + Dict of source name -> (class, validated_config). + + Raises: + SourceConfigError: If any configuration is invalid. + ValueError: If an unknown source type is specified or duplicates exist. + """ + validated: dict[str, tuple[type[Source], BaseModel]] = {} + + for src_config in sources_config: + name = src_config.source + + # Check for duplicates + if name in validated: + raise ValueError( + f"Duplicate source '{name}'. Each source type can only be configured once." + ) + + if name not in available_sources: + available = ", ".join(sorted(available_sources.keys())) or "(none)" + raise ValueError(f"Unknown source type '{name}'. Available: {available}") + + source_cls = available_sources[name] + + try: + config = validate_source_config(source_cls, src_config.config) + validated[name] = (source_cls, config) + except ValidationError as e: + raise SourceConfigError( + source_name=name, + name=name, + validation_error=e, + ) from e + + return validated diff --git a/osa/sdk/ingest/__init__.py b/osa/sdk/ingest/__init__.py deleted file mode 100644 index 85e095e..0000000 --- a/osa/sdk/ingest/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Ingest SDK - Protocols and types for pluggable data source ingestors.""" - -from osa.sdk.ingest.config import IngestorConfig -from osa.sdk.ingest.ingestor import Ingestor -from osa.sdk.ingest.record import UpstreamRecord - -__all__ = [ - "Ingestor", - "IngestorConfig", - "UpstreamRecord", -] diff --git a/osa/sdk/ingest/config.py b/osa/sdk/ingest/config.py deleted file mode 100644 index 86a930c..0000000 --- a/osa/sdk/ingest/config.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Base configuration for ingestors.""" - -from pydantic import BaseModel - - -class IngestorConfig(BaseModel): - """Base configuration for ingestors. - - Extend this class for source-specific configuration. - """ - - pass diff --git a/osa/sdk/source/__init__.py b/osa/sdk/source/__init__.py new file mode 100644 index 0000000..3e5d15f --- /dev/null +++ b/osa/sdk/source/__init__.py @@ -0,0 +1,11 @@ +"""Source SDK - Protocols and types for pluggable data sources.""" + +from osa.sdk.source.config import SourceConfig +from osa.sdk.source.record import UpstreamRecord +from osa.sdk.source.source import Source + +__all__ = [ + "Source", + "SourceConfig", + "UpstreamRecord", +] diff --git a/osa/sdk/source/config.py b/osa/sdk/source/config.py new file mode 100644 index 0000000..df719fa --- /dev/null +++ b/osa/sdk/source/config.py @@ -0,0 +1,12 @@ +"""Base configuration for sources.""" + +from pydantic import BaseModel + + +class SourceConfig(BaseModel): + """Base configuration for sources. + + Extend this class for source-specific configuration. + """ + + pass diff --git a/osa/sdk/ingest/record.py b/osa/sdk/source/record.py similarity index 100% rename from osa/sdk/ingest/record.py rename to osa/sdk/source/record.py diff --git a/osa/sdk/ingest/ingestor.py b/osa/sdk/source/source.py similarity index 76% rename from osa/sdk/ingest/ingestor.py rename to osa/sdk/source/source.py index d43239d..28a0239 100644 --- a/osa/sdk/ingest/ingestor.py +++ b/osa/sdk/source/source.py @@ -1,4 +1,4 @@ -"""Ingestor protocol for pluggable data source ingestors.""" +"""Source protocol for pluggable data sources.""" from collections.abc import AsyncIterator from datetime import datetime @@ -6,17 +6,17 @@ from pydantic import BaseModel -from osa.sdk.ingest.record import UpstreamRecord +from osa.sdk.source.record import UpstreamRecord -class Ingestor(Protocol): - """Protocol for pluggable data source ingestors. +class Source(Protocol): + """Protocol for pluggable data sources. - Implement this protocol to create custom ingestors - for different data sources (e.g., GEO, ENA, Zenodo). + Implement this protocol to create custom sources + for different data origins (e.g., GEO, ENA, Zenodo, user uploads). Class attributes: - name: Unique identifier for this ingestor (e.g., 'geo-entrez'). + name: Unique identifier for this source (e.g., 'geo-entrez'). Must match the entry point name. config_class: Pydantic model for validating configuration. """ @@ -25,7 +25,7 @@ class Ingestor(Protocol): config_class: ClassVar[type[BaseModel]] def __init__(self, config: BaseModel) -> None: - """Initialize the ingestor with validated configuration.""" + """Initialize the source with validated configuration.""" ... def pull( diff --git a/pyproject.toml b/pyproject.toml index 9d9251e..83b649b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,8 +31,8 @@ dependencies = [ [project.scripts] osa = "osa:main" -[project.entry-points."osa.ingestors"] -geo-entrez = "ingestors.geo_entrez:GEOEntrezIngestor" +[project.entry-points."osa.sources"] +geo-entrez = "sources.geo_entrez:GEOEntrezSource" [build-system] requires = ["hatchling"] diff --git a/sources/__init__.py b/sources/__init__.py new file mode 100644 index 0000000..ab04c06 --- /dev/null +++ b/sources/__init__.py @@ -0,0 +1,5 @@ +"""First-party sources for OSA. + +Sources are pluggable components that pull data from external origins. +Each source is registered via entry points in pyproject.toml. +""" diff --git a/sources/geo_entrez/__init__.py b/sources/geo_entrez/__init__.py new file mode 100644 index 0000000..e00b070 --- /dev/null +++ b/sources/geo_entrez/__init__.py @@ -0,0 +1,6 @@ +"""GEO Entrez source - uses NCBI E-utilities API.""" + +from sources.geo_entrez.config import GEOEntrezConfig +from sources.geo_entrez.source import GEOEntrezSource + +__all__ = ["GEOEntrezSource", "GEOEntrezConfig"] diff --git a/ingestors/geo_entrez/config.py b/sources/geo_entrez/config.py similarity index 73% rename from ingestors/geo_entrez/config.py rename to sources/geo_entrez/config.py index 949f4e2..8f711c5 100644 --- a/ingestors/geo_entrez/config.py +++ b/sources/geo_entrez/config.py @@ -1,16 +1,16 @@ -"""Configuration for GEO Entrez ingestor.""" +"""Configuration for GEO Entrez source.""" from typing import Literal from pydantic import EmailStr -from osa.sdk.ingest.config import IngestorConfig +from osa.sdk.source.config import SourceConfig GEORecordType = Literal["gse", "gds"] -class GEOEntrezConfig(IngestorConfig): - """Configuration for GEO Entrez (E-utilities) ingestor. +class GEOEntrezConfig(SourceConfig): + """Configuration for GEO Entrez (E-utilities) source. Uses NCBI E-utilities API for incremental updates. """ diff --git a/ingestors/geo_entrez/ingestor.py b/sources/geo_entrez/source.py similarity index 97% rename from ingestors/geo_entrez/ingestor.py rename to sources/geo_entrez/source.py index 6f5b8c0..f50503d 100644 --- a/ingestors/geo_entrez/ingestor.py +++ b/sources/geo_entrez/source.py @@ -1,4 +1,4 @@ -"""GEO Entrez ingestor - uses NCBI E-utilities API.""" +"""GEO Entrez source - uses NCBI E-utilities API.""" from collections.abc import AsyncIterator from datetime import UTC, datetime @@ -6,11 +6,11 @@ import httpx -from ingestors.geo_entrez.config import GEOEntrezConfig -from osa.sdk.ingest.record import UpstreamRecord +from osa.sdk.source.record import UpstreamRecord +from sources.geo_entrez.config import GEOEntrezConfig -class GEOEntrezIngestor: +class GEOEntrezSource: """Pulls GEO metadata from NCBI via E-utilities (Entrez). Supports both GSE (Series, ~230k records) and GDS (DataSets, ~5k curated). diff --git a/tests/integration/infrastructure/ingest/__init__.py b/tests/integration/infrastructure/ingest/__init__.py deleted file mode 100644 index 2f11cee..0000000 --- a/tests/integration/infrastructure/ingest/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Ingest infrastructure integration tests.""" diff --git a/tests/integration/infrastructure/source/__init__.py b/tests/integration/infrastructure/source/__init__.py new file mode 100644 index 0000000..a460f9d --- /dev/null +++ b/tests/integration/infrastructure/source/__init__.py @@ -0,0 +1 @@ +"""Source infrastructure integration tests.""" diff --git a/tests/test_ingestors/__init__.py b/tests/test_sources/__init__.py similarity index 100% rename from tests/test_ingestors/__init__.py rename to tests/test_sources/__init__.py diff --git a/tests/test_ingestors/geo_entrez/__init__.py b/tests/test_sources/geo_entrez/__init__.py similarity index 100% rename from tests/test_ingestors/geo_entrez/__init__.py rename to tests/test_sources/geo_entrez/__init__.py diff --git a/tests/test_ingestors/geo_entrez/test_ingestor.py b/tests/test_sources/geo_entrez/test_source.py similarity index 63% rename from tests/test_ingestors/geo_entrez/test_ingestor.py rename to tests/test_sources/geo_entrez/test_source.py index 816ff18..bb2b0e7 100644 --- a/tests/test_ingestors/geo_entrez/test_ingestor.py +++ b/tests/test_sources/geo_entrez/test_source.py @@ -1,12 +1,12 @@ -"""Integration tests for GEO Entrez ingestor against live NCBI E-utilities API.""" +"""Integration tests for GEO Entrez source against live NCBI E-utilities API.""" from collections.abc import AsyncGenerator from datetime import datetime import pytest -from ingestors.geo_entrez import GEOEntrezConfig, GEOEntrezIngestor -from osa.sdk.ingest.record import UpstreamRecord +from osa.sdk.source.record import UpstreamRecord +from sources.geo_entrez import GEOEntrezConfig, GEOEntrezSource @pytest.fixture @@ -18,26 +18,26 @@ def geo_config() -> GEOEntrezConfig: @pytest.fixture -async def geo_ingestor( +async def geo_source( geo_config: GEOEntrezConfig, -) -> AsyncGenerator[GEOEntrezIngestor, None]: - ingestor = GEOEntrezIngestor(geo_config) - yield ingestor - await ingestor.close() +) -> AsyncGenerator[GEOEntrezSource, None]: + source = GEOEntrezSource(geo_config) + yield source + await source.close() -class TestGEOEntrezIngestorIntegration: +class TestGEOEntrezSourceIntegration: """Integration tests that hit the live GEO API.""" - async def test_health_returns_true(self, geo_ingestor: GEOEntrezIngestor) -> None: + async def test_health_returns_true(self, geo_source: GEOEntrezSource) -> None: """Health check should return True when GEO API is reachable.""" - result = await geo_ingestor.health() + result = await geo_source.health() assert result is True - async def test_get_one_returns_upstream_record(self, geo_ingestor: GEOEntrezIngestor) -> None: + async def test_get_one_returns_upstream_record(self, geo_source: GEOEntrezSource) -> None: """Fetching a known GSE should return a valid UpstreamRecord.""" # GSE1 is one of the earliest GEO series, stable for testing - record = await geo_ingestor.get_one("GSE1") + record = await geo_source.get_one("GSE1") assert record is not None assert isinstance(record, UpstreamRecord) @@ -52,16 +52,16 @@ async def test_get_one_returns_upstream_record(self, geo_ingestor: GEOEntrezInge assert "title" in record.metadata assert record.metadata["title"] is not None - async def test_get_one_nonexistent_returns_none(self, geo_ingestor: GEOEntrezIngestor) -> None: + async def test_get_one_nonexistent_returns_none(self, geo_source: GEOEntrezSource) -> None: """Fetching a nonexistent GSE should return None.""" - record = await geo_ingestor.get_one("GSE999999999999") + record = await geo_source.get_one("GSE999999999999") assert record is None - async def test_pull_yields_upstream_records(self, geo_ingestor: GEOEntrezIngestor) -> None: + async def test_pull_yields_upstream_records(self, geo_source: GEOEntrezSource) -> None: """Pulling records should yield valid UpstreamRecords.""" records: list[UpstreamRecord] = [] - async for record in geo_ingestor.pull(limit=3): + async for record in geo_source.pull(limit=3): records.append(record) assert len(records) >= 1 diff --git a/tests/unit/domain/ingest/__init__.py b/tests/unit/domain/source/__init__.py similarity index 100% rename from tests/unit/domain/ingest/__init__.py rename to tests/unit/domain/source/__init__.py diff --git a/tests/unit/domain/ingest/test_ingest_service.py b/tests/unit/domain/source/test_source_service.py similarity index 61% rename from tests/unit/domain/ingest/test_ingest_service.py rename to tests/unit/domain/source/test_source_service.py index ea6a2c9..d64c44f 100644 --- a/tests/unit/domain/ingest/test_ingest_service.py +++ b/tests/unit/domain/source/test_source_service.py @@ -1,4 +1,4 @@ -"""Unit tests for IngestService.""" +"""Unit tests for SourceService.""" from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock @@ -6,17 +6,17 @@ import pytest from osa.config import Config -from osa.domain.ingest.model.registry import IngestorRegistry -from osa.domain.ingest.service.ingest import IngestService from osa.domain.shared.model.srn import Domain from osa.domain.shared.outbox import Outbox -from osa.sdk.ingest.record import UpstreamRecord +from osa.domain.source.model.registry import SourceRegistry +from osa.domain.source.service.source import SourceService +from osa.sdk.source.record import UpstreamRecord -class FakeIngestor: - """Fake ingestor for testing.""" +class FakeSource: + """Fake source for testing.""" - name = "fake-ingestor" + name = "fake-source" def __init__(self, records: list[UpstreamRecord]): self._records = records @@ -63,42 +63,42 @@ def sample_records() -> list[UpstreamRecord]: ] -class TestIngestService: - """Tests for IngestService.""" +class TestSourceService: + """Tests for SourceService.""" @pytest.mark.asyncio - async def test_run_ingest_emits_deposition_events( + async def test_run_source_emits_deposition_events( self, mock_outbox: Outbox, mock_config: Config, sample_records: list[UpstreamRecord], ): - """Service should emit DepositionSubmittedEvent for each ingested record.""" + """Service should emit DepositionSubmittedEvent for each record.""" # Arrange - fake_ingestor = FakeIngestor(sample_records) - registry = IngestorRegistry({"fake": fake_ingestor}) + fake_source = FakeSource(sample_records) + registry = SourceRegistry({"fake": fake_source}) - service = IngestService( - ingestors=registry, + service = SourceService( + sources=registry, outbox=mock_outbox, node_domain=Domain(mock_config.server.domain), ) # Act - result = await service.run_ingest( - ingestor_name="fake", + result = await service.run_source( + source_name="fake", since=None, limit=None, ) # Assert assert result.record_count == 2 - assert result.ingestor_name == "fake" - # Two DepositionSubmittedEvent + one IngestionRunCompleted + assert result.source_name == "fake" + # Two DepositionSubmittedEvent + one SourceRunCompleted assert mock_outbox.append.call_count == 3 @pytest.mark.asyncio - async def test_run_ingest_with_limit( + async def test_run_source_with_limit( self, mock_outbox: Outbox, mock_config: Config, @@ -106,18 +106,18 @@ async def test_run_ingest_with_limit( ): """Service should respect limit parameter.""" # Arrange - fake_ingestor = FakeIngestor(sample_records) - registry = IngestorRegistry({"fake": fake_ingestor}) + fake_source = FakeSource(sample_records) + registry = SourceRegistry({"fake": fake_source}) - service = IngestService( - ingestors=registry, + service = SourceService( + sources=registry, outbox=mock_outbox, node_domain=Domain(mock_config.server.domain), ) # Act - result = await service.run_ingest( - ingestor_name="fake", + result = await service.run_source( + source_name="fake", since=None, limit=1, ) @@ -126,61 +126,59 @@ async def test_run_ingest_with_limit( assert result.record_count == 1 @pytest.mark.asyncio - async def test_run_ingest_unknown_ingestor_raises( + async def test_run_source_unknown_source_raises( self, mock_outbox: Outbox, mock_config: Config, ): - """Service should raise error for unknown ingestor.""" + """Service should raise error for unknown source.""" # Arrange - registry = IngestorRegistry({}) + registry = SourceRegistry({}) - service = IngestService( - ingestors=registry, + service = SourceService( + sources=registry, outbox=mock_outbox, node_domain=Domain(mock_config.server.domain), ) # Act & Assert - with pytest.raises(ValueError, match="Unknown ingestor"): - await service.run_ingest( - ingestor_name="nonexistent", + with pytest.raises(ValueError, match="Unknown source"): + await service.run_source( + source_name="nonexistent", since=None, limit=None, ) @pytest.mark.asyncio - async def test_run_ingest_emits_completion_event( + async def test_run_source_emits_completion_event( self, mock_outbox: Outbox, mock_config: Config, sample_records: list[UpstreamRecord], ): - """Service should emit IngestionRunCompleted event after ingestion.""" + """Service should emit SourceRunCompleted event after pulling.""" # Arrange - fake_ingestor = FakeIngestor(sample_records) - registry = IngestorRegistry({"fake": fake_ingestor}) + fake_source = FakeSource(sample_records) + registry = SourceRegistry({"fake": fake_source}) - service = IngestService( - ingestors=registry, + service = SourceService( + sources=registry, outbox=mock_outbox, node_domain=Domain(mock_config.server.domain), ) # Act - await service.run_ingest( - ingestor_name="fake", + await service.run_source( + source_name="fake", since=None, limit=None, ) # Assert - last call should be the completion event - from osa.domain.ingest.event.ingestion_run_completed import ( - IngestionRunCompleted, - ) + from osa.domain.source.event.source_run_completed import SourceRunCompleted last_call = mock_outbox.append.call_args_list[-1] event = last_call[0][0] - assert isinstance(event, IngestionRunCompleted) + assert isinstance(event, SourceRunCompleted) assert event.record_count == 2 - assert event.ingestor_name == "fake" + assert event.source_name == "fake"