Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ data class PreprocessingAnnotationSource(
/**
 * Kinds of source that a preprocessing annotation can refer to.
 *
 * The preprocessing pipeline's Python `AnnotationSourceType` enum uses the
 * same spellings ("Metadata", "NucleotideSequence", "SubmittedFile") as its
 * string values, so a constant added here needs a counterpart there.
 */
enum class PreprocessingAnnotationSourceType {
    Metadata,
    NucleotideSequence,
    // Annotation that refers to a file attached to the entry.
    SubmittedFile,
}

data class GetSequenceResponse(
Expand Down
7 changes: 7 additions & 0 deletions kubernetes/loculus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ defaultOrganismConfig: &defaultOrganismConfig
submissionDataTypes: &defaultSubmissionDataTypes
consensusSequences: true
maxSequencesPerEntry: 1
files:
enabled: true
categories:
- name: raw_reads
displayName: Raw reads
loadSequencesAutomatically: true
earliestReleaseDate:
enabled: true
Expand All @@ -89,6 +94,8 @@ defaultOrganismConfig: &defaultOrganismConfig
files:
- name: annotations
displayName: Annotations
- name: raw_reads
displayName: Raw reads
### Field list
## General fields
# name: Key used across app to refer to this field (required)
Expand Down
4 changes: 2 additions & 2 deletions preprocessing/dummy/main.py

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file caught some stray updates from running the formatter

Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def parse_ndjson(ndjson_data: str) -> list[Sequence]:


def process(unprocessed: list[Sequence]) -> list[Sequence]:
with open("mock-sequences.json", "r") as f:
with open("mock-sequences.json") as f:
mock_sequences = json.load(f)
possible_lineages = ["A.1", "A.1.1", "A.2"]

Expand Down Expand Up @@ -160,7 +160,7 @@ def process(unprocessed: list[Sequence]) -> list[Sequence]:
"nucleotideInsertions": {},
"aminoAcidInsertions": {},
}

if not disableConsensusSequences:
data = {**data, **mock_sequences}
data["sequenceNameToFastaId"] = {"main": submissionId}
Expand Down
11 changes: 11 additions & 0 deletions preprocessing/nextclade/src/loculus_preprocessing/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from .config import Config
from .datatypes import (
FileIdAndName,
FileUploadInfo,
ProcessedEntry,
UnprocessedData,
Expand Down Expand Up @@ -93,6 +94,15 @@ def parse_ndjson(ndjson_data: str) -> Sequence[UnprocessedEntry]:
key: trim_ns(value) if value else None
for key, value in unaligned_nucleotide_sequences.items()
}
submitted_files = json_object["data"].get("files")
file_mapping = (
{
category: [FileIdAndName(fileId=f["fileId"], name=f["name"]) for f in files]
for category, files in submitted_files.items()
}
if submitted_files
else None
)
unprocessed_data = UnprocessedData(
submitter=json_object["submitter"],
group_id=json_object["groupId"],
Expand All @@ -102,6 +112,7 @@ def parse_ndjson(ndjson_data: str) -> Sequence[UnprocessedEntry]:
unalignedNucleotideSequences=trimmed_unaligned_nucleotide_sequences
if unaligned_nucleotide_sequences
else {},
files=file_mapping,
)
entry = UnprocessedEntry(
accessionVersion=f"{json_object['accession']}.{json_object['version']}",
Expand Down
23 changes: 16 additions & 7 deletions preprocessing/nextclade/src/loculus_preprocessing/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@
ProcessingAnnotationAlignment: Final = "alignment"


@unique
class FileCategory(StrEnum):
    """File categories known to the preprocessing pipeline.

    The values are the category names used as keys of the per-entry
    ``files`` mapping.
    """

    # Submitted sequencing reads; process_submitted_files routes this category
    # to validate_raw_reads_submission.
    RAW_READS = "raw_reads"
    # Output generated by preprocessing (upload_flatfiles stores the uploaded
    # EMBL file under this key). Per the review discussion it is not a
    # user-submittable input category.
    ANNOTATIONS = "annotations"


@unique
class AnnotationSourceType(StrEnum):
    """Kind of source a ``ProcessingAnnotation`` refers to.

    The string values use the same spellings as the constants of the
    backend's ``PreprocessingAnnotationSourceType`` enum.
    """

    METADATA = "Metadata"
    NUCLEOTIDE_SEQUENCE = "NucleotideSequence"
    # Used by validate_raw_reads_submission for problems with attached files.
    SUBMITTED_FILE = "SubmittedFile"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name is a bit confusing, as it implies this is a file submitted by the user, but annotations are created by the prepro pipeline. I think just "File" is OK.



@dataclass(frozen=True)
Expand Down Expand Up @@ -74,6 +81,12 @@ def from_single(cls, name: str, type, message: str):
return cls.from_fields([name], [name], type, message)


@dataclass
class FileIdAndName:
    """Reference to a single file: its identifier together with its file name.

    ``parse_ndjson`` builds instances from the ``"fileId"`` and ``"name"``
    keys of each file object in the backend payload.
    """

    # camelCase mirrors the "fileId" key of the payload, hence the N815 waiver.
    fileId: str  # noqa: N815
    name: str


@dataclass
class UnprocessedData:
submitter: str
Expand All @@ -82,6 +95,7 @@ class UnprocessedData:
submissionId: str # noqa: N815
metadata: InputMetadata
unalignedNucleotideSequences: dict[SequenceName, NucleotideSequence | None] # noqa: N815
files: dict[FileCategory, list[FileIdAndName]] | None


@dataclass
Expand All @@ -97,6 +111,7 @@ class UnprocessedEntry:
@dataclass
class UnprocessedAfterNextclade:
inputMetadata: InputMetadata # noqa: N815
files: dict[FileCategory, list[FileIdAndName]] | None
# Derived metadata produced by Nextclade
nextcladeMetadata: dict[SequenceName, Any] | None # noqa: N815
unalignedNucleotideSequences: dict[SequenceName, NucleotideSequence | None] # noqa: N815
Expand All @@ -109,22 +124,16 @@ class UnprocessedAfterNextclade:
warnings: list[ProcessingAnnotation]


@dataclass
class FileIdAndName:
fileId: str # noqa: N815
name: str


@dataclass
class ProcessedData:
metadata: ProcessedMetadata
files: dict[FileCategory, list[FileIdAndName]] | None
unalignedNucleotideSequences: dict[SequenceName, Any] # noqa: N815
alignedNucleotideSequences: dict[SequenceName, Any] # noqa: N815
nucleotideInsertions: dict[SequenceName, Any] # noqa: N815
alignedAminoAcidSequences: dict[GeneName, Any] # noqa: N815
aminoAcidInsertions: dict[GeneName, Any] # noqa: N815
sequenceNameToFastaId: dict[SequenceName, FastaId] # noqa: N815
files: dict[str, list[FileIdAndName]] | None = None


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
AminoAcidSequence,
AnnotationSourceType,
FastaId,
FileCategory,
FileIdAndName,
GeneName,
GenericSequence,
NucleotideInsertion,
Expand Down Expand Up @@ -801,6 +803,9 @@ def enrich_with_nextclade( # noqa: PLR0914
}
for entry in unprocessed
}
input_files: dict[AccessionVersion, dict[FileCategory, list[FileIdAndName]] | None] = {
entry.accessionVersion: entry.data.files for entry in unprocessed
}

batch = assign_segment_for_alignment(unprocessed, config=config, dataset_dir=dataset_dir)
unaligned_nucleotide_sequences = batch.unalignedNucleotideSequences
Expand Down Expand Up @@ -893,6 +898,7 @@ def enrich_with_nextclade( # noqa: PLR0914
return {
id: UnprocessedAfterNextclade(
inputMetadata=input_metadata[id],
files=input_files[id],
nextcladeMetadata=nextclade_metadata[id],
unalignedNucleotideSequences=unaligned_nucleotide_sequences[id],
alignedNucleotideSequences=aligned_nucleotide_sequences[id],
Expand Down
49 changes: 43 additions & 6 deletions preprocessing/nextclade/src/loculus_preprocessing/prepro.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
AminoAcidSequence,
AnnotationSource,
AnnotationSourceType,
FileCategory,
FileIdAndName,
GeneName,
InputData,
Expand Down Expand Up @@ -60,6 +61,7 @@
process_mutations_from_clade_founder,
process_phenotype_values,
process_stop_codons,
validate_raw_reads_submission,
)
from .sequence_checks import error_on_excess_sequences, errors_if_non_iupac

Expand Down Expand Up @@ -296,6 +298,7 @@ def processed_entry_no_alignment( # noqa: PLR0913, PLR0917
version=version_from_str(accession_version),
data=ProcessedData(
metadata=output_metadata,
files=unprocessed.files,
unalignedNucleotideSequences=unprocessed.unalignedNucleotideSequences,
alignedNucleotideSequences=aligned_nucleotide_sequences,
nucleotideInsertions=nucleotide_insertions,
Expand Down Expand Up @@ -435,6 +438,27 @@ def get_output_metadata(
return output_metadata, errors, warnings


def process_submitted_files(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth moving the file related functions (this one and validate raw reads) to a new file to not grow this 2k line monstrosity even further

file_mapping: dict[FileCategory, list[FileIdAndName]],
) -> tuple[list[ProcessingAnnotation], list[ProcessingAnnotation]]:
errors: list[ProcessingAnnotation] = []
warnings: list[ProcessingAnnotation] = []

for category, files in file_mapping.items():
if not files:
# Backend always includes a key with empty list for enabled categories
continue
match category:
case FileCategory.RAW_READS:
rr_errors, rr_warnings = validate_raw_reads_submission(files)
errors.extend(rr_errors)
warnings.extend(rr_warnings)
case _:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FileCategory.ANNOTATIONS is currently in the enum (added for internal use by upload_flatfiles), so if the backend ever sends annotations as an input category, this branch would log a warning. Worth noting for future maintainers that ANNOTATIONS is preprocessing-generated output, not a user-submittable input category — so it should never appear in unprocessed.files.

@maverbiest maverbiest Jun 29, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes currently when we add a new input file category, this match statement will need to be updated to handle it. That is as intended (see discussion of alternative in the PR description)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this should emit a hard exception, as (if my understanding is correct) a file category could be added to the values.yaml, and so will be accepted in the backend, but then if its missing in this match statement, it would pass through without validation?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess a hard exception would prevent other categories being allowed for other users of loculus which might not be a good thing

logger.warning(f"Submitted file is of unexpected category {category}")

return errors, warnings


def alignment_errors_warnings(
unprocessed: UnprocessedAfterNextclade,
config: Config,
Expand Down Expand Up @@ -529,11 +553,14 @@ def process_single(
accession_version, unprocessed, config
)

file_errors, file_warnings = process_submitted_files(unprocessed.files or {})

processed_entry = ProcessedEntry(
accession=accession_from_str(accession_version),
version=version_from_str(accession_version),
data=ProcessedData(
metadata=output_metadata,
files=unprocessed.files,
unalignedNucleotideSequences=unprocessed.unalignedNucleotideSequences,
alignedNucleotideSequences=unprocessed.alignedNucleotideSequences,
nucleotideInsertions=unprocessed.nucleotideInsertions,
Expand All @@ -548,9 +575,12 @@ def process_single(
+ max_seq_errors
+ alignment_errors
+ metadata_errors
+ file_errors
)
),
warnings=list(set(unprocessed.warnings + alignment_warnings + metadata_warnings)),
warnings=list(
set(unprocessed.warnings + alignment_warnings + metadata_warnings + file_warnings)
),
)

return SubmissionData(
Expand Down Expand Up @@ -578,12 +608,16 @@ def process_single_unaligned(
accession_version, unprocessed, config
)

file_errors, file_warnings = process_submitted_files(unprocessed.files or {})

return processed_entry_no_alignment(
accession_version=accession_version,
unprocessed=unprocessed,
output_metadata=output_metadata,
errors=list(set(iupac_errors + metadata_errors + segment_assignment.alert.errors)),
warnings=list(set(metadata_warnings)),
errors=list(
set(iupac_errors + metadata_errors + segment_assignment.alert.errors + file_errors)
),
warnings=list(set(metadata_warnings + file_warnings)),
sequenceNameToFastaId=segment_assignment.sequenceNameToFastaId,
)

Expand All @@ -595,6 +629,7 @@ def processed_entry_with_errors(id) -> SubmissionData:
version=version_from_str(id),
data=ProcessedData(
metadata=dict[str, ProcessedMetadataValue](),
files=None,
unalignedNucleotideSequences=defaultdict(dict[str, Any]),
alignedNucleotideSequences=defaultdict(dict[str, Any]),
nucleotideInsertions=defaultdict(dict[str, Any]),
Expand Down Expand Up @@ -660,9 +695,11 @@ def upload_flatfiles(processed: Sequence[SubmissionData], config: Config) -> Non
file_id = upload_info.fileId
url = upload_info.url
upload_embl_file_to_presigned_url(file_content, url)
submission_data.processed_entry.data.files = {
"annotations": [FileIdAndName(fileId=file_id, name=file_name)]
}
processed_files = submission_data.processed_entry.data.files or {}
processed_files.setdefault(FileCategory.ANNOTATIONS, []).append(
FileIdAndName(fileId=file_id, name=file_name)
)
submission_data.processed_entry.data.files = processed_files
except Exception as e:
logger.error("Error creating or uploading EMBL file: %s", e)
submission_data.processed_entry.errors.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .datatypes import (
AnnotationSource,
AnnotationSourceType,
FileIdAndName,
FunctionArgs,
InputData,
InputMetadata,
Expand Down Expand Up @@ -1948,6 +1949,39 @@ def single_metadata_annotation(
]


def validate_raw_reads_submission(
    files: list[FileIdAndName],
) -> tuple[list[ProcessingAnnotation], list[ProcessingAnnotation]]:
    """Check the files submitted in the raw reads category.

    Two rules are enforced, based on file names only (file contents are not
    inspected here):

    * at most two files may be submitted (single-end: one, paired-end: two);
    * every file name must end in one of the FASTQ extensions listed below.

    Args:
        files: The files submitted under the raw reads category.

    Returns:
        A ``(errors, warnings)`` tuple of processing annotations. No warnings
        are produced at present; the list is returned so the result has the
        same shape as that of the other validators.
    """
    max_files = 2  # single-end reads: one file, paired-end reads: two files
    # Tuple so it can be handed to str.endswith directly. Matching is
    # case-sensitive.
    allowed_extensions = (".fastq", ".fastq.gz", ".fq", ".fq.gz")

    errors: list[ProcessingAnnotation] = []
    warnings: list[ProcessingAnnotation] = []

    if len(files) > max_files:
        file_names = [f.name for f in files]
        # Shown to submitters, so spell out why the limit exists.
        message = (
            "Raw reads must be single-end (one file) or paired-end (two files),"
            f" so at most {max_files} files are accepted, got {len(files)}"
        )
        errors.append(
            ProcessingAnnotation.from_fields(
                input_fields=file_names,
                # Separate list object, as in the original call.
                output_fields=list(file_names),
                type=AnnotationSourceType.SUBMITTED_FILE,
                message=message,
            )
        )

    for file in files:
        if file.name.endswith(allowed_extensions):
            continue
        message = (
            f"Raw reads file '{file.name}' has unrecognized extension."
            f" Allowed extensions: {', '.join(allowed_extensions)}"
        )
        errors.append(
            ProcessingAnnotation.from_single(
                name=file.name, type=AnnotationSourceType.SUBMITTED_FILE, message=message
            )
        )

    return errors, warnings


def process_frameshifts(input: str | None) -> InputData:
"""Converts frameshift string to InputData for processing"""
try:
Expand Down
Loading
Loading