diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index 6cb9ab925..7f5bbca54 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -71,6 +71,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.lance.spark.utils.Utils.createPathBasedReadOptions; import static org.lance.spark.utils.Utils.createReadOptions; public abstract class BaseLanceNamespaceSparkCatalog @@ -527,7 +528,7 @@ public boolean tableExists(Identifier ident) { private boolean tableExistsAtPath(Identifier ident) { String datasetUri = getDatasetUri(ident); LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name); try (Dataset dataset = Utils.openDatasetBuilder(readOptions).build()) { return true; @@ -609,6 +610,7 @@ public Table createTable( Optional.empty(), Optional.of(namespace), Optional.of(tableIdList), + Optional.ofNullable(initialStorageOptions), name); return createDataset( readOptions, @@ -632,7 +634,7 @@ private Table createTableAtPath( StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties); LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name); String fileFormatVersion = catalogConfig.getFileFormatVersion(properties); @@ -848,6 +850,7 @@ public StagedTable stageCreate( Optional.empty(), Optional.of(namespace), Optional.of(tableIdList), + Optional.ofNullable(initialStorageOptions), name); Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true); @@ -881,7 +884,7 @@ private 
StagedTable stageCreateAtPath( StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties); LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name); Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true); @@ -955,7 +958,7 @@ private StagedTable stageReplaceAtPath( StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties); LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name); Dataset ds; @@ -1035,6 +1038,7 @@ public StagedTable stageCreateOrReplace( Optional.empty(), Optional.of(namespace), Optional.of(tableIdList), + Optional.ofNullable(initialStorageOptions), name); Schema arrowSchema = LanceArrowUtils.toArrowSchema(processedSchema, "UTC", true); @@ -1078,7 +1082,7 @@ private StagedTable stageCreateOrReplaceAtPath( StructType processedSchema = SchemaConverter.processSchemaWithProperties(schema, properties); LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty(), name); boolean exists = tableExistsAtPath(ident); @@ -1143,7 +1147,7 @@ private ResolvedTable resolveIdentifier(Identifier ident) throws NoSuchTableExce if (isPathBasedIdentifier(ident)) { String datasetUri = getDatasetUri(ident); LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, Optional.empty(), @@ -1171,6 +1175,7 @@ private ResolvedTable resolveIdentifier(Identifier ident) throws NoSuchTableExce Optional.empty(), Optional.of(namespace), Optional.of(tableIdList), + Optional.ofNullable(describeResponse.getStorageOptions()), name); return new ResolvedTable(readOptions, describeResponse, tableIdList); } @@ 
-1328,6 +1333,7 @@ private Table loadTableInternal( versionId, Optional.of(namespace), Optional.of(resolved.tableIdList), + Optional.ofNullable(initialStorageOptions), name); } else { readOptions = resolved.readOptions; @@ -1394,7 +1400,7 @@ private Table loadTableFromPath( versionId = Optional.of(Utils.parseVersion(version.get())); } else if (timestamp.isPresent()) { LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, Optional.empty(), @@ -1409,7 +1415,7 @@ private Table loadTableFromPath( } LanceSparkReadOptions readOptions = - createReadOptions( + createPathBasedReadOptions( datasetUri, catalogConfig, versionId, Optional.empty(), Optional.empty(), name); // Read schema, file format version, and config from the dataset diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java index eb019c646..9da4a17a1 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java @@ -173,13 +173,16 @@ public Dataset build() { } /** - * Creates LanceSparkReadOptions for this catalog. + * Creates LanceSparkReadOptions for a namespace-backed table. * * @param location the dataset URI * @param catalogConfig catalog configuration * @param versionId optional dataset version id * @param namespace optional namespace for credential vending * @param tableId optional table identifier + * @param storageOptions optional per-table storage options returned by the namespace service + * (e.g. 
vended credentials from {@code describeTable} / {@code declareTable}); wins over + * {@code catalogConfig} storage options on key collisions * @param catalogName catalog name for cache isolation * @return a new LanceSparkReadOptions with catalog settings */ @@ -189,12 +192,10 @@ public static LanceSparkReadOptions createReadOptions( Optional versionId, Optional namespace, Optional> tableId, + Optional> storageOptions, String catalogName) { LanceSparkReadOptions.Builder builder = - LanceSparkReadOptions.builder() - .datasetUri(location) - .withCatalogDefaults(catalogConfig) - .catalogName(catalogName); + LanceSparkReadOptions.builder().datasetUri(location).catalogName(catalogName); if (versionId.isPresent()) { builder.version(versionId.get().intValue()); @@ -205,10 +206,32 @@ public static LanceSparkReadOptions createReadOptions( if (namespace.isPresent()) { builder.namespace(namespace.get()); } - + if (storageOptions.isPresent() && !storageOptions.get().isEmpty()) { + builder.storageOptions(storageOptions.get()); + } + // Order matters: storage options above are set first so withCatalogDefaults + // treats catalog values as fallbacks under them, not overrides. + builder.withCatalogDefaults(catalogConfig); return builder.build(); } + /** + * Convenience wrapper for path-based callers that have no namespace-vended storage options. + * Namespace-based call sites must use {@link #createReadOptions} with an explicit {@code + * storageOptions} argument so that credentials vended by the namespace service reach the read + * path. 
+ */ + public static LanceSparkReadOptions createPathBasedReadOptions( + String location, + LanceSparkCatalogConfig catalogConfig, + Optional versionId, + Optional namespace, + Optional> tableId, + String catalogName) { + return createReadOptions( + location, catalogConfig, versionId, namespace, tableId, Optional.empty(), catalogName); + } + // Determine if the timestamp is in microseconds or nanoseconds and convert to Instant private static Instant instantFromTimestamp(long timestamp) { if (timestamp <= 0) {