diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index aaa79929256f6..973ef8884f2a0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieSchemaConversionUtils.{convertStructTypeToHoodieSchema, getRecordNameAndNamespace}
 import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
-import org.apache.hudi.HoodieSparkSqlWriterInternal.{handleInsertDuplicates, shouldDropDuplicatesForInserts, shouldFailWhenDuplicatesFound}
+import org.apache.hudi.HoodieSparkSqlWriterInternal.{handleInsertDuplicates, refreshSparkCatalogTableCache, shouldDropDuplicatesForInserts, shouldFailWhenDuplicatesFound}
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -81,6 +81,7 @@ import java.util.function.BiConsumer
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
 
 object HoodieSparkSqlWriter {
 
@@ -947,13 +948,7 @@ class HoodieSparkSqlWriterInternal {
     // Since Hive tables are now synced as Spark data source tables which are cached after Spark SQL queries
     // we must invalidate this table in the cache so writes are reflected in later queries
     if (metaSyncEnabled) {
-      getHiveTableNames(hoodieConfig).foreach(name => {
-        val syncDb = hoodieConfig.getStringOrDefault(HIVE_DATABASE)
-        val qualifiedTableName = String.join(".", syncDb, name)
-        if (spark.catalog.databaseExists(syncDb) && spark.catalog.tableExists(qualifiedTableName)) {
-          spark.catalog.refreshTable(qualifiedTableName)
-        }
-      })
+      refreshSparkCatalogTableCache(spark, hoodieConfig.getStringOrDefault(HIVE_DATABASE), getHiveTableNames(hoodieConfig))
     }
     true
   }
@@ -1181,6 +1176,57 @@ class HoodieSparkSqlWriterInternal {
 }
 
 object HoodieSparkSqlWriterInternal {
+  private val log = LoggerFactory.getLogger(classOf[HoodieSparkSqlWriterInternal])
+
+  /**
+   * Best-effort invalidation of the Spark catalog relation cache for the just-synced table(s), so
+   * subsequent reads in the same Spark session reflect the new write.
+   *
+   * The table name is always qualified with the sync database ([[HIVE_DATABASE]] /
+   * `hoodie.datasource.hive_sync.database`) so a same-named table in another database - in
+   * particular the session's current/`default` database - is never resolved and refreshed by
+   * mistake (HUDI-18139). Each name part is backtick-quoted before it is handed to the catalog,
+   * because `tableExists` / `refreshTable` parse their argument as a SQL identifier: an unquoted
+   * name containing e.g. a hyphen would fail to parse and the refresh would be silently skipped.
+   *
+   * Any failure is logged and swallowed: by this point the data has already been committed and
+   * meta-synced successfully, so a cache-invalidation problem (a transient catalog error, or a
+   * same-named table backed by storage the writer cannot access) must never fail the write. A
+   * failure on one table does not prevent the remaining tables from being refreshed.
+   *
+   * @param spark      active session whose catalog cache is invalidated
+   * @param syncDb     database the tables were synced to; nothing is refreshed if it does not exist
+   * @param tableNames unqualified names of the synced tables; tables missing from `syncDb` are skipped
+   */
+  def refreshSparkCatalogTableCache(spark: SparkSession, syncDb: String, tableNames: Seq[String]): Unit = {
+    try {
+      if (spark.catalog.databaseExists(syncDb)) {
+        tableNames.foreach { name =>
+          // Unquoted form, used only for log output.
+          val qualifiedTableName = String.join(".", syncDb, name)
+          try {
+            val quotedTableName = Seq(syncDb, name).map(quoteIdentifierPart).mkString(".")
+            if (spark.catalog.tableExists(quotedTableName)) {
+              spark.catalog.refreshTable(quotedTableName)
+            }
+          } catch {
+            case NonFatal(e) =>
+              log.warn(s"Failed to refresh Spark catalog cache for table '$qualifiedTableName' after a " +
+                s"successful write and meta-sync; the write is already committed. Skipping cache " +
+                s"invalidation for this table.", e)
+          }
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Failed to refresh Spark catalog cache for database '$syncDb' after a successful write " +
+          s"and meta-sync; the write is already committed. Skipping cache invalidation.", e)
+    }
+  }
+
+  /** Backtick-quotes one identifier part, doubling any embedded backtick. */
+  private def quoteIdentifierPart(part: String): String = "`" + part.replace("`", "``") + "`"
+
   // Check if duplicates should be dropped.
   def shouldDropDuplicatesForInserts(hoodieConfig: HoodieConfig): Boolean = {
     hoodieConfig.contains(INSERT_DUP_POLICY) &&
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkCatalogCacheRefresh.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkCatalogCacheRefresh.scala
new file mode 100644
index 0000000000000..0e4a60ff83854
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkCatalogCacheRefresh.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.HoodieSparkSqlWriterInternal
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+import java.io.File
+
+/**
+ * Regression test for HUDI-18139.
+ *
+ * After a successful write + meta-sync, Hudi invalidates the Spark catalog relation cache for the
+ * synced table so later reads in the same session see the new data. Two properties are covered:
+ *
+ *  1. The refresh is best-effort: even if it fails (e.g. the table's storage is momentarily
+ *     inaccessible), it must NOT fail the write, which has already been committed and synced.
+ *  2. The refresh targets the table in the SYNC database, never a same-named table in the
+ *     session's current/`default` database (which, in the reported issue, pointed at a bucket the
+ *     writer could not access and produced an AccessDenied). Because failures are swallowed, this
+ *     is verified through the refresh's visible effect rather than through "it did not throw".
+ */
+class TestSparkCatalogCacheRefresh extends HoodieSparkSqlTestBase {
+
+  /** Creates a Hudi table at `location` and inserts a single row (id = 1) through SQL. */
+  private def createTableWithOneRow(qualifiedName: String, location: String): Unit = {
+    spark.sql(
+      s"""create table $qualifiedName (id int, name string, ts long) using hudi
+         | location '$location'
+         | tblproperties (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)
+    spark.sql(s"insert into $qualifiedName values (1, 'a', 1)")
+  }
+
+  /** Appends one row (id = 2) by path, i.e. without going through the catalog table. */
+  private def appendRowBypassingCatalog(path: String, tableName: String): Unit = {
+    spark.sql("select 2 as id, 'b' as name, cast(2 as bigint) as ts")
+      .write.format("hudi")
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.precombine.field", "ts")
+      .option("hoodie.datasource.write.partitionpath.field", "")
+      .option("hoodie.table.name", tableName)
+      .mode(SaveMode.Append)
+      .save(path)
+  }
+
+  test("HUDI-18139: post-sync catalog refresh is best-effort and never fails a committed write") {
+    withTempDir { tmp =>
+      val syncDb = "hudi_18139_best_effort_db"
+      val tableName = "best_effort_t"
+      val pathSyncDb = new File(tmp, "best_effort_t").getCanonicalPath
+      try {
+        spark.sql(s"create database if not exists $syncDb")
+        createTableWithOneRow(s"$syncDb.$tableName", pathSyncDb)
+
+        // Nothing to refresh: a missing database or a missing table must be a silent no-op.
+        HoodieSparkSqlWriterInternal.refreshSparkCatalogTableCache(spark, "hudi_18139_missing_db", Seq(tableName))
+        HoodieSparkSqlWriterInternal.refreshSparkCatalogTableCache(spark, syncDb, Seq("missing_t"))
+
+        // Even when the intended table's storage is inaccessible, the refresh must swallow any
+        // error and not fail the (already committed + synced) write.
+        FileUtils.deleteDirectory(new File(pathSyncDb))
+        HoodieSparkSqlWriterInternal.refreshSparkCatalogTableCache(spark, syncDb, Seq(tableName))
+      } finally {
+        spark.sql("use default")
+        spark.sql(s"drop table if exists $syncDb.$tableName")
+        spark.sql(s"drop database if exists $syncDb cascade")
+      }
+    }
+  }
+
+  test("HUDI-18139: post-sync catalog refresh targets the sync db and makes new data visible") {
+    withTempDir { tmp =>
+      val syncDb = "hudi_18139_sync_db"
+      val tableName = "refresh_t"
+      val pathDefault = new File(tmp, "default_refresh_t").getCanonicalPath
+      val pathSyncDb = new File(tmp, "syncdb_refresh_t").getCanonicalPath
+      try {
+        spark.sql(s"create database if not exists $syncDb")
+        // Same table name in two databases, backed by different locations. The session's current
+        // database is `default`, so an unqualified `refresh_t` would resolve to the decoy table.
+        createTableWithOneRow(s"default.$tableName", pathDefault)
+        createTableWithOneRow(s"$syncDb.$tableName", pathSyncDb)
+        spark.sql("use default")
+
+        // Read once to cache the catalog relation (file listing).
+        assertResult(1)(spark.table(s"$syncDb.$tableName").count())
+
+        // Mimic the just-completed write whose data Hudi must make visible.
+        appendRowBypassingCatalog(pathSyncDb, tableName)
+
+        // Without refresh the cached relation still reports the stale row count, proving the refresh
+        // below is doing real work (not a no-op).
+        assertResult(1)(spark.table(s"$syncDb.$tableName").count())
+
+        HoodieSparkSqlWriterInternal.refreshSparkCatalogTableCache(spark, syncDb, Seq(tableName))
+
+        // A refresh that resolved `default.refresh_t` instead would leave this count stale at 1.
+        assertResult(2)(spark.table(s"$syncDb.$tableName").count())
+      } finally {
+        spark.sql("use default")
+        spark.sql(s"drop table if exists default.$tableName")
+        spark.sql(s"drop table if exists $syncDb.$tableName")
+        spark.sql(s"drop database if exists $syncDb cascade")
+      }
+    }
+  }
+}