Skip to content
14 changes: 7 additions & 7 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import re
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
Expand Down Expand Up @@ -607,42 +607,42 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
"""

@abstractmethod
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List tables under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""

@abstractmethod
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""

@abstractmethod
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List views under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of view identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
Expand Down
11 changes: 6 additions & 5 deletions pyiceberg/catalog/bigquery_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import json
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any

from google.api_core.exceptions import NotFound
Expand Down Expand Up @@ -252,7 +253,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
database_name = self.identifier_to_database(namespace)
iceberg_tables: list[Identifier] = []
try:
Expand All @@ -264,18 +265,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
iceberg_tables.append((database_name, bq_table_list_item.table_id))
except NotFound:
raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}' not found.") from None
return iceberg_tables
return iter(iceberg_tables)

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
# Since this catalog only supports one-level namespaces, it always returns an empty list unless
# passed an empty namespace to list all namespaces within the catalog.
if namespace:
raise NoSuchNamespaceError(f"Namespace (dataset) '{namespace}' not found.") from None

# List top-level datasets
datasets_iterator = self.client.list_datasets()
return [(dataset.dataset_id,) for dataset in datasets_iterator]
return iter([(dataset.dataset_id,) for dataset in datasets_iterator])

@override
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
Expand Down Expand Up @@ -314,7 +315,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
return self.load_table(identifier=identifier)

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
22 changes: 13 additions & 9 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import uuid
from collections.abc import Iterator
from time import time
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -396,8 +397,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
table_identifiers = self.list_tables(namespace=database_name)

if len(table_identifiers) > 0:
try:
next(table_identifiers)
raise NamespaceNotEmptyError(f"Database {database_name} is not empty")
except StopIteration:
pass

try:
self._delete_dynamo_item(
Expand All @@ -409,14 +413,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)

Expand Down Expand Up @@ -451,20 +455,20 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:

table_identifiers.append(self.identifier_to_tuple(identifier_col))

return table_identifiers
return iter(table_identifiers)

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List top-level namespaces from the catalog.

We do not support hierarchical namespace.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.
"""
# Hierarchical namespace is not supported. Return an empty list
if namespace:
return []
return iter([])

paginator = self.dynamodb.get_paginator("query")

Expand Down Expand Up @@ -494,7 +498,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
namespace_col = _dict[DYNAMODB_COL_NAMESPACE]
database_identifiers.append(self.identifier_to_tuple(namespace_col))

return database_identifiers
return iter(database_identifiers)

@override
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
Expand Down Expand Up @@ -565,7 +569,7 @@ def create_view(
raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
17 changes: 9 additions & 8 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


import logging
from collections.abc import Iterator
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -860,14 +861,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
self.glue.delete_database(Name=database_name)

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

Args:
namespace (str | Identifier): Namespace identifier to search.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
Expand All @@ -889,18 +890,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:

except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]
return iter([(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)])

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.
"""
# Hierarchical namespace is not supported. Return an empty list
if namespace:
return []
return iter([])

database_list: list[DatabaseTypeDef] = []
next_token: str | None = None
Expand All @@ -912,7 +913,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
if not next_token:
break

return [self.identifier_to_tuple(database["Name"]) for database in database_list]
return iter([self.identifier_to_tuple(database["Name"]) for database in database_list])

@override
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
Expand Down Expand Up @@ -982,7 +983,7 @@ def create_view(
raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
31 changes: 17 additions & 14 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import socket
import time
from collections.abc import Iterator
from types import TracebackType
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -479,7 +480,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
return self._convert_hive_into_iceberg(hive_table)

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down Expand Up @@ -760,7 +761,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

When the database doesn't exist, it will just return an empty list.
Expand All @@ -769,34 +770,36 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
namespace: Database to list.

Returns:
List[Identifier]: list of table identifiers.
Iterator[Identifier]: an iterator of table identifiers.

Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
with self._client as open_client:
return [
(database_name, table.tableName)
for table in open_client.get_table_objects_by_name(
dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
)
if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
]
return iter(
[
(database_name, table.tableName)
for table in open_client.get_table_objects_by_name(
dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
)
if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
]
)

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Returns:
List[Identifier]: a List of namespace identifiers.
Iterator[Identifier]: an iterator of namespace identifiers.
"""
# Hierarchical namespace is not supported. Return an empty list
if namespace:
return []
return iter([])

with self._client as open_client:
return list(map(self.identifier_to_tuple, open_client.get_all_databases()))
return iter(list(map(self.identifier_to_tuple, open_client.get_all_databases())))

@override
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
Expand Down
7 changes: 4 additions & 3 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

from collections.abc import Iterator
from typing import (
TYPE_CHECKING,
)
Expand Down Expand Up @@ -124,11 +125,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
raise NotImplementedError

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand All @@ -142,7 +143,7 @@ def update_namespace_properties(
raise NotImplementedError

@override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
raise NotImplementedError

@override
Expand Down
Loading
Loading