Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
.PHONY: help install install-uv check-license lint \
test test-integration test-integration-setup test-integration-exec test-integration-cleanup test-integration-rebuild \
test-s3 test-adls test-gcs test-coverage coverage-report test test-notebook\
test-s3 test-adls test-gcs test-coverage coverage-report \
docs-serve docs-build notebook notebook-infra \
clean

Expand All @@ -38,10 +38,12 @@ else
PYTHON_ARG =
endif

# --no-sync so that overlays applied after `make install` (e.g. install-pyarrow-nightly for
# the encryption integration test) aren't reverted by uv re-syncing the lockfile on `uv run`.
ifeq ($(COVERAGE),1)
TEST_RUNNER = uv run $(PYTHON_ARG) python -m coverage run --parallel-mode --source=pyiceberg -m
TEST_RUNNER = uv run --no-sync $(PYTHON_ARG) python -m coverage run --parallel-mode --source=pyiceberg -m
else
TEST_RUNNER = uv run $(PYTHON_ARG) python -m
TEST_RUNNER = uv run --no-sync $(PYTHON_ARG) python -m
endif

ifeq ($(KEEP_COMPOSE),1)
Expand Down Expand Up @@ -112,7 +114,16 @@ test-integration-setup: install ## Start Docker services for integration tests
docker compose -f dev/docker-compose-integration.yml kill
docker compose -f dev/docker-compose-integration.yml rm -f
docker compose -f dev/docker-compose-integration.yml up -d --build --wait
uv run $(PYTHON_ARG) python dev/provision.py
uv run --no-sync $(PYTHON_ARG) python dev/provision.py
$(MAKE) install-pyarrow-nightly

# Parquet Modular Encryption decryption (tests/integration/test_encryption.py) needs the
# pyarrow.parquet.encryption.create_decryption_properties API from apache/arrow#49667. That
# lands in pyarrow 25, which hasn't been released — pull the nightly until it is. Runs AFTER
# provision so that the implicit `uv run` sync during provision.py doesn't revert this overlay.
install-pyarrow-nightly: ## Overlay nightly pyarrow on top of the installed env (for PME)
uv pip install $(PYTHON_ARG) --prerelease=allow --upgrade --force-reinstall \
-i https://pypi.anaconda.org/scientific-python-nightly-wheels/simple pyarrow

test-integration-exec: ## Run integration tests (excluding provision)
$(TEST_RUNNER) pytest tests/ -m integration $(PYTEST_ARGS)
Expand Down Expand Up @@ -150,9 +161,6 @@ coverage-report: ## Combine and report coverage
uv run $(PYTHON_ARG) coverage html
uv run $(PYTHON_ARG) coverage xml

test-notebook: ## Run notebook tests (pyiceberg_example and spark_integration_example) via papermill
$(TEST_RUNNER) pytest tests/notebooks/test_pyiceberg_example.py tests/notebooks/test_spark_integration_example.py -m notebook $(PYTEST_ARGS)

# ================
# Documentation
# ================
Expand Down
8 changes: 8 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,11 @@
)
spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id")
spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'")

# Encrypted Hive-cataloged table; read back via PyIceberg in tests/integration/test_encryption.py.
spark.sql("""
CREATE OR REPLACE TABLE hive.default.test_encrypted (id bigint, data string, value float)
USING iceberg
TBLPROPERTIES ('encryption.key-id'='keyA', 'format-version'='3')
""")
spark.sql("INSERT INTO hive.default.test_encrypted VALUES (1, 'alice', 1.0), (2, 'bob', 2.0), (3, 'charlie', 3.0)")
37 changes: 32 additions & 5 deletions dev/spark/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ ARG BASE_IMAGE_SPARK_VERSION=4.0.1
FROM apache/spark:${BASE_IMAGE_SPARK_VERSION}

# Dependency versions - keep these compatible
# Changing these will invalidate the JAR download cache layer
ARG ICEBERG_VERSION=1.10.1
# Changing these will invalidate the JAR download cache layer.
# Iceberg 1.11.0 carries the Hive encryption integration (apache/iceberg#13066) — the prior
# 1.10.x release predates that work and silently no-ops encryption.kms-impl / encryption.key-id.
ARG ICEBERG_VERSION=1.11.0
ARG ICEBERG_SPARK_RUNTIME_VERSION=4.0_2.13
ARG HADOOP_VERSION=3.4.1
ARG HADOOP_VERSION=3.4.3
ARG AWS_SDK_VERSION=2.24.6
ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2

Expand All @@ -36,14 +38,21 @@ RUN apt-get update -qq && \
mkdir -p /home/iceberg/spark-events && \
chown -R spark:spark /home/iceberg

# Download JARs with retry logic (most cacheable - only changes when versions change)
# This is the slowest step, so we do it before copying config files
# Download JARs with retry logic (most cacheable - only changes when versions change).
# iceberg-core-${ICEBERG_VERSION}-tests.jar ships org.apache.iceberg.encryption.UnitestKMS, a
# fixed-master-key KMS used by the encryption integration test on the Spark write path.
RUN set -e && \
cd "${SPARK_HOME}/jars" && \
# Spark 4.0.1 ships hadoop-client-{api,runtime} 3.4.1; replace them with HADOOP_VERSION so
# hadoop-aws methods like ConfigurationHelper.resolveEnum / S3ABlockOutputStream.builder()
# that iceberg-aws-bundle 1.11.0 expects are present at runtime.
rm -f hadoop-client-api-*.jar hadoop-client-runtime-*.jar && \
for jar_path in \
"org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar" \
"org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar" \
"org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar" \
"org/apache/hadoop/hadoop-client-api/${HADOOP_VERSION}/hadoop-client-api-${HADOOP_VERSION}.jar" \
"org/apache/hadoop/hadoop-client-runtime/${HADOOP_VERSION}/hadoop-client-runtime-${HADOOP_VERSION}.jar" \
"software/amazon/awssdk/bundle/${AWS_SDK_VERSION}/bundle-${AWS_SDK_VERSION}.jar"; \
do \
jar_name=$(basename "${jar_path}") && \
Expand All @@ -53,6 +62,24 @@ RUN set -e && \
chown spark:spark "${jar_name}"; \
done

# Pull UnitestKMS + MemoryMockKMS out of iceberg-core's -tests.jar into a slim jar that only
# ships those two classes. The full -tests.jar contains testing stubs for unrelated classes
# (e.g. a stub S3ABlockOutputStream that lacks builder()) which would shadow the real ones in
# hadoop-aws and break Spark's S3A writes; this avoids that classpath collision.
RUN set -e && \
tmp="$(mktemp -d)" && cd "${tmp}" && \
curl -fsSL --retry 3 --retry-delay 5 \
-o iceberg-core-tests.jar \
"${MAVEN_MIRROR}/org/apache/iceberg/iceberg-core/${ICEBERG_VERSION}/iceberg-core-${ICEBERG_VERSION}-tests.jar" && \
/opt/java/openjdk/bin/jar xf iceberg-core-tests.jar \
org/apache/iceberg/encryption/UnitestKMS.class \
org/apache/iceberg/encryption/MemoryMockKMS.class && \
/opt/java/openjdk/bin/jar cf "${SPARK_HOME}/jars/iceberg-core-${ICEBERG_VERSION}-tests-kms-only.jar" \
org/apache/iceberg/encryption/UnitestKMS.class \
org/apache/iceberg/encryption/MemoryMockKMS.class && \
chown spark:spark "${SPARK_HOME}/jars/iceberg-core-${ICEBERG_VERSION}-tests-kms-only.jar" && \
rm -rf "${tmp}"

# Copy configuration last (changes more frequently than JARs)
COPY --chown=spark:spark spark-defaults.conf ${SPARK_HOME}/conf/

Expand Down
5 changes: 5 additions & 0 deletions dev/spark/spark-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ spark.sql.catalog.hive.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.hive.warehouse s3://warehouse/hive/
spark.sql.catalog.hive.s3.endpoint http://minio:9000

# Test-only KMS so Spark can write encrypted Iceberg tables for the encryption integration test.
# UnitestKMS comes from iceberg-core-<version>-tests.jar and uses fixed master keys ("keyA",
# "keyB") that match the InMemoryKms config used on the PyIceberg side.
spark.sql.catalog.hive.encryption.kms-impl org.apache.iceberg.encryption.UnitestKMS

# Configure Spark's default session catalog (spark_catalog) to use Iceberg backed by the Hive Metastore
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type hive
Expand Down
17 changes: 17 additions & 0 deletions pyiceberg/encryption/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Iceberg table encryption support."""
82 changes: 82 additions & 0 deletions pyiceberg/encryption/ciphers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""AES-GCM primitives and Iceberg AGS1 stream decryption."""

from __future__ import annotations

import os
import struct

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

NONCE_LENGTH = 12
GCM_TAG_LENGTH = 16


def aes_gcm_encrypt(key: bytes, plaintext: bytes, aad: bytes | None = None) -> bytes:
nonce = os.urandom(NONCE_LENGTH)
return nonce + AESGCM(key).encrypt(nonce, plaintext, aad)


def aes_gcm_decrypt(key: bytes, ciphertext: bytes, aad: bytes | None = None) -> bytes:
if len(ciphertext) < NONCE_LENGTH + GCM_TAG_LENGTH:
raise ValueError(f"Ciphertext too short: {len(ciphertext)} bytes")
return AESGCM(key).decrypt(ciphertext[:NONCE_LENGTH], ciphertext[NONCE_LENGTH:], aad)


GCM_STREAM_MAGIC = b"AGS1"
GCM_STREAM_HEADER_LENGTH = 8 # 4 magic + 4 little-endian block size


def stream_block_aad(aad_prefix: bytes, block_index: int) -> bytes:
return aad_prefix + struct.pack("<I", block_index)


def decrypt_ags1_stream(key: bytes, encrypted_data: bytes, aad_prefix: bytes) -> bytes:
"""Decrypt an Iceberg AGS1 stream.

Layout: "AGS1" (4) | plain_block_size LE (4) | one or more {nonce(12) | cipher | tag(16)} blocks.
Each block's AAD is `aad_prefix || block_index_le32`.
"""
if len(encrypted_data) < GCM_STREAM_HEADER_LENGTH:
raise ValueError(f"AGS1 stream too short: {len(encrypted_data)} bytes")
if encrypted_data[:4] != GCM_STREAM_MAGIC:
raise ValueError(f"Invalid AGS1 magic: {encrypted_data[:4]!r}")

plain_block_size = struct.unpack_from("<I", encrypted_data, 4)[0]
cipher_block_size = plain_block_size + NONCE_LENGTH + GCM_TAG_LENGTH
stream_data = encrypted_data[GCM_STREAM_HEADER_LENGTH:]
if not stream_data:
return b""

aesgcm = AESGCM(key)
result = bytearray()
offset = 0
block_index = 0
while offset < len(stream_data):
block_cipher_size = min(cipher_block_size, len(stream_data) - offset)
if block_cipher_size < NONCE_LENGTH + GCM_TAG_LENGTH:
raise ValueError(f"Truncated AGS1 block at offset {offset}: {block_cipher_size} bytes")

block = stream_data[offset : offset + block_cipher_size]
result.extend(
aesgcm.decrypt(block[:NONCE_LENGTH], block[NONCE_LENGTH:], stream_block_aad(aad_prefix, block_index))
)
offset += block_cipher_size
block_index += 1

return bytes(result)
69 changes: 69 additions & 0 deletions pyiceberg/encryption/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""In-memory InputFile/InputStream used to wrap decrypted Avro buffers for AvroFile."""

from __future__ import annotations

import io
from types import TracebackType

from pyiceberg.io import InputFile, InputStream


class BytesInputStream(InputStream):
def __init__(self, data: bytes) -> None:
self._buffer = io.BytesIO(data)

def read(self, size: int = 0) -> bytes:
if size <= 0:
return self._buffer.read()
return self._buffer.read(size)

def seek(self, offset: int, whence: int = 0) -> int:
return self._buffer.seek(offset, whence)

def tell(self) -> int:
return self._buffer.tell()

def close(self) -> None:
self._buffer.close()

def __enter__(self) -> BytesInputStream:
return self

def __exit__(
self,
exctype: type[BaseException] | None,
excinst: BaseException | None,
exctb: TracebackType | None,
) -> None:
self.close()


class BytesInputFile(InputFile):
def __init__(self, location: str, data: bytes) -> None:
super().__init__(location)
self._data = data

def __len__(self) -> int:
return len(self._data)

def exists(self) -> bool:
return True

def open(self, seekable: bool = True) -> InputStream:
return BytesInputStream(self._data)
Loading
Loading