Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion server/migrations/versions/0d9fbacf8e58_initial_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = "0d9fbacf8e58"
Expand Down Expand Up @@ -53,13 +54,20 @@ def upgrade() -> None:
"records",
sa.Column("srn", sa.String(), nullable=False),
sa.Column("deposition_srn", sa.String(), nullable=False),
sa.Column("metadata", sa.JSON(), nullable=False),
sa.Column("metadata", postgresql.JSONB(), nullable=False),
sa.Column("indexes", sa.JSON(), nullable=False),
sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint("srn"),
)
op.create_index("idx_records_deposition_srn", "records", ["deposition_srn"])
op.create_index("idx_records_published_at", "records", ["published_at"])
op.create_index(
"idx_records_metadata_gin",
"records",
["metadata"],
postgresql_using="gin",
postgresql_ops={"metadata": "jsonb_path_ops"},
)
Comment thread
rorybyrne marked this conversation as resolved.

# EVENTS (Outbox)
op.create_table(
Expand Down Expand Up @@ -89,6 +97,7 @@ def downgrade() -> None:
op.drop_table("events")

# RECORDS
op.drop_index("idx_records_metadata_gin", table_name="records")
op.drop_index("idx_records_published_at", table_name="records")
op.drop_index("idx_records_deposition_srn", table_name="records")
op.drop_table("records")
Expand Down
2 changes: 2 additions & 0 deletions server/osa/application/api/rest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
auth,
conventions,
depositions,
discovery,
events,
health,
ontologies,
Expand Down Expand Up @@ -89,6 +90,7 @@ def create_app() -> FastAPI:
app_instance.include_router(conventions.router, prefix="/api/v1")
app_instance.include_router(depositions.router, prefix="/api/v1")
app_instance.include_router(validation.router, prefix="/api/v1")
app_instance.include_router(discovery.router, prefix="/api/v1")

# Global OSA error handler - maps domain and infrastructure errors to HTTP responses
@app_instance.exception_handler(OSAError)
Expand Down
130 changes: 130 additions & 0 deletions server/osa/application/api/v1/routes/discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Discovery API routes — search and filter records and features."""

from typing import Any

from dishka.integrations.fastapi import DishkaRoute, FromDishka
from fastapi import APIRouter
from pydantic import BaseModel, Field

from osa.domain.discovery.model.value import (
Filter,
SortOrder,
)
from osa.domain.discovery.query.get_feature_catalog import (
GetFeatureCatalog,
GetFeatureCatalogHandler,
GetFeatureCatalogResult,
)
from osa.domain.discovery.query.search_features import (
SearchFeatures,
SearchFeaturesHandler,
SearchFeaturesResult,
)
from osa.domain.discovery.query.search_records import (
SearchRecords,
SearchRecordsHandler,
SearchRecordsResult,
)

router = APIRouter(
prefix="/discovery",
tags=["discovery"],
route_class=DishkaRoute,
)


# ── Request / Response models ──


class RecordSearchRequest(BaseModel):
filters: list[Filter] = []
q: str | None = None
sort: str = "published_at"
order: SortOrder = SortOrder.DESC
cursor: str | None = None
limit: int = Field(default=20, ge=1, le=100)


class RecordSearchResponse(BaseModel):
results: list[dict[str, Any]]
cursor: str | None
has_more: bool


class FeatureCatalogResponse(BaseModel):
tables: list[dict[str, Any]]


class FeatureSearchRequest(BaseModel):
filters: list[Filter] = []
record_srn: str | None = None
sort: str = "id"
order: SortOrder = SortOrder.DESC
cursor: str | None = None
limit: int = Field(default=50, ge=1, le=100)


class FeatureSearchResponse(BaseModel):
rows: list[dict[str, Any]]
cursor: str | None
has_more: bool


# ── Routes ──


@router.post("/records")
async def search_records(
body: RecordSearchRequest,
handler: FromDishka[SearchRecordsHandler],
) -> RecordSearchResponse:
"""Search and filter published records."""
result: SearchRecordsResult = await handler.run(
SearchRecords(
filters=body.filters,
q=body.q,
sort=body.sort,
order=body.order,
cursor=body.cursor,
limit=body.limit,
)
)
return RecordSearchResponse(
results=result.results,
cursor=result.cursor,
has_more=result.has_more,
)


@router.get("/features")
async def get_feature_catalog(
handler: FromDishka[GetFeatureCatalogHandler],
) -> FeatureCatalogResponse:
"""List available feature tables with column schemas and record counts."""
result: GetFeatureCatalogResult = await handler.run(GetFeatureCatalog())
return FeatureCatalogResponse(tables=result.tables)


@router.post("/features/{hook_name}")
async def search_features(
hook_name: str,
body: FeatureSearchRequest,
handler: FromDishka[SearchFeaturesHandler],
) -> FeatureSearchResponse:
"""Query and filter rows in a specific feature table."""
result: SearchFeaturesResult = await handler.run(
SearchFeatures(
hook_name=hook_name,
filters=body.filters,
record_srn=body.record_srn,
sort=body.sort,
order=body.order,
cursor=body.cursor,
limit=body.limit,
)
)
return FeatureSearchResponse(
rows=result.rows,
cursor=result.cursor,
has_more=result.has_more,
)
1 change: 1 addition & 0 deletions server/osa/application/api/v1/routes/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ async def get_record(
"deposition_srn": str(result.deposition_srn),
"metadata": result.metadata,
"published_at": result.published_at.isoformat(),
"features": result.features,
}
)
2 changes: 2 additions & 0 deletions server/osa/application/di.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from osa.cli.util.paths import OSAPaths
from osa.config import Config
from osa.domain.auth.util.di import AuthProvider
from osa.domain.discovery.util.di import DiscoveryProvider
from osa.domain.deposition.util.di import DepositionProvider
from osa.domain.feature.util.di import FeatureProvider
from osa.domain.semantics.util.di.provider import SemanticsProvider
Expand Down Expand Up @@ -37,6 +38,7 @@ def create_container() -> AsyncContainer:
ValidationProvider(),
AuthProvider(),
AuthInfraProvider(),
DiscoveryProvider(),
context={Config: config, OSAPaths: paths},
scopes=Scope, # type: ignore[arg-type] # Custom scope class
)
1 change: 1 addition & 0 deletions server/osa/domain/discovery/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Discovery domain — read-only search and filter API for records and features."""
Empty file.
100 changes: 100 additions & 0 deletions server/osa/domain/discovery/model/value.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Discovery domain value objects — filters, cursors, result types."""

from __future__ import annotations

import base64
import json
from datetime import datetime
from enum import StrEnum
from typing import Any

from pydantic import BaseModel

from osa.domain.semantics.model.value import FieldType
from osa.domain.shared.model.srn import RecordSRN


class FilterOperator(StrEnum):
EQ = "eq"
CONTAINS = "contains"
GTE = "gte"
LTE = "lte"


class SortOrder(StrEnum):
ASC = "asc"
DESC = "desc"


class Filter(BaseModel):
field: str
operator: FilterOperator
value: str | float | bool


VALID_OPERATORS: dict[FieldType, set[FilterOperator]] = {
FieldType.TEXT: {FilterOperator.EQ, FilterOperator.CONTAINS},
FieldType.URL: {FilterOperator.EQ, FilterOperator.CONTAINS},
FieldType.NUMBER: {FilterOperator.EQ, FilterOperator.GTE, FilterOperator.LTE},
FieldType.DATE: {FilterOperator.EQ, FilterOperator.GTE, FilterOperator.LTE},
FieldType.BOOLEAN: {FilterOperator.EQ},
FieldType.TERM: {FilterOperator.EQ},
}


def encode_cursor(sort_value: Any, id_value: Any) -> str:
"""Encode a cursor as base64 JSON."""
payload = {"s": sort_value, "id": id_value}
return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()


def decode_cursor(cursor: str) -> dict[str, Any]:
"""Decode a base64 JSON cursor. Raises ValueError on malformed input."""
try:
raw = base64.urlsafe_b64decode(cursor.encode())
data = json.loads(raw)
except Exception as exc:
raise ValueError(f"Malformed cursor: {exc}") from exc
if not isinstance(data, dict) or "s" not in data or "id" not in data:
raise ValueError("Cursor must contain 's' and 'id' keys")
return data
Comment on lines +51 to +60

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor type validation missing — non-integer id or s values cause unhandled 500

The decode_cursor() function only validates that keys "s" and "id" are present (line 58) but does not validate their types. A client can Base64-encode arbitrary JSON to produce a cursor with non-integer values, e.g.:

{"s": 1, "id": "not_a_number"}

When this cursor is decoded and used in the keyset predicate, SQLAlchemy attempts to bind the string value to a BigInteger column (ft.c.id), causing PostgreSQL to reject it with invalid input syntax for type bigint. This surfaces as an unhandled 500 instead of a clean 422 validation error.

The same issue applies to the "s" (sort value) key when sorting by a numeric or date column.

Fix: Add type validation after decoding:

def decode_cursor(cursor: str) -> dict[str, Any]:
    """Decode a base64 JSON cursor. Raises ValueError on malformed input."""
    try:
        raw = base64.urlsafe_b64decode(cursor.encode())
        data = json.loads(raw)
    except Exception as exc:
        raise ValueError(f"Malformed cursor: {exc}") from exc
    if not isinstance(data, dict) or "s" not in data or "id" not in data:
        raise ValueError("Cursor must contain 's' and 'id' keys")
    # Validate types
    if not isinstance(data.get("id"), int):
        raise ValueError("Cursor 'id' must be an integer")
    return data

In the feature search handler, also validate that "s" is an integer when sorting by id, or matches the column type when sorting by other columns.



class RecordSummary(BaseModel):
srn: RecordSRN
published_at: datetime
metadata: dict[str, Any]


class RecordSearchResult(BaseModel):
results: list[RecordSummary]
cursor: str | None
has_more: bool


class ColumnInfo(BaseModel):
name: str
type: str
required: bool


class FeatureCatalogEntry(BaseModel):
hook_name: str
columns: list[ColumnInfo]
record_count: int


class FeatureCatalog(BaseModel):
tables: list[FeatureCatalogEntry]


class FeatureRow(BaseModel):
row_id: int
record_srn: RecordSRN
data: dict[str, Any]


class FeatureSearchResult(BaseModel):
rows: list[FeatureRow]
cursor: str | None
has_more: bool
Empty file.
17 changes: 17 additions & 0 deletions server/osa/domain/discovery/port/field_definition_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""FieldDefinitionReader port — cross-domain read port for schema field lookups."""

from __future__ import annotations

from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
from osa.domain.semantics.model.value import FieldType


class FieldDefinitionReader(Protocol):
async def get_all_field_types(self) -> dict[str, FieldType]:
"""Return global field_name -> FieldType map across all schemas.

Raises ValidationError if same field name has conflicting types across schemas.
"""
...
56 changes: 56 additions & 0 deletions server/osa/domain/discovery/port/read_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""DiscoveryReadStore port — read-only access to records and feature data."""

from __future__ import annotations

from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
from osa.domain.discovery.model.value import (
FeatureCatalogEntry,
FeatureRow,
Filter,
RecordSummary,
SortOrder,
)
from osa.domain.semantics.model.value import FieldType
from osa.domain.shared.model.srn import RecordSRN


class DiscoveryReadStore(Protocol):
async def search_records(
self,
filters: list[Filter],
text_fields: list[str],
q: str | None,
sort: str,
order: SortOrder,
cursor: dict | None,
limit: int,
field_types: dict[str, FieldType] | None = None,
) -> list[RecordSummary]:
"""Search and filter published records."""
...

async def get_feature_catalog(self) -> list[FeatureCatalogEntry]:
"""List all feature tables with column schemas and record counts."""
...

async def get_feature_table_schema(self, hook_name: str) -> FeatureCatalogEntry | None:
"""Look up a single feature table's schema by hook name.

Returns None if the hook_name is not found.
"""
...

async def search_features(
self,
hook_name: str,
filters: list[Filter],
record_srn: RecordSRN | None,
sort: str,
order: SortOrder,
cursor: dict | None,
limit: int,
) -> list[FeatureRow]:
"""Search and filter feature rows."""
...
Empty file.
Loading
Loading