Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,37 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots:
"""
return self._remove_ref_snapshot(ref_name=branch_name)

def rename_branch(self, name: str, new_name: str) -> ManageSnapshots:
"""
Rename a branch.

Args:
name (str): name of branch to rename
new_name (str): the desired new name of the branch
Returns:
This for method chaining
"""
self._commit_if_ref_updates_exist()

if name == MAIN_BRANCH:
raise ValueError("Cannot rename main branch")

refs = self._transaction.table_metadata.refs
if name not in refs:
raise ValueError(f"Branch does not exist: {name}")

ref = refs[name]
if ref.snapshot_ref_type != SnapshotRefType.BRANCH:
raise ValueError(f"Ref {name} is not a branch")

if new_name in refs:
raise ValueError(f"Ref {new_name} already exists")

self.create_branch(ref.snapshot_id, new_name, ref.max_ref_age_ms, ref.max_snapshot_age_ms, ref.min_snapshots_to_keep)
self.remove_branch(name)

return self

def set_current_snapshot(self, snapshot_id: int | None = None, ref_name: str | None = None) -> ManageSnapshots:
"""Set the current snapshot to a specific snapshot ID or ref.

Expand Down
80 changes: 79 additions & 1 deletion tests/integration/test_snapshot_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from pyiceberg.catalog import Catalog
from pyiceberg.table import Table
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType


@pytest.fixture
Expand Down Expand Up @@ -107,6 +107,84 @@ def test_remove_branch(catalog: Catalog) -> None:
assert tbl.metadata.refs.get(branch_name, None) is None


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_rename_branch(catalog: Catalog) -> None:
identifier = "default.test_table_snapshot_operations"
tbl = catalog.load_table(identifier)
assert len(tbl.history()) > 2

# create the branch to rename
name = "source"
snapshot_id = tbl.history()[-2].snapshot_id
tbl.manage_snapshots().create_branch(
snapshot_id=snapshot_id, branch_name=name, max_ref_age_ms=1, max_snapshot_age_ms=2, min_snapshots_to_keep=3
).commit()
assert tbl.metadata.refs[name] == SnapshotRef(
snapshot_id=snapshot_id, snapshot_ref_type="branch", max_ref_age_ms=1, max_snapshot_age_ms=2, min_snapshots_to_keep=3
)

# rename the branch
new_name = "target"
tbl.manage_snapshots().rename_branch(name=name, new_name=new_name).commit()
assert tbl.metadata.refs.get(name, None) is None

# all attributes should be the same, except for the new name
renamed_ref = tbl.metadata.refs.get(new_name, None)
assert renamed_ref is not None
assert renamed_ref.snapshot_id == snapshot_id
assert renamed_ref.snapshot_ref_type == SnapshotRefType.BRANCH
assert renamed_ref.max_ref_age_ms == 1
assert renamed_ref.max_snapshot_age_ms == 2
assert renamed_ref.min_snapshots_to_keep == 3


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_rename_main_branch(catalog: Catalog) -> None:
identifier = "default.test_table_snapshot_operations"
tbl = catalog.load_table(identifier)

with pytest.raises(ValueError, match="Cannot rename main branch"):
tbl.manage_snapshots().rename_branch(name="main", new_name="renamed").commit()


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_rename_missing_branch(catalog: Catalog) -> None:
identifier = "default.test_table_snapshot_operations"
tbl = catalog.load_table(identifier)

with pytest.raises(ValueError, match="Branch does not exist: test"):
tbl.manage_snapshots().rename_branch(name="test", new_name="renamed").commit()


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_rename_tag(catalog: Catalog) -> None:
identifier = "default.test_table_snapshot_operations"
tbl = catalog.load_table(identifier)
snapshot_id = tbl.history()[-1].snapshot_id
tbl.manage_snapshots().create_tag(snapshot_id=snapshot_id, tag_name="test").commit()

with pytest.raises(ValueError, match="Ref test is not a branch"):
tbl.manage_snapshots().rename_branch(name="test", new_name="renamed").commit()


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_rename_to_existing_branch(catalog: Catalog) -> None:
identifier = "default.test_table_snapshot_operations"
tbl = catalog.load_table(identifier)
snapshot_id = tbl.history()[-1].snapshot_id

tbl.manage_snapshots().create_branch(snapshot_id=snapshot_id, branch_name="source").commit()
tbl.manage_snapshots().create_branch(snapshot_id=snapshot_id, branch_name="target").commit()

with pytest.raises(ValueError, match="Ref target already exists"):
tbl.manage_snapshots().rename_branch(name="source", new_name="target").commit()


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")])
def test_set_current_snapshot(catalog: Catalog) -> None:
Expand Down