Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.lance.spark.utils.Utils.createPathBasedReadOptions;
import static org.lance.spark.utils.Utils.createReadOptions;

public abstract class BaseLanceNamespaceSparkCatalog
Expand Down Expand Up @@ -527,7 +528,7 @@ public boolean tableExists(Identifier ident) {
private boolean tableExistsAtPath(Identifier ident) {
String datasetUri = getDatasetUri(ident);
LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name);
try (Dataset dataset = Utils.openDatasetBuilder(readOptions).build()) {
return true;
Expand Down Expand Up @@ -609,6 +610,7 @@ public Table createTable(
Optional.empty(),
Optional.of(namespace),
Optional.of(tableIdList),
Optional.ofNullable(initialStorageOptions),
name);
return createDataset(
readOptions,
Expand All @@ -632,7 +634,7 @@ private Table createTableAtPath(

StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);
LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name);

String fileFormatVersion = catalogConfig.getFileFormatVersion(properties);
Expand Down Expand Up @@ -848,6 +850,7 @@ public StagedTable stageCreate(
Optional.empty(),
Optional.of(namespace),
Optional.of(tableIdList),
Optional.ofNullable(initialStorageOptions),
name);

Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true);
Expand Down Expand Up @@ -881,7 +884,7 @@ private StagedTable stageCreateAtPath(
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);

LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name);

Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true);
Expand Down Expand Up @@ -955,7 +958,7 @@ private StagedTable stageReplaceAtPath(
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);

LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name);

Dataset ds;
Expand Down Expand Up @@ -1035,6 +1038,7 @@ public StagedTable stageCreateOrReplace(
Optional.empty(),
Optional.of(namespace),
Optional.of(tableIdList),
Optional.ofNullable(initialStorageOptions),
name);

Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true);
Expand Down Expand Up @@ -1078,7 +1082,7 @@ private StagedTable stageCreateOrReplaceAtPath(
StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties);

LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name);

boolean exists = tableExistsAtPath(ident);
Expand Down Expand Up @@ -1143,7 +1147,7 @@ private ResolvedTable resolveIdentifier(Identifier ident) throws NoSuchTableExce
if (isPathBasedIdentifier(ident)) {
String datasetUri = getDatasetUri(ident);
LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri,
catalogConfig,
Optional.empty(),
Expand Down Expand Up @@ -1171,6 +1175,7 @@ private ResolvedTable resolveIdentifier(Identifier ident) throws NoSuchTableExce
Optional.empty(),
Optional.of(namespace),
Optional.of(tableIdList),
Optional.ofNullable(describeResponse.getStorageOptions()),
name);
return new ResolvedTable(readOptions, describeResponse, tableIdList);
}
Expand Down Expand Up @@ -1328,6 +1333,7 @@ private Table loadTableInternal(
versionId,
Optional.of(namespace),
Optional.of(resolved.tableIdList),
Optional.ofNullable(initialStorageOptions),
name);
} else {
readOptions = resolved.readOptions;
Expand Down Expand Up @@ -1394,7 +1400,7 @@ private Table loadTableFromPath(
versionId = Optional.of(Utils.parseVersion(version.get()));
} else if (timestamp.isPresent()) {
LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri,
catalogConfig,
Optional.empty(),
Expand All @@ -1409,7 +1415,7 @@ private Table loadTableFromPath(
}

LanceSparkReadOptions readOptions =
createReadOptions(
createPathBasedReadOptions(
datasetUri, catalogConfig, versionId, Optional.empty(), Optional.empty(), name);

// Read schema, file format version, and config from the dataset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,16 @@ public Dataset build() {
}

/**
* Creates LanceSparkReadOptions for this catalog.
* Creates LanceSparkReadOptions for a namespace-backed table.
*
* @param location the dataset URI
* @param catalogConfig catalog configuration
* @param versionId optional dataset version id
* @param namespace optional namespace for credential vending
* @param tableId optional table identifier
* @param storageOptions optional per-table storage options returned by the namespace service
* (e.g. vended credentials from {@code describeTable} / {@code declareTable}); wins over
* {@code catalogConfig} storage options on key collisions
* @param catalogName catalog name for cache isolation
* @return a new LanceSparkReadOptions with catalog settings
*/
Expand All @@ -189,12 +192,10 @@ public static LanceSparkReadOptions createReadOptions(
Optional<Long> versionId,
Optional<LanceNamespace> namespace,
Optional<List<String>> tableId,
Optional<Map<String, String>> storageOptions,
String catalogName) {
LanceSparkReadOptions.Builder builder =
LanceSparkReadOptions.builder()
.datasetUri(location)
.withCatalogDefaults(catalogConfig)
.catalogName(catalogName);
LanceSparkReadOptions.builder().datasetUri(location).catalogName(catalogName);

if (versionId.isPresent()) {
builder.version(versionId.get().intValue());
Expand All @@ -205,10 +206,32 @@ public static LanceSparkReadOptions createReadOptions(
if (namespace.isPresent()) {
builder.namespace(namespace.get());
}

if (storageOptions.isPresent() && !storageOptions.get().isEmpty()) {
builder.storageOptions(storageOptions.get());
}
// Order matters: storage options above are set first so withCatalogDefaults
// treats catalog values as fallbacks under them, not overrides.
builder.withCatalogDefaults(catalogConfig);
return builder.build();
}

/**
* Convenience overload for path-based callers that have no namespace-vended storage options.
* Namespace-based call sites must use {@link #createReadOptions} with an explicit {@code
* storageOptions} argument so that credentials vended by the namespace service reach the read
* path.
*/
public static LanceSparkReadOptions createPathBasedReadOptions(
String location,
LanceSparkCatalogConfig catalogConfig,
Optional<Long> versionId,
Optional<LanceNamespace> namespace,
Optional<List<String>> tableId,
String catalogName) {
return createReadOptions(
location, catalogConfig, versionId, namespace, tableId, Optional.empty(), catalogName);
}

// Determine if the timestamp is in microseconds or nanoseconds and convert to Instant
private static Instant instantFromTimestamp(long timestamp) {
if (timestamp <= 0) {
Expand Down
Loading