Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions profasta/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ProteinDatabase:

db: dict[str, AbstractDatabaseEntry]
added_fasta_files: list[str]
skipped_fasta_entries: dict[str, list]
skipped_fasta_entries: dict[str, list[str]]

def __init__(self):
self.db = {}
Expand Down Expand Up @@ -128,10 +128,11 @@ def add_fasta(
if skipped_entry_headers:
num_skipped = len(skipped_entry_headers)
num_total = num_skipped + len(parsed_protein_entries)
skipped_list = "\n ".join(skipped_entry_headers)
logger.warning(
f"Skipped {num_skipped}/{num_total} entries while adding "
f"'{fasta_name}' to a ProteinDatabase because their headers could not "
f"be parsed:"
f"be parsed:\n {skipped_list}"
)

def add_entry(self, protein_entry: AbstractDatabaseEntry, overwrite: bool = False):
Expand All @@ -152,7 +153,7 @@ def add_entry(self, protein_entry: AbstractDatabaseEntry, overwrite: bool = Fals

def write_fasta(
self,
path,
path: str,
append: bool = False,
header_writer: Optional[str] = None,
line_width: int = 60,
Expand Down
24 changes: 15 additions & 9 deletions profasta/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
FastaRecord: Representation of a protein record in a FASTA file.

Functions:
parse_fasta: Parse a FASTA file object and return a list of FastaRecords.
parse_fasta: Parse a FASTA file object and yield FastaRecords.
write_fasta: Write a list of FastaRecords to a file object.
"""

Expand Down Expand Up @@ -55,14 +55,20 @@ def parse_fasta(file_object: IO[str]) -> Generator[FastaRecord, None, None]:
Yields:
Yields FastaRecords.
"""
fasta_text = file_object.read()
if not fasta_text.startswith("\n"):
fasta_text = "\n" + fasta_text

for block in fasta_text.split("\n>")[1:]:
lines = block.split("\n")
header = lines[0].strip()
sequence = "".join(lines[1:]).replace(" ", "").rstrip("*").upper()
header = None
sequence_lines: list[str] = []
for line in file_object:
line = line.rstrip("\n")
if line.startswith(">"):
if header is not None:
sequence = "".join(sequence_lines).replace(" ", "").rstrip("*").upper()
yield FastaRecord(header, sequence)
header = line[1:].strip()
sequence_lines = []
else:
sequence_lines.append(line)
if header is not None:
sequence = "".join(sequence_lines).replace(" ", "").rstrip("*").upper()
yield FastaRecord(header, sequence)


Expand Down
22 changes: 11 additions & 11 deletions profasta/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
UniprotParser: Parser for Uniprot FASTA headers.
UniprotLikeParser: Parser for less strict Uniprot like FASTA headers.
DefaultWriter: Default FASTA header writer.
UniprotWriter: Parser for Uniprot FASTA writer.
UniprotLikeWriter: Parser for less strict Uniprot like FASTA writer.
UniprotWriter: Writer for Uniprot FASTA headers.
UniprotLikeWriter: Writer for less strict Uniprot like FASTA headers.

Functions:
register_parser: Register a custom FASTA header parser by name.
Expand Down Expand Up @@ -57,7 +57,7 @@ class AbstractHeaderParser(Protocol):
"""Abstract header parser."""

@classmethod
def parse(self, header: str) -> AbstractParsedHeader:
def parse(cls, header: str) -> AbstractParsedHeader:
"""Parse a FASTA header string into a ParsedHeader object.

Raises:
Expand All @@ -70,7 +70,7 @@ class AbstractHeaderWriter(Protocol):
"""Abstract header writer."""

@classmethod
def write(self, parsed_header: AbstractParsedHeader) -> str:
def write(cls, parsed_header: AbstractParsedHeader) -> str:
"""Write a FASTA header string from a ParsedHeader object."""
...

Expand All @@ -95,9 +95,9 @@ class DefaultParser:

The `parse` method returns a ParsedHeader object with the identifier being the
first whitespace-separated word of the header. The rest of the header is stored
in the "description" field of the `header_fields` dictionary, which might be an
empty string. This parser is guaranteed to work for any FASTA header string and
never fail.
in the "description" field of the `header_fields` dictionary. If there is no
description, the "description" key is absent from `header_fields`. This parser
is guaranteed to work for any FASTA header string and never fail.
"""

@classmethod
Expand Down Expand Up @@ -315,14 +315,14 @@ def get_parser(parser_name: str) -> AbstractHeaderParser:
return PARSER_REGISTRY[parser_name]


def register_writer(name: str, parser: AbstractHeaderWriter):
def register_writer(name: str, writer: AbstractHeaderWriter):
"""Register a custom writer by name."""
WRITER_REGISTRY[name] = parser
WRITER_REGISTRY[name] = writer


def get_writer(parser_name: str) -> AbstractHeaderWriter:
def get_writer(writer_name: str) -> AbstractHeaderWriter:
"""Get a registered writer by name."""
return WRITER_REGISTRY[parser_name]
return WRITER_REGISTRY[writer_name]


PARSER_REGISTRY: dict[str, AbstractHeaderParser] = {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def parse(cls, header):

class CustomWriter:
@classmethod
def write(self, parsed_header):
def write(cls, parsed_header):
return parsed_header.header


Expand Down
Loading