Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,36 @@ def remove_tag(self, tag_name: str) -> ManageSnapshots:
"""
return self._remove_ref_snapshot(ref_name=tag_name)

def replace_tag(self, tag_name: str, snapshot_id: int) -> ManageSnapshots:
    """
    Re-point an existing tag at a different snapshot.

    The tag keeps the ``max_ref_age_ms`` of the ref it replaces; only the
    snapshot it references changes.

    Args:
        tag_name (str): Name of the tag to move; it must already exist as a tag.
        snapshot_id (int): Id of the snapshot the tag should reference afterwards.

    Returns:
        This for method chaining

    Raises:
        ValueError: If no ref named ``tag_name`` exists, or if the ref exists
            but its type is not ``SnapshotRefType.TAG``.
    """
    # Must run before the refs lookup below.
    # NOTE(review): presumably this applies ref updates staged earlier in the
    # chain so the lookup sees them -- confirm against the helper.
    self._commit_if_ref_updates_exist()

    existing = self._transaction.table_metadata.refs.get(tag_name)
    if existing is None:
        raise ValueError(f"Tag does not exist: {tag_name}")
    if existing.snapshot_ref_type != SnapshotRefType.TAG:
        raise ValueError(f"Ref {tag_name} is not a tag")

    # Stage the new target while carrying over the existing tag's max ref age.
    staged = self._transaction._set_ref_snapshot(
        snapshot_id=snapshot_id,
        ref_name=tag_name,
        type=SnapshotRefType.TAG,
        max_ref_age_ms=existing.max_ref_age_ms,
    )
    ref_update, ref_requirement = staged
    self._updates += ref_update
    self._requirements += ref_requirement
    return self

def create_branch(
self,
snapshot_id: int,
Expand Down
51 changes: 50 additions & 1 deletion tests/integration/test_snapshot_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from pyiceberg.catalog import Catalog
from pyiceberg.table import Table
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType


@pytest.fixture
Expand Down Expand Up @@ -107,6 +107,55 @@ def test_remove_branch(catalog: Catalog) -> None:
assert tbl.metadata.refs.get(branch_name, None) is None


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_replace_tag(catalog: Catalog) -> None:
    """Replacing a tag moves it to the new snapshot and keeps its type and max ref age."""
    tbl = catalog.load_table("default.test_table_snapshot_operations")
    assert len(tbl.history()) > 2

    tag_name = "my-tag"
    current_snapshot_id = tbl.history()[-1].snapshot_id
    older_snapshot_id = tbl.history()[-2].snapshot_id

    def assert_tag_points_to(expected_snapshot_id: int) -> None:
        # The same four checks are applied before and after the replacement.
        tag = tbl.metadata.refs.get(tag_name)
        assert tag is not None
        assert tag.snapshot_id == expected_snapshot_id
        assert tag.snapshot_ref_type == SnapshotRefType.TAG
        assert tag.max_ref_age_ms == 1

    # Create the tag on the older snapshot with a max ref age of 1 ms.
    tbl.manage_snapshots().create_tag(older_snapshot_id, tag_name, 1).commit()
    assert_tag_points_to(older_snapshot_id)

    # Move the tag to the most recent snapshot in the history.
    tbl.manage_snapshots().replace_tag(tag_name=tag_name, snapshot_id=current_snapshot_id).commit()
    assert_tag_points_to(current_snapshot_id)


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_replace_missing_tag(catalog: Catalog) -> None:
    """Expect replace_tag to raise ValueError('Tag does not exist: test') for the ref name 'test'."""
    table = catalog.load_table("default.test_table_snapshot_operations")
    latest_snapshot_id = table.history()[-1].snapshot_id

    with pytest.raises(ValueError, match="Tag does not exist: test"):
        manager = table.manage_snapshots()
        staged = manager.replace_tag(tag_name="test", snapshot_id=latest_snapshot_id)
        staged.commit()


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_replace_tag_with_branch(catalog: Catalog) -> None:
    """Expect replace_tag to raise ValueError('Ref main is not a tag') when given the ref name 'main'."""
    table = catalog.load_table("default.test_table_snapshot_operations")
    latest_snapshot_id = table.history()[-1].snapshot_id

    with pytest.raises(ValueError, match="Ref main is not a tag"):
        manager = table.manage_snapshots()
        staged = manager.replace_tag(tag_name="main", snapshot_id=latest_snapshot_id)
        staged.commit()


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_set_current_snapshot(catalog: Catalog) -> None:
Expand Down