Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .flake8

This file was deleted.

19 changes: 7 additions & 12 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,20 @@ jobs:
steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Install Poetry
run: pipx install poetry
- name: Set up uv
uses: astral-sh/setup-uv@v5

- name: Install dependencies
run: poetry install --with dev --extras all
run: uv sync --extra all --group dev

- name: Lint
run: poetry run flake8 .
run: uv run ruff check .

- name: Typecheck
run: poetry run mypy .
run: uv run ty check

- name: Format
run: poetry run black .
run: uv run ruff format --check .

- name: Test
run: poetry run pytest -n auto
run: uv run pytest -n auto
14 changes: 3 additions & 11 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,11 @@ jobs:
steps:
- uses: actions/checkout@v4

- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Install Poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true
- name: Set up uv
uses: astral-sh/setup-uv@v5

- name: Build package
run: poetry build
run: uv build

- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
54 changes: 31 additions & 23 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,48 @@
#
# SPDX-License-Identifier: Apache-2.0

.PHONY: install env test test-coverage lint typecheck format build publish lock
.PHONY: install test test-coverage lint lint-fix typecheck format build publish lock check bump help

install:
poetry install --with dev --extras all
help: ## Display this help
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

env:
poetry env use python3.12
install: ## Install all dependencies including extras and dev group
uv sync --extra all --group dev

build:
poetry build
build: ## Build the package
uv build

publish:
poetry publish
publish: ## Publish the package to PyPI
uv publish

test:
poetry run pytest -n auto
test: ## Run tests in parallel
uv run pytest -n auto

test-coverage:
poetry run pytest -n auto --cov=. --cov-report=term
test-coverage: ## Run tests with coverage report
uv run pytest -n auto --cov=. --cov-report=term

lint:
poetry run flake8 .
lint: ## Run ruff linter
uv run ruff check .

typecheck:
poetry run mypy .
lint-fix: ## Run ruff linter and apply fixes
uv run ruff check --fix .

format:
poetry run black .
typecheck: ## Run type checker
uv run ty check

lock:
poetry lock
format: ## Format code with ruff
uv run ruff format .

license:
poetry run reuse annotate \
bump: ## Bump version: make bump part=patch|minor|major
uv version --bump $(part)

lock: ## Update the lockfile
uv lock

check: lint typecheck format test ## Run all checks (lint, typecheck, format, test)

license: ## Annotate files with REUSE license headers
uv run reuse annotate \
--license Apache-2.0 \
--copyright "Greg Brandt <brandt.greg@gmail.com>" \
--skip-unrecognized \
Expand Down
6 changes: 3 additions & 3 deletions avrokit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,22 @@

__version__ = version("avrokit")

from .url import URL, FileURL, parse_url, create_url_mapping, flatten_urls
from .asyncio import BlockingQueueAvroReader, DeferredAvroWriter
from .io import (
Appendable,
PartitionedAvroReader,
PartitionedAvroWriter,
TimePartitionedAvroWriter,
add_avro_schema_fields,
avro_reader,
avro_records,
avro_schema,
avro_writer,
avro_records,
compact_avro_data,
read_avro_schema,
read_avro_schema_from_first_nonempty_file,
validate_avro_schema_evolution,
)
from .asyncio import DeferredAvroWriter, BlockingQueueAvroReader
from .tools import (
CatTool,
ConcatTool,
Expand All @@ -112,6 +111,7 @@
ToJsonTool,
ToParquetTool,
)
from .url import URL, FileURL, create_url_mapping, flatten_urls, parse_url

__all__ = [
"Appendable",
Expand Down
2 changes: 1 addition & 1 deletion avrokit/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
#
# SPDX-License-Identifier: Apache-2.0

from .writer import DeferredAvroWriter
from .reader import BlockingQueueAvroReader
from .writer import DeferredAvroWriter

__all__ = [
"DeferredAvroWriter",
Expand Down
6 changes: 3 additions & 3 deletions avrokit/asyncio/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
#
# SPDX-License-Identifier: Apache-2.0

import threading
import logging
import queue
from typing import Iterable
import threading
from typing import Any

logger = logging.getLogger(__name__)

Expand All @@ -15,7 +15,7 @@ class BlockingQueueAvroReader:
Reads
"""

def __init__(self, data: Iterable[object], daemon: bool = True) -> None:
def __init__(self, data: Any, daemon: bool = True) -> None:
self.data = data
self._reader_queue: queue.Queue = queue.Queue()
self._reader_thread_done = threading.Event()
Expand Down
3 changes: 2 additions & 1 deletion avrokit/asyncio/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
#
# SPDX-License-Identifier: Apache-2.0

import threading
import logging
import queue
import threading

from avrokit.io.writer import Appendable

logger = logging.getLogger(__name__)
Expand Down
6 changes: 3 additions & 3 deletions avrokit/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
#
# SPDX-License-Identifier: Apache-2.0

from .compact import compact_avro_data
from .reader import PartitionedAvroReader, avro_reader, avro_records
from .schema import (
add_avro_schema_fields,
avro_schema,
read_avro_schema,
read_avro_schema_from_first_nonempty_file,
validate_avro_schema_evolution,
)
from .reader import avro_reader, PartitionedAvroReader, avro_records
from .writer import (
avro_writer,
Appendable,
PartitionedAvroWriter,
TimePartitionedAvroWriter,
avro_writer,
)
from .compact import compact_avro_data

__all__ = [
"Appendable",
Expand Down
8 changes: 5 additions & 3 deletions avrokit/io/compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
#
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Sequence

from avrokit.io.reader import avro_reader
from avrokit.url.utils import flatten_urls

from ..url import URL
from .writer import avro_writer
from .schema import read_avro_schema_from_first_nonempty_file
from typing import Union, Sequence
from .writer import avro_writer


def compact_avro_data(
src: Union[URL, Sequence[URL]],
src: URL | Sequence[URL],
dst: URL,
expand_src: bool = True,
) -> None:
Expand Down
47 changes: 35 additions & 12 deletions avrokit/io/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,56 @@
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations
from ..url import URL

from collections.abc import Generator, Iterator, Sequence
from contextlib import contextmanager
from typing import IO, Any, Self, cast

from avro.datafile import DataFileReader
from avro.io import DatumReader
from contextlib import contextmanager
from typing import Generator, Iterator, Sequence, IO, Any, Self, Union, cast

from ..url import URL


@contextmanager
def avro_reader(url: URL) -> Generator[DataFileReader, None, None]:
def avro_reader(url: URL) -> Generator[TypedDataFileReader, None, None]:
"""
Opens an Avro DataFileReader for the given URL.

:param url: The URL of the Avro file to read.
:return: A DataFileReader object.
:return: A TypedDataFileReader object.
"""
with url as f, DataFileReader(f, DatumReader()) as reader:
yield reader
yield TypedDataFileReader(reader)


class TypedDataFileReader:
"""Wraps DataFileReader to provide typed iteration over records."""

def __init__(self, reader: DataFileReader) -> None:
self._reader = reader

def tell(self) -> int:
"""Returns the current position in the underlying file stream."""
return self._reader._reader.tell() # type: ignore[union-attr]

def __getattr__(self, name: str) -> Any:
return getattr(self._reader, name)

def __iter__(self) -> Iterator[dict[str, Any]]:
return self

def __next__(self) -> dict[str, Any]:
return cast(dict[str, Any], next(self._reader))


def avro_records(url: URL) -> Generator[dict[str, Any], None, None]:
with avro_reader(url) as reader:
for record in reader:
yield cast(dict[str, Any], record)
yield from reader


class PartitionedAvroReader:
def __init__(self, urls: Union[URL, Sequence[URL]]):
def __init__(self, urls: URL | Sequence[URL]):
self.urls = [urls] if isinstance(urls, URL) else urls
self.expanded_urls: list[URL] = []
self.current_index = 0
Expand Down Expand Up @@ -70,15 +93,15 @@ def __enter__(self) -> Self:
def __exit__(self, exc_type, exc_value, traceback):
self.close()

def __iter__(self) -> Iterator[object]:
def __iter__(self) -> Iterator[dict[str, Any]]:
return self

def __next__(self) -> object:
def __next__(self) -> dict[str, Any]:
while True:
try:
if not self.current_reader:
raise StopIteration
return next(self.current_reader)
return cast(dict[str, Any], next(self.current_reader))
except StopIteration:
self.current_index += 1
self._open_reader()
13 changes: 8 additions & 5 deletions avrokit/io/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
# SPDX-License-Identifier: Apache-2.0

import json
from typing import Sequence, Union, Any
from avro.schema import Field, RecordSchema, Schema, UnionSchema, parse, EnumSchema
from avro.io import DatumReader
from collections.abc import Sequence
from typing import Any, cast

from avro.datafile import DataFileReader
from avro.io import DatumReader
from avro.schema import EnumSchema, Field, RecordSchema, Schema, UnionSchema, parse

from ..url import URL


Expand All @@ -28,7 +31,7 @@ def read_avro_schema_from_first_nonempty_file(urls: Sequence[URL]) -> Schema | N
return None


def avro_schema(schema: Union[str, dict]) -> Schema:
def avro_schema(schema: str | dict) -> Schema:
"""
Converts a dictionary schema to an Avro Schema object.

Expand All @@ -48,7 +51,7 @@ def add_avro_schema_fields(schema: Schema, fields: Sequence[dict[str, Any]]) ->
schema_dict = schema.to_json()
if not isinstance(schema_dict, dict):
raise ValueError("Schema is not a valid Avro record schema.")
schema_dict["fields"].extend(fields)
cast(dict[str, Any], schema_dict)["fields"].extend(fields)
return avro_schema(schema_dict)


Expand Down
Loading
Loading