From b0656830a18d97bf89219076132b885896118516 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 25 May 2026 19:03:26 +0100 Subject: [PATCH] [FLINK-39753][state/rocksdb] Close ColumnFamilyOptions from getDescriptor() in Compactor ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions on every call and does not close it, preventing the shared block cache from being freed. Wrap the call in try-with-resources so the options are closed after reading numLevels(). --- .../apache/flink/state/rocksdb/sstmerge/Compactor.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java index 2a4d7b26c7316..e6e0e65d46e54 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java @@ -19,6 +19,7 @@ package org.apache.flink.state.rocksdb.sstmerge; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionJobInfo; import org.rocksdb.CompactionOptions; import org.rocksdb.RocksDB; @@ -51,7 +52,13 @@ public Compactor(CompactionTarget target, long targetOutputFileSize) { } void compact(ColumnFamilyHandle cfName, int level, List files) throws RocksDBException { - int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1); + // FLINK-39753: getDescriptor() allocates a new native ColumnFamilyOptions on every call. + // Closing it is required to release the native object and its reference to the shared + // block cache; leaking it keeps the LRUCache alive and grows native memory until OOM. + final int outputLevel; + try (ColumnFamilyOptions cfOptions = cfName.getDescriptor().getOptions()) { + outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1); + } LOG.debug( "Manually compacting {} files from level {} to {}: {}", files.size(),