From 839de8999e59889279cebc1276dbf7fab886778b Mon Sep 17 00:00:00 2001 From: Davis Zhang Date: Fri, 19 Jun 2026 06:22:28 -0700 Subject: [PATCH] [MINOR] Make HoodieTable AutoCloseable and release per-cycle file-system views A HoodieTable created per table-service work unit on the driver lazily builds a FileSystemViewManager. For a SPILLABLE_DISK view the file-group store spills to an on-disk BitCaskDiskMap. HoodieTable had no close() and the per-cycle tables created in BaseHoodieTableServiceClient were never closed, so their cached views' on-disk maps lingered until JVM exit. - HoodieTable now implements AutoCloseable; close() releases the metadata reader and, for locally-managed views (SPILLABLE_DISK / MEMORY / EMBEDDED_KV), the FileSystemViewManager. For REMOTE_ONLY/REMOTE_FIRST views close() leaves the view manager alone: that view talks to an embedded timeline server shared with the write client and later table-service cycles, so tearing it down here would break them. - Every create-use-drop HoodieTable in BaseHoodieTableServiceClient is wrapped in try-with-resources / try-finally (preCommit, logCompact, compact, cluster, scheduleTableServiceInternal, scheduleCleaning, purgePendingClustering, clean, rollbackFailedIndexingCommits, rollback, rollbackFailedBootstrap). - SpillableMapBasedFileSystemView drains its ExternalSpillableMaps from closeResources() (under the AbstractTableFileSystemView writeLock) instead of close(), so the on-disk maps are released while still referenced and without racing a concurrent reader. - Adds TestHoodieTableResourceLifecycle verifying the on-disk BitCaskDiskMap is released by HoodieTable.close(). 
Co-Authored-By: Claude Opus 4.8 --- .../client/BaseHoodieTableServiceClient.java | 257 +++++++++--------- .../org/apache/hudi/table/HoodieTable.java | 39 ++- .../TestHoodieTableResourceLifecycle.java | 125 +++++++++ .../view/SpillableMapBasedFileSystemView.java | 9 +- 4 files changed, 304 insertions(+), 126 deletions(-) create mode 100644 hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/TestHoodieTableResourceLifecycle.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 17106d8d940e5..44bbcc01af291 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -182,12 +182,13 @@ protected void setTableServiceTimer(WriteOperationType operationType) { protected void preCommit(HoodieCommitMetadata metadata, boolean executeConflictResolution) { // Create a Hoodie table after startTxn which encapsulated the commits and files visible. 
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload - HoodieTable table = createTable(config, storageConf); - if (executeConflictResolution) { - resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants); + try (HoodieTable table = createTable(config, storageConf)) { + if (executeConflictResolution) { + resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants); + } + // Merge rolling metadata after conflict resolution, still within the lock + mergeRollingMetadata(table, metadata); } - // Merge rolling metadata after conflict resolution, still within the lock - mergeRollingMetadata(table, metadata); } /** @@ -219,41 +220,41 @@ private void inlineCompaction(HoodieTable table, Option> ext * @return Collection of Write Status */ protected HoodieWriteMetadata logCompact(String logCompactionInstantTime, boolean shouldComplete) { - HoodieTable table = createTable(config, context.getStorageConf()); - - // Check if a commit or compaction instant with a greater timestamp is on the timeline. - // If an instant is found then abort log compaction, since it is no longer needed. - Set actions = CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION); - Option compactionInstantWithGreaterTimestamp = - Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream() - .filter(hoodieInstant -> actions.contains(hoodieInstant.getAction())) - .filter(hoodieInstant -> compareTimestamps(hoodieInstant.requestedTime(), - GREATER_THAN, logCompactionInstantTime)) - .findFirst()); - if (compactionInstantWithGreaterTimestamp.isPresent()) { - throw new HoodieLogCompactException(String.format("Cannot log compact since a compaction instant with greater " - + "timestamp exists. 
Instant details %s", compactionInstantWithGreaterTimestamp.get())); - } + try (HoodieTable table = createTable(config, context.getStorageConf())) { + // Check if a commit or compaction instant with a greater timestamp is on the timeline. + // If an instant is found then abort log compaction, since it is no longer needed. + Set actions = CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION); + Option compactionInstantWithGreaterTimestamp = + Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream() + .filter(hoodieInstant -> actions.contains(hoodieInstant.getAction())) + .filter(hoodieInstant -> compareTimestamps(hoodieInstant.requestedTime(), + GREATER_THAN, logCompactionInstantTime)) + .findFirst()); + if (compactionInstantWithGreaterTimestamp.isPresent()) { + throw new HoodieLogCompactException(String.format("Cannot log compact since a compaction instant with greater " + + "timestamp exists. Instant details %s", compactionInstantWithGreaterTimestamp.get())); + } - HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline(); - InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator(); - HoodieInstant inflightInstant = instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime); - if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) { - log.info("Found Log compaction inflight file. 
Rolling back the commit and exiting."); - table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager); - table.getMetaClient().reloadActiveTimeline(); - throw new HoodieException("Execution is aborted since it found an Inflight logcompaction," - + "log compaction plans are mutable plans, so reschedule another logcompaction."); - } - logCompactionTimer = metrics.getLogCompactionCtx(); - WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime); - HoodieWriteMetadata writeMetadata = table.logCompact(context, logCompactionInstantTime); - HoodieWriteMetadata updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, WriteOperationType.LOG_COMPACT); - HoodieWriteMetadata logCompactionMetadata = convertToOutputMetadata(updatedWriteMetadata); - if (shouldComplete) { - commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, Option.of(table)); + HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline(); + InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator(); + HoodieInstant inflightInstant = instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime); + if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) { + log.info("Found Log compaction inflight file. 
Rolling back the commit and exiting."); + table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager); + table.getMetaClient().reloadActiveTimeline(); + throw new HoodieException("Execution is aborted since it found an Inflight logcompaction," + + "log compaction plans are mutable plans, so reschedule another logcompaction."); + } + logCompactionTimer = metrics.getLogCompactionCtx(); + WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime); + HoodieWriteMetadata writeMetadata = table.logCompact(context, logCompactionInstantTime); + HoodieWriteMetadata updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, WriteOperationType.LOG_COMPACT); + HoodieWriteMetadata logCompactionMetadata = convertToOutputMetadata(updatedWriteMetadata); + if (shouldComplete) { + commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, Option.of(table)); + } + return logCompactionMetadata; } - return logCompactionMetadata; } /** @@ -310,8 +311,9 @@ public Option scheduleCompaction(Option> extraMetada * @return Collection of Write Status */ protected HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete) { - HoodieTable table = createTable(config, context.getStorageConf()); - return compact(table, compactionInstantTime, shouldComplete); + try (HoodieTable table = createTable(config, context.getStorageConf())) { + return compact(table, compactionInstantTime, shouldComplete); + } } /** @@ -515,42 +517,44 @@ public Option scheduleClustering(Option> extraMetada * @return Collection of Write Status */ public HoodieWriteMetadata cluster(String clusteringInstant, boolean shouldComplete) { - HoodieTable table = createTable(config, context.getStorageConf()); - HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline(); - Option inflightInstantOpt 
= ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline(), - table.getMetaClient().getInstantGenerator()); - if (inflightInstantOpt.isPresent()) { - if (pendingClusteringTimeline.isPendingClusteringInstant(inflightInstantOpt.get().requestedTime())) { - table.rollbackInflightClustering(inflightInstantOpt.get(), commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager); - table.getMetaClient().reloadActiveTimeline(); - } else { - throw new HoodieClusteringException("Non clustering replace-commit inflight at timestamp " + clusteringInstant); + try (HoodieTable table = createTable(config, context.getStorageConf())) { + HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline(); + Option inflightInstantOpt = ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline(), + table.getMetaClient().getInstantGenerator()); + if (inflightInstantOpt.isPresent()) { + if (pendingClusteringTimeline.isPendingClusteringInstant(inflightInstantOpt.get().requestedTime())) { + table.rollbackInflightClustering(inflightInstantOpt.get(), commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager); + table.getMetaClient().reloadActiveTimeline(); + } else { + throw new HoodieClusteringException("Non clustering replace-commit inflight at timestamp " + clusteringInstant); + } } - } - clusteringTimer = metrics.getClusteringCtx(); - log.info("Starting clustering at {} for table {}", clusteringInstant, table.getConfig().getBasePath()); - HoodieWriteMetadata writeMetadata = table.cluster(context, clusteringInstant); - HoodieWriteMetadata updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, clusteringInstant, WriteOperationType.CLUSTER); - HoodieWriteMetadata clusteringMetadata = convertToOutputMetadata(updatedWriteMetadata); + clusteringTimer = metrics.getClusteringCtx(); + 
log.info("Starting clustering at {} for table {}", clusteringInstant, table.getConfig().getBasePath()); + HoodieWriteMetadata writeMetadata = table.cluster(context, clusteringInstant); + HoodieWriteMetadata updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, clusteringInstant, WriteOperationType.CLUSTER); + HoodieWriteMetadata clusteringMetadata = convertToOutputMetadata(updatedWriteMetadata); - // TODO : Where is shouldComplete used ? - if (shouldComplete) { - commitClustering(clusteringMetadata, table, clusteringInstant); + // TODO : Where is shouldComplete used ? + if (shouldComplete) { + commitClustering(clusteringMetadata, table, clusteringInstant); + } + return clusteringMetadata; } - return clusteringMetadata; } public boolean purgePendingClustering(String clusteringInstant) { - HoodieTable table = createTable(config, context.getStorageConf()); - Option inflightInstantOpt = ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline(), - table.getMetaClient().getInstantGenerator()); - if (inflightInstantOpt.isPresent()) { - table.rollbackInflightClustering(inflightInstantOpt.get(), commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true, txnManager); - table.getMetaClient().reloadActiveTimeline(); - return true; + try (HoodieTable table = createTable(config, context.getStorageConf())) { + Option inflightInstantOpt = ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline(), + table.getMetaClient().getInstantGenerator()); + if (inflightInstantOpt.isPresent()) { + table.rollbackInflightClustering(inflightInstantOpt.get(), commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true, txnManager); + table.getMetaClient().reloadActiveTimeline(); + return true; + } + return false; } - return false; } /** @@ -724,15 +728,16 @@ Option scheduleTableServiceInternal(Option providedInstantTime, if (tableServiceType 
== TableServiceType.CLEAN) { // Cleaning is a frequent operation that does not conflict with other operations and is idempotent, // so it is handled differently to avoid locking for planning. - return scheduleCleaning(createTable(config, storageConf), providedInstantTime); + try (HoodieTable table = createTable(config, storageConf)) { + return scheduleCleaning(table, providedInstantTime); + } } Option lastCompletedInstant = lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty(); txnManager.beginStateChange(Option.empty(), lastCompletedInstant); - try { + try (HoodieTable table = createTable(config, storageConf)) { Option option; - HoodieTable table = createTable(config, storageConf); String instantTime = providedInstantTime.orElseGet(() -> createNewInstantTime(false)); switch (tableServiceType) { @@ -862,46 +867,51 @@ HoodieCleanMetadata clean(Option suppliedCleanInstant, boolean scheduleI } final Timer.Context timerContext = metrics.getCleanCtx(); HoodieTable initialTable = createTable(config, storageConf); - HoodieTable table; - if (CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), - HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(initialTable.getMetaClient()))) { - // if rollback occurred, reload the table - table = createTable(config, storageConf); - } else { - table = initialTable; - } - Option inflightClean = table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().map(HoodieInstant::requestedTime); - Option cleanInstantTime = Option.empty(); - if (config.allowMultipleCleans() || inflightClean.isEmpty()) { - log.info("Cleaner started for table {}", config.getBasePath()); - // proceed only if multiple clean schedules are enabled or if there are no pending cleans. 
- if (scheduleInline) { - cleanInstantTime = scheduleCleaning(table, suppliedCleanInstant); + HoodieTable table = initialTable; + try { + if (CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), + HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(initialTable.getMetaClient()))) { + // if rollback occurred, reload the table + table = createTable(config, storageConf); } + Option inflightClean = table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().map(HoodieInstant::requestedTime); + Option cleanInstantTime = Option.empty(); + if (config.allowMultipleCleans() || inflightClean.isEmpty()) { + log.info("Cleaner started for table {}", config.getBasePath()); + // proceed only if multiple clean schedules are enabled or if there are no pending cleans. + if (scheduleInline) { + cleanInstantTime = scheduleCleaning(table, suppliedCleanInstant); + } - if (shouldDelegateToTableServiceManager(config, ActionType.clean)) { - log.info("Cleaning is not yet supported with Table Service Manager."); - return null; + if (shouldDelegateToTableServiceManager(config, ActionType.clean)) { + log.info("Cleaning is not yet supported with Table Service Manager."); + return null; + } } - } - HoodieCleanMetadata metadata; - if (inflightClean.isPresent() || cleanInstantTime.isPresent()) { - table.getMetaClient().reloadActiveTimeline(); - // Proceeds to execute any requested or inflight clean instances in the timeline - String cleanInstantToExecute = cleanInstantTime.isPresent() ? 
cleanInstantTime.get() : inflightClean.get(); - metadata = table.clean(context, cleanInstantToExecute); - releaseResources(cleanInstantToExecute); - } else { - metadata = null; - } - if (timerContext != null && metadata != null) { - long durationMs = metrics.getDurationInMs(timerContext.stop()); - metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); - log.info("Cleaned {} files Earliest Retained Instant :{} cleanerElapsedMs: {}", - metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain(), durationMs); + HoodieCleanMetadata metadata; + if (inflightClean.isPresent() || cleanInstantTime.isPresent()) { + table.getMetaClient().reloadActiveTimeline(); + // Proceeds to execute any requested or inflight clean instances in the timeline + String cleanInstantToExecute = cleanInstantTime.isPresent() ? cleanInstantTime.get() : inflightClean.get(); + metadata = table.clean(context, cleanInstantToExecute); + releaseResources(cleanInstantToExecute); + } else { + metadata = null; + } + if (timerContext != null && metadata != null) { + long durationMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); + log.info("Cleaned {} files Earliest Retained Instant :{} cleanerElapsedMs: {}", + metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain(), durationMs); + } + return metadata; + } finally { + table.close(); + if (table != initialTable) { + initialTable.close(); + } } - return metadata; } /** @@ -1055,12 +1065,13 @@ protected Map> getPendingRollbackInfos * @return {@code true} if rollback happens; {@code false} otherwise. 
*/ protected boolean rollbackFailedIndexingCommits() { - HoodieTable table = createTable(config, storageConf); - List instantsToRollback = getFailedIndexingCommitsToRollbackForMetadataTable(table.getMetaClient()); - Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); - instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); - rollbackFailedWrites(pendingRollbacks); - return !pendingRollbacks.isEmpty(); + try (HoodieTable table = createTable(config, storageConf)) { + List instantsToRollback = getFailedIndexingCommitsToRollbackForMetadataTable(table.getMetaClient()); + Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); + instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); + rollbackFailedWrites(pendingRollbacks); + return !pendingRollbacks.isEmpty(); + } } private List getFailedIndexingCommitsToRollbackForMetadataTable(HoodieTableMetaClient metaClient) { @@ -1248,8 +1259,7 @@ public boolean rollback(final String commitInstantTime, Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime)) .findFirst()); @@ -1391,15 +1401,16 @@ private Option>> resolveOrSchedul */ public void rollbackFailedBootstrap() { log.info("Rolling back pending bootstrap if present"); - HoodieTable table = createTable(config, storageConf); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction(); - Option instant = Option.fromJavaOptional( - inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::requestedTime).findFirst()); - if (instant.isPresent() && compareTimestamps(instant.get(), LESSER_THAN_OR_EQUALS, - HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { - log.info("Found pending bootstrap instants. 
Rolling them back"); - executeUsingTxnManager(Option.empty(), () -> table.rollbackBootstrap(context, createNewInstantTime(false))); - log.info("Finished rolling back pending bootstrap"); + try (HoodieTable table = createTable(config, storageConf)) { + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction(); + Option instant = Option.fromJavaOptional( + inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::requestedTime).findFirst()); + if (instant.isPresent() && compareTimestamps(instant.get(), LESSER_THAN_OR_EQUALS, + HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { + log.info("Found pending bootstrap instants. Rolling them back"); + executeUsingTxnManager(Option.empty(), () -> table.rollbackBootstrap(context, createNewInstantTime(false))); + log.info("Finished rolling back pending bootstrap"); + } } // if bootstrap failed, lets delete metadata and restart from scratch diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index a857ef4417d6b..5f376b9779678 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -63,6 +63,7 @@ import org.apache.hudi.common.table.timeline.InstantFileNameParser; import org.apache.hudi.common.table.timeline.InstantGenerator; import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; @@ -135,7 +136,7 @@ * @param Type of outputs */ @Slf4j -public abstract class HoodieTable implements Serializable { +public abstract class HoodieTable 
implements Serializable, AutoCloseable { @Getter protected final HoodieWriteConfig config; @@ -196,6 +197,42 @@ public synchronized FileSystemViewManager getViewManager() { return viewManager; } + /** + * Releases the resources held by this table instance: the lazily-built {@link FileSystemViewManager} + * (for local views only) and the table metadata reader. + * + *
<p>
A locally-managed view ({@code SPILLABLE_DISK}/{@code MEMORY}/{@code EMBEDDED_KV}) caches a + * file-system view backed by on-disk spillable maps; closing it here frees those promptly. A + * {@code REMOTE_ONLY}/{@code REMOTE_FIRST} view, however, talks to an embedded timeline server shared + * with the write client and other table-service cycles -- closing it here would tear down server-side + * state still in use and break subsequent operations -- so the view manager is left alone for remote + * views. + * + *
<p>
Idempotent and safe to call when nothing was lazily instantiated; never throws. + */ + @Override + public synchronized void close() { + if (viewManager != null) { + FileSystemViewStorageType storageType = config.getViewStorageConfig().getStorageType(); + if (storageType != FileSystemViewStorageType.REMOTE_ONLY && storageType != FileSystemViewStorageType.REMOTE_FIRST) { + try { + viewManager.close(); + } catch (Exception e) { + log.warn("Failed to close FileSystemViewManager for table {}", config.getBasePath(), e); + } + } + viewManager = null; + } + if (metadata != null) { + try { + metadata.close(); + } catch (Exception e) { + log.warn("Failed to close metadata reader for table {}", config.getBasePath(), e); + } + metadata = null; + } + } + /** * Upsert a batch of new records into Hoodie table at the supplied instantTime. * @param context HoodieEngineContext diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/TestHoodieTableResourceLifecycle.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/TestHoodieTableResourceLifecycle.java new file mode 100644 index 0000000000000..0e0befb07bf8a --- /dev/null +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/TestHoodieTableResourceLifecycle.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.testutils.HoodieJavaClientTestHarness; + +import org.junit.jupiter.api.Test; + +import java.io.File; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies that a per-cycle {@link HoodieTable} releases the resources held by its lazily-built + * {@code FileSystemViewManager}. For a {@code SPILLABLE_DISK} view the file-group store spills to an + * on-disk {@code BitCaskDiskMap}; {@link HoodieTable#close()} must close the view manager so the disk + * map is cleaned up immediately instead of lingering until JVM exit. + */ +public class TestHoodieTableResourceLifecycle extends HoodieJavaClientTestHarness { + + private HoodieWriteConfig spillableDiskViewConfig(String spillDir) { + // maxMemoryForView=0 forces every file-group entry to spill to a BitCaskDiskMap on first put; + // the metadata table is disabled so the only spillable store is the file-system view's file-group map. 
+ return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withEngineType(EngineType.JAVA) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(1, 1) + .withDeleteParallelism(1) + .withEmbeddedTimelineServerEnabled(false) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK) + .withBaseStoreDir(spillDir) + .withMaxMemoryForView(0L) + .build()) + .build(); + } + + @Test + public void testUnclosedHoodieTableLeavesSpillableViewOnDisk() throws Exception { + String spillDir = basePath + "/.view_spill"; + HoodieWriteConfig writeConfig = spillableDiskViewConfig(spillDir); + try (HoodieJavaWriteClient client = getHoodieWriteClient(writeConfig)) { + insertFirstBatch(writeConfig, client, "001", "000", 10, HoodieJavaWriteClient::insert, false, true, 10, + metaClient.getInstantGenerator()); + } + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Mimic one table-service work unit on the driver: build a HoodieTable, materialize its spillable + // file-system view (forcing a spill to a BitCaskDiskMap), then drop the table without closing it. + HoodieTable table = HoodieJavaTable.create(writeConfig, context, metaClient); + table.getHoodieView().loadAllPartitions(); + assertTrue(countDiskMapDirs(spillDir) > 0, + "Spilling the file-group view should create an on-disk BitCaskDiskMap directory"); + + table = null; + System.gc(); + Thread.sleep(200); + // An unclosed HoodieTable leaves its disk map around until JVM exit; nothing closed the + // table -> view -> map, so GC does not release the on-disk resources. 
+ assertTrue(countDiskMapDirs(spillDir) > 0, + "An unclosed HoodieTable must leave the spillable view's on-disk BitCaskDiskMap behind"); + } + + @Test + public void testCloseReleasesSpillableFileSystemView() throws Exception { + String spillDir = basePath + "/.view_spill"; + HoodieWriteConfig writeConfig = spillableDiskViewConfig(spillDir); + try (HoodieJavaWriteClient client = getHoodieWriteClient(writeConfig)) { + insertFirstBatch(writeConfig, client, "001", "000", 10, HoodieJavaWriteClient::insert, false, true, 10, + metaClient.getInstantGenerator()); + } + metaClient = HoodieTableMetaClient.reload(metaClient); + + HoodieTable table = HoodieJavaTable.create(writeConfig, context, metaClient); + table.getHoodieView().loadAllPartitions(); + assertTrue(countDiskMapDirs(spillDir) > 0, + "Spilling the file-group view should create an on-disk BitCaskDiskMap directory"); + + // The fix: HoodieTable is AutoCloseable and close() releases the view manager -> view -> spillable + // map -> BitCaskDiskMap, which cleans the on-disk directory immediately. + table.close(); + assertEquals(0, countDiskMapDirs(spillDir), + "HoodieTable.close() must release the spillable view's on-disk maps"); + + // close() must be idempotent. + table.close(); + assertEquals(0, countDiskMapDirs(spillDir), + "Calling HoodieTable.close() twice must be safe"); + } + + private static int countDiskMapDirs(String spillDir) { + File[] dirs = new File(spillDir).listFiles((dir, name) -> name.startsWith("hudi-")); + return dirs == null ? 
0 : dirs.length; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index 89687ef070579..0b574275fbefc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -225,14 +225,19 @@ protected void removeReplacedFileIdsAtInstants(Set instants) { } @Override - public void close() { + protected void closeResources() throws Exception { + // Close the ExternalSpillableMaps (which hold on-disk BitCaskDiskMaps / RocksDB handles, each with a + // JVM shutdown hook) before super.closeResources(), while still under the writeLock held by + // AbstractTableFileSystemView#close(). Draining them here -- before the view's map fields are nulled + // out and while the lock excludes concurrent readers -- guarantees the disk maps and their shutdown + // hooks are released instead of being orphaned. closeFileGroupsMapIfPresent(); closePendingClusteringMapIfPresent(); closePendingCompactionMapIfPresent(); closePendingLogCompactionMapIfPresent(); closeBootstrapFileMapIfPresent(); closeReplaceInstantsMapIfPresent(); - super.close(); + super.closeResources(); } private void closeReplaceInstantsMapIfPresent() {