diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 162686724593..9dc3c046f1e5 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -538,6 +538,20 @@ public ColumnFamilyStore(Keyspace keyspace, data.subscribe(StorageService.instance.sstablesTracker); data.subscribe(SnapshotManager.instance); + // Tracked tables hold mutation journal segments alive while their unrepaired sstables reference them + // (CASSANDRA-21406). Subscribe before initial sstables load so the InitialSSTableAddedNotification + // populates the segment refcount on startup. + // + // TODO: this gates on replicationType().isTracked() at CFS init time. A keyspace that migrates from + // untracked to tracked at runtime (ALTER KEYSPACE) does not re-init its CFSes, so the subscription + // never happens for the live process. The refcount self-heals on the next restart via + // InitialSSTableAddedNotification, but during the live migration segments could be dropped while + // unrepaired tracked sstables still reference them. A runtime subscription hook on the migration + // transition (likely in MutationTrackingService.maybeUpdateKeyspaceShards on MIGRATE_TO/CREATE) is + // tracked as a follow-up to CASSANDRA-21406. + if (DatabaseDescriptor.getMutationTrackingEnabled() && metadata().replicationType().isTracked()) + data.subscribe(MutationJournal.instance().segmentReferenceTracker()); + Collection sstables = null; // scan for sstables corresponding to this cf and load them if (data.loadsstables) diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 317a3e6eea59..9956c4aea843 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -593,11 +593,6 @@ void collectRemotelyMissingMutations(Offsets localOffsets, IntArrayList remoteNo } } - void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) - { - into.add(reconciledPersistedOffsets); - } - boolean isDurablyReconciled(ShortMutationId id) { lock.readLock().lock(); diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java index 1fa4909804af..e064ba08e8a6 100644 --- a/src/java/org/apache/cassandra/replication/MutationJournal.java +++ b/src/java/org/apache/cassandra/replication/MutationJournal.java @@ -36,7 +36,6 @@ import com.google.common.collect.ImmutableSet; import org.agrona.collections.Long2LongHashMap; -import org.agrona.collections.Long2ObjectHashMap; import org.jctools.maps.NonBlockingHashMapLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +101,7 @@ public PendingClearReplay(ImmutableSet segments) private final Journal journal; private final Map segmentStateTrackers; + private final SegmentReferenceTracker segmentReferenceTracker; // Static segments awaiting durable cleanup of their needsReplay=false metadata. private final Set pendingClearReplay = ConcurrentHashMap.newKeySet(); @@ -189,6 +189,7 @@ protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment(); } @@ -441,12 +442,19 @@ public void replay(DeserializedRecordConsumer replayO } @VisibleForTesting - public int dropReconciledSegments(Log2OffsetsMap reconciledOffsets) + int dropUnreferencedSegments() { - return journal.dropStaticSegments((segment) -> { - StaticOffsetRanges ranges = (StaticOffsetRanges) segment.keyStats(); - return ranges.isFullyCovered(reconciledOffsets) && !segment.metadata().needsReplay(); - }); + return journal.dropStaticSegments(segment -> !segment.metadata().needsReplay() + && !segmentReferenceTracker.isReferenced(segment.id())); + } + + /** + * Listener tracking how many unrepaired sstables of tracked tables reference each static segment. + * Subscribed by {@link org.apache.cassandra.db.ColumnFamilyStore} on init for every tracked CFS. + */ + public SegmentReferenceTracker segmentReferenceTracker() + { + return segmentReferenceTracker; } public void readAll(RecordConsumer consumer) @@ -802,30 +810,6 @@ static StaticOffsetRanges read(DataInputPlus in) throws IOException Crc.validate(crc, in.readInt()); return new StaticOffsetRanges(ranges); } - - /** - * @return whether all keys in the segment are fully covered by the specified (durably reconciled) offsets map - */ - boolean isFullyCovered(Log2OffsetsMap durablyReconciled) - { - Long2ObjectHashMap reconciledMap = ((Log2OffsetsMap) durablyReconciled).asMap(); - for (Long2LongHashMap.EntryIterator iter = ranges.entrySet().iterator(); iter.hasNext();) - { - iter.next(); - - long logId = iter.getLongKey(); - long range = iter.getLongValue(); - int min = minOffset(range); - int max = maxOffset(range); - - Offsets offsets = reconciledMap.get(logId); - if (offsets == null) - return false; - if (!offsets.containsRange(min, max)) - return false; - } - return true; - } } static final class OffsetRangesFactory implements KeyStats.Factory diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index db87a88f010a..2eff682d7d98 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -1057,19 +1057,7 @@ private void onNewLog(Shard shard, CoordinatorLog log) private void truncateMutationJournal() { - Log2OffsetsMap.Mutable reconciledOffsets = new Log2OffsetsMap.Mutable(); - collectDurablyReconciledOffsets(reconciledOffsets); - MutationJournal.instance().dropReconciledSegments(reconciledOffsets); - } - - /** - * Collect every log's durably reconciled offsets. Every mutation covered - * by these offsets can be compacted away by the journal, assuming that all - * relevant memtables had been flushed to disk. - */ - private void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) - { - forEachKeyspace(keyspace -> keyspace.collectDurablyReconciledOffsets(into)); + MutationJournal.instance().dropUnreferencedSegments(); } public SyncTasks alignToShardBoundaries(Keyspace keyspace, List tasks) @@ -1380,11 +1368,6 @@ void recordFullyReconciledOffsets(ReconciledKeyspaceOffsets keyspaceOffsets) }); } - void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) - { - forEachShard(shard -> shard.collectDurablyReconciledOffsets(into)); - } - void forEachShard(Consumer consumer) { for (Shard shard : shards.values()) diff --git a/src/java/org/apache/cassandra/replication/SegmentReferenceTracker.java b/src/java/org/apache/cassandra/replication/SegmentReferenceTracker.java new file mode 100644 index 000000000000..df6a19c20303 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/SegmentReferenceTracker.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.replication; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.LongConsumer; + +import com.google.common.annotations.VisibleForTesting; + +import org.agrona.collections.Long2LongHashMap; + +import accord.utils.Invariants; + +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.commitlog.IntervalSet; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.notifications.INotification; +import org.apache.cassandra.notifications.INotificationConsumer; +import org.apache.cassandra.notifications.InitialSSTableAddedNotification; +import org.apache.cassandra.notifications.SSTableAddedNotification; +import org.apache.cassandra.notifications.SSTableListChangedNotification; +import org.apache.cassandra.notifications.SSTableRepairStatusChanged; + +/** + * Tracks how many unrepaired sstables of tracked tables reference each mutation journal segment. + * + *

A sstable references a segment iff the sstable's {@code StatsMetadata.commitLogIntervals} — + * which for tracked tables stores mutation journal positions, not commit log positions — covers + * any position within that segment. The granularity is coarse: the referenced range is the union + * of the memtable lower/upper bounds for the sstable's lineage. This matches existing commit log + * retention semantics; it can include a segment with no actual data from this sstable, which only + * defers (never incorrectly enables) segment dropping. + * + *

Used by {@link MutationJournal} to decide when a static segment may be dropped: a segment may be + * dropped only once it has no unrepaired references and {@code !needsReplay}. This guarantees the + * journal can rebuild any unrepaired sstable from the journal if minority writes need to be filtered + * out (CASSANDRA-21407). + */ +public class SegmentReferenceTracker implements INotificationConsumer +{ + private static final long NO_REF = 0L; + + // Guards both refsBySegment and trackedSstables to keep transitions atomic across notifications. + private final ReentrantLock lock = new ReentrantLock(); + + private final Long2LongHashMap refsBySegment = new Long2LongHashMap(NO_REF); + + // Sstables we currently hold refs for (i.e. those that were unrepaired at the time we observed them). + // Required so SSTableRepairStatusChanged can transition an sstable in/out without the notification + // having to carry the previous repair state. + private final Set trackedSstables = new HashSet<>(); + + @Override + public void handleNotification(INotification notification, Object sender) + { + if (notification instanceof SSTableAddedNotification) + onAdded(((SSTableAddedNotification) notification).added); + else if (notification instanceof InitialSSTableAddedNotification) + onAdded(((InitialSSTableAddedNotification) notification).added); + else if (notification instanceof SSTableListChangedNotification) + onListChanged((SSTableListChangedNotification) notification); + else if (notification instanceof SSTableRepairStatusChanged) + onRepairStatusChanged(((SSTableRepairStatusChanged) notification).sstables); + + // Other lifecycle notifications are deliberately not handled because the actual sstable-lifecycle + // effect is delivered by SSTableListChangedNotification: + // - SSTableDeletingNotification: fires when the on-disk files are scheduled for deletion, after + // the sstable has already left the live view via SSTableListChangedNotification. Handling it + // here would double-decrement. + // - TruncationNotification: truncate calls notifyTruncated for higher-level concerns (snapshots, + // truncatedAt persistence), then discardSSTables -> Tracker.dropSSTables -> notifySSTablesChanged + // which fires SSTableListChangedNotification(removed, empty) covering the refcount release. + // - TableDroppedNotification: drop table fires notifyDropped for MBean/snapshot cleanup, then + // CFS.invalidate(..., dropData=true) -> data.dropSSTables() which again fires + // SSTableListChangedNotification(removed, empty) covering the refcount release. + } + + /** + * Whether any unrepaired sstable references the given segment id. + */ + boolean isReferenced(long segmentId) + { + lock.lock(); + try + { + return refsBySegment.get(segmentId) > NO_REF; + } + finally + { + lock.unlock(); + } + } + + private void onAdded(Iterable added) + { + lock.lock(); + try + { + for (SSTableReader sstable : added) + acquireIfUnrepaired(sstable); + } + finally + { + lock.unlock(); + } + } + + private void onListChanged(SSTableListChangedNotification notification) + { + lock.lock(); + try + { + // Process additions before removals so refcounts are never observed briefly empty + // between a compaction's input drop and output add when both span the same segment. + for (SSTableReader sstable : notification.added) + acquireIfUnrepaired(sstable); + for (SSTableReader sstable : notification.removed) + releaseIfTracked(sstable); + } + finally + { + lock.unlock(); + } + } + + private void onRepairStatusChanged(Collection changed) + { + lock.lock(); + try + { + for (SSTableReader sstable : changed) + { + if (sstable.isRepaired()) + releaseIfTracked(sstable); + else + acquireIfUnrepaired(sstable); + } + } + finally + { + lock.unlock(); + } + } + + private void acquireIfUnrepaired(SSTableReader sstable) + { + if (!sstable.isRepaired() && trackedSstables.add(sstable)) + forEachSegment(sstable, this::incrementRef); + } + + private void releaseIfTracked(SSTableReader sstable) + { + if (trackedSstables.remove(sstable)) + forEachSegment(sstable, this::decrementRef); + } + + private void incrementRef(long segmentId) + { + refsBySegment.compute(segmentId, (k, currentValue) -> currentValue + 1); + } + + private void decrementRef(long segmentId) + { + refsBySegment.compute(segmentId, (k, prev) -> { + Invariants.require(prev > NO_REF, "Refcount underflow for segment %d", segmentId); + return prev - 1; + }); + } + + private static void forEachSegment(SSTableReader sstable, LongConsumer consumer) + { + IntervalSet intervals = sstable.getSSTableMetadata().commitLogIntervals; + if (intervals.isEmpty()) + return; + + // IntervalSet guarantees starts and ends are returned in matching order. + Iterator startIt = intervals.starts().iterator(); + Iterator endIt = intervals.ends().iterator(); + while (startIt.hasNext()) + { + CommitLogPosition start = startIt.next(); + CommitLogPosition end = endIt.next(); + for (long s = start.segmentId; s <= end.segmentId; s++) + consumer.accept(s); + } + } + + @VisibleForTesting + long referenceCountForTesting(long segmentId) + { + lock.lock(); + try + { + return refsBySegment.get(segmentId); + } + finally + { + lock.unlock(); + } + } + + @VisibleForTesting + int trackedSstableCountForTesting() + { + lock.lock(); + try + { + return trackedSstables.size(); + } + finally + { + lock.unlock(); + } + } + + @VisibleForTesting + void incrementRefForTesting(long segmentId) + { + lock.lock(); + try + { + incrementRef(segmentId); + } + finally + { + lock.unlock(); + } + } + + @VisibleForTesting + void decrementRefForTesting(long segmentId) + { + lock.lock(); + try + { + decrementRef(segmentId); + } + finally + { + lock.unlock(); + } + } +} diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index dd4e34902376..0b63544e562e 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -315,11 +315,6 @@ List collectLocallyMissingOffsets() return result; } - void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into) - { - logs.values().forEach(log -> log.collectDurablyReconciledOffsets(into)); - } - private CoordinatorLog getOrCreate(Mutation mutation) { return getOrCreate(mutation.id()); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationJournalSegmentRefcountTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationJournalSegmentRefcountTest.java new file mode 100644 index 000000000000..246c2ecbb142 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationJournalSegmentRefcountTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tracking; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.replication.MutationJournal; +import org.apache.cassandra.replication.MutationTrackingService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * End-to-end coverage of CASSANDRA-21406: a static journal segment must be retained while any + * unrepaired sstable references it, and must be droppable once every referencing sstable has + * been promoted to repaired (e.g. by compaction once mutations are durably reconciled). + */ +public class MutationJournalSegmentRefcountTest extends TestBaseImpl +{ + private static final String CREATE_KEYSPACE = + "CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type = 'tracked'"; + + private static final String CREATE_TABLE = "CREATE TABLE %s.tbl (pk int PRIMARY KEY, val text)"; + + @Test(timeout = 120_000) + public void testSegmentRetainedUntilSSTableRepaired() throws Throwable + { + try (Cluster cluster = Cluster.build(3) + .withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP)) + .start()) + { + cluster.schemaChange(withKeyspace(CREATE_KEYSPACE)); + cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE)); + + // Disable autocompaction so flushed sstables stay until we explicitly compact. + cluster.forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE, "tbl").asserts().success()); + + // Block offset broadcasts: each node only sees its own witnesses, so isDurablyReconciled is + // false everywhere and SSTableWriter cannot auto-mark the flushed sstables as repaired. + cluster.filters().verbs(Verb.MT_BROADCAST_LOG_OFFSETS.id).drop(); + + for (int i = 0; i < 50; i++) + { + cluster.coordinator(1) + .execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"), + ConsistencyLevel.QUORUM, i, "v" + i); + } + + // Flush and force the active journal segment to roll so we have a static segment to inspect. + cluster.forEach(i -> i.nodetoolResult("flush", KEYSPACE).asserts().success()); + cluster.forEach(i -> i.runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty())); + + // Confirm there is a static segment that the new refcount is keeping alive, then try to drop: + // the dropping pass must be a no-op because every flushed sstable is unrepaired. + cluster.forEach(i -> i.runOnInstance(() -> { + int before = MutationJournal.instance().countStaticSegmentsForTesting(); + assertTrue("Expected at least one static segment after flush+segment close, got " + before, before > 0); + MutationTrackingService.instance().persistLogStateForTesting(); + int after = MutationJournal.instance().countStaticSegmentsForTesting(); + assertEquals("Static segments must not be dropped while unrepaired sstables reference them", + before, after); + })); + + // Restore broadcast, exchange witnesses, and persist so isDurablyReconciled is now true everywhere. + cluster.filters().reset(); + cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting())); + cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance().persistLogStateForTesting())); + + // Major-compact the table. Compaction rewrites the sstable through SSTableWriter.finalizeMetadata, + // which detects that all mutations are durably reconciled and stamps repairedAt on the output. + // SSTableListChangedNotification then releases refs from the (unrepaired) inputs without acquiring + // any from the (repaired) output -> refcount drops to zero. + cluster.forEach(i -> i.nodetoolResult("compact", KEYSPACE, "tbl").asserts().success()); + + // Now the persister can drop the static segments. + cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance().persistLogStateForTesting())); + cluster.forEach(i -> i.runOnInstance(() -> { + int remaining = MutationJournal.instance().countStaticSegmentsForTesting(); + assertEquals("Static segments must be dropped once their sstables are promoted to repaired", + 0, remaining); + })); + } + } +} diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java index 54f69d34f04e..61385d5ddc60 100644 --- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java +++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java @@ -181,7 +181,7 @@ public void testStaticOffsetRanges() } @Test - public void testDropReconcileSegments() + public void testDropUnreferencedSegments() { ShortMutationId id1 = id(100L, 0); ShortMutationId id2 = id(100L, 1); @@ -193,26 +193,23 @@ public void testDropReconcileSegments() Mutation mutation3 = mutation("key3", "ck3", "value3"); Mutation mutation4 = mutation("key4", "ck4", "value4"); + SegmentReferenceTracker refs = journal.segmentReferenceTracker(); + // write two mutations to the first segment and flush it to make static + long firstSegment = journal.getCurrentPosition().segmentId; journal.write(id1, mutation1); journal.write(id2, mutation2); journal.closeCurrentSegmentForTestingIfNonEmpty(); // write two mutations to the second segment and flush it to make static + long secondSegment = journal.getCurrentPosition().segmentId; journal.write(id3, mutation3); journal.write(id4, mutation4); journal.closeCurrentSegmentForTestingIfNonEmpty(); { - // call dropReconciledSegments() with a log2offsets map that covers both segments fully - // *BUT* with the segments still marked as needing replay nothing should be dropped - Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder(); - builder.add(id1); - builder.add(id2); - builder.add(id3); - builder.add(id4); - assertEquals(0, journal.dropReconciledSegments(builder.build())); - // confirm that no static segments have been dropped + // Both segments still need replay; even with no sstable references they must be retained. + assertEquals(0, journal.dropUnreferencedSegments()); assertEquals(2, journal.countStaticSegmentsForTesting()); } @@ -220,33 +217,17 @@ public void testDropReconcileSegments() journal.clearNeedsReplayForTesting(); { - // call dropReconciledSegments() with a log2offsets map that doesn't cover any segments fully - Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder(); - builder.add(id1); - assertEquals(0, journal.dropReconciledSegments(builder.build())); - // confirm that no static segments got dropped - assertEquals(2, journal.countStaticSegmentsForTesting()); - } - - { - // call dropReconciledSegments() with a log2offsets map that covers only the first segment fully - Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder(); - builder.add(id1); - builder.add(id2); - assertEquals(1, journal.dropReconciledSegments(builder.build())); - // confirm that only one static segment got dropped + // Pretend an unrepaired sstable references the first segment. + refs.incrementRefForTesting(firstSegment); + assertEquals(1, journal.dropUnreferencedSegments()); + // Only the second (unreferenced) segment got dropped. assertEquals(1, journal.countStaticSegmentsForTesting()); } { - // call dropReconciledSegments() with a log2offsets map that covers both segments fully - Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder(); - builder.add(id1); - builder.add(id2); - builder.add(id3); - builder.add(id4); - assertEquals(1, journal.dropReconciledSegments(builder.build())); - // confirm that all static segments have now been dropped + // Releasing the last reference allows the first segment to drop too. + refs.decrementRefForTesting(firstSegment); + assertEquals(1, journal.dropUnreferencedSegments()); assertEquals(0, journal.countStaticSegmentsForTesting()); } } diff --git a/test/unit/org/apache/cassandra/replication/SegmentReferenceTrackerTest.java b/test/unit/org/apache/cassandra/replication/SegmentReferenceTrackerTest.java new file mode 100644 index 000000000000..bfcd9065dfdb --- /dev/null +++ b/test/unit/org/apache/cassandra/replication/SegmentReferenceTrackerTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.replication; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; + +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.commitlog.IntervalSet; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.notifications.InitialSSTableAddedNotification; +import org.apache.cassandra.notifications.SSTableAddedNotification; +import org.apache.cassandra.notifications.SSTableListChangedNotification; +import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.EstimatedHistogram; +import org.apache.cassandra.utils.streamhist.TombstoneHistogram; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SegmentReferenceTrackerTest +{ + @Test + public void testInitialAddRefsEverySegmentInTheInterval() + { + int startSegment = 5; + int endSegment = 7; + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader sstable = unrepaired(intervals(startSegment, 0, endSegment, 100)); + + tracker.handleNotification(new InitialSSTableAddedNotification(List.of(sstable)), null); + + // Coarse range covers segments 5..7 inclusive. + for (long segment = startSegment; segment <= endSegment; segment++) + assertEquals("segment " + segment, 1L, tracker.referenceCountForTesting(segment)); + assertFalse(tracker.isReferenced(4)); + assertFalse(tracker.isReferenced(8)); + assertEquals(1, tracker.trackedSstableCountForTesting()); + } + + @Test + public void testAddedRepairedSSTableHoldsNoRefs() + { + int startSegment = 5; + int endSegment = 7; + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader sstable = repaired(intervals(startSegment, 0, endSegment, 100)); + + tracker.handleNotification(new SSTableAddedNotification(List.of(sstable), null), null); + + for (long segment = startSegment; segment <= endSegment; segment++) + assertFalse("segment " + segment, tracker.isReferenced(segment)); + assertEquals(0, tracker.trackedSstableCountForTesting()); + } + + @Test + public void testAddIsIdempotent() + { + int startSegment = 5; + int endSegment = 7; + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader sstable = unrepaired(intervals(startSegment, 0, endSegment, 100)); + + tracker.handleNotification(new SSTableAddedNotification(List.of(sstable), null), null); + tracker.handleNotification(new SSTableAddedNotification(List.of(sstable), null), null); + + assertEquals(1L, tracker.referenceCountForTesting(5)); + assertEquals(1, tracker.trackedSstableCountForTesting()); + } + + @Test + public void testMultipleDisjointIntervalsRefEachContainedSegment() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + // Two disjoint intervals: [3:0..3:100] and [9:0..10:50]. + IntervalSet.Builder builder = new IntervalSet.Builder<>(); + builder.add(new CommitLogPosition(3, 0), new CommitLogPosition(3, 100)); + builder.add(new CommitLogPosition(9, 0), new CommitLogPosition(10, 50)); + SSTableReader sstable = unrepaired(builder.build()); + + tracker.handleNotification(new InitialSSTableAddedNotification(List.of(sstable)), null); + + assertEquals(1L, tracker.referenceCountForTesting(3)); + assertEquals(1L, tracker.referenceCountForTesting(9)); + assertEquals(1L, tracker.referenceCountForTesting(10)); + // Gap between disjoint intervals is not referenced. + for (long gap = 4; gap <= 8; gap++) + assertFalse("segment " + gap, tracker.isReferenced(gap)); + } + + @Test + public void testEmptyIntervalsHoldNoRefs() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader sstable = unrepaired(IntervalSet.empty()); + + tracker.handleNotification(new SSTableAddedNotification(List.of(sstable), null), null); + + // Tracked sstable but no segments to ref. + assertEquals(1, tracker.trackedSstableCountForTesting()); + assertFalse(tracker.isReferenced(0)); + } + + @Test + public void testCompactionPreservesRefsWhenInputAndOutputOverlapSameSegment() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader input = unrepaired(intervals(5, 0, 5, 100)); + SSTableReader output = unrepaired(intervals(5, 0, 5, 200)); + + tracker.handleNotification(new SSTableAddedNotification(List.of(input), null), null); + assertEquals(1L, tracker.referenceCountForTesting(5)); + + // Compaction emits the SSTableListChangedNotification with added + removed atomically. + tracker.handleNotification( + new SSTableListChangedNotification(List.of(output), + List.of(input), + OperationType.COMPACTION), + null); + + // Net: still one unrepaired sstable referencing segment 5. + assertEquals(1L, tracker.referenceCountForTesting(5)); + assertEquals(1, tracker.trackedSstableCountForTesting()); + } + + @Test + public void testCompactionToRepairedOutputReleasesRefs() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader input = unrepaired(intervals(5, 0, 5, 100)); + SSTableReader output = repaired(intervals(5, 0, 5, 200)); + + tracker.handleNotification(new SSTableAddedNotification(List.of(input), null), null); + assertEquals(1L, tracker.referenceCountForTesting(5)); + + tracker.handleNotification( + new SSTableListChangedNotification(List.of(output), + List.of(input), + OperationType.COMPACTION), + null); + + assertFalse(tracker.isReferenced(5)); + assertEquals(0, tracker.trackedSstableCountForTesting()); + } + + @Test + public void testRepairPromotionReleasesRefs() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + AtomicReference repaired = new AtomicReference<>(false); + SSTableReader sstable = sstableWithRepairSupplier(intervals(5, 0, 7, 0), repaired::get); + + tracker.handleNotification(new SSTableAddedNotification(List.of(sstable), null), null); + for (long segment = 5; segment <= 7; segment++) + assertEquals(1L, tracker.referenceCountForTesting(segment)); + + // Promote to repaired; repair-status-changed delivers the transition. + repaired.set(true); + tracker.handleNotification(new SSTableRepairStatusChanged(List.of(sstable)), null); + + for (long segment = 5; segment <= 7; segment++) + assertFalse("segment " + segment, tracker.isReferenced(segment)); + assertEquals(0, tracker.trackedSstableCountForTesting()); + } + + @Test + public void testRepairStatusFlippingBackToUnrepairedReAcquires() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + AtomicReference repaired = new AtomicReference<>(true); + SSTableReader sstable = sstableWithRepairSupplier(intervals(5, 0, 5, 100), repaired::get); + + // Starts repaired -> add is a no-op. + tracker.handleNotification(new SSTableAddedNotification(List.of(sstable), null), null); + assertEquals(0, tracker.trackedSstableCountForTesting()); + + // Flip back to unrepaired (e.g. failed repair session) and deliver repair-status-changed. + repaired.set(false); + tracker.handleNotification(new SSTableRepairStatusChanged(List.of(sstable)), null); + + assertEquals(1L, tracker.referenceCountForTesting(5)); + assertEquals(1, tracker.trackedSstableCountForTesting()); + } + + @Test + public void testMultipleSstablesAccumulateRefsOnSharedSegments() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader a = unrepaired(intervals(5, 0, 6, 0)); + SSTableReader b = unrepaired(intervals(6, 0, 7, 0)); + + tracker.handleNotification(new SSTableAddedNotification(List.of(a, b), null), null); + + assertEquals(1L, tracker.referenceCountForTesting(5)); + assertEquals(2L, tracker.referenceCountForTesting(6)); + assertEquals(1L, tracker.referenceCountForTesting(7)); + + // Drop one; the shared segment still has a holder. + tracker.handleNotification( + new SSTableListChangedNotification(List.of(), + List.of(a), + OperationType.COMPACTION), + null); + + assertFalse(tracker.isReferenced(5)); + assertEquals(1L, tracker.referenceCountForTesting(6)); + assertEquals(1L, tracker.referenceCountForTesting(7)); + } + + @Test + public void testReleaseOfUntrackedSstableIsNoOp() + { + SegmentReferenceTracker tracker = new SegmentReferenceTracker(); + SSTableReader sstable = unrepaired(intervals(5, 0, 5, 100)); + + assertFalse(tracker.isReferenced(5)); + + // Never added -- removal must not underflow. + tracker.handleNotification( + new SSTableListChangedNotification(List.of(), + List.of(sstable), + OperationType.COMPACTION), + null); + + assertFalse(tracker.isReferenced(5)); + } + + // -- helpers --------------------------------------------------------- + + private static IntervalSet intervals(long startSegment, int startPosition, long endSegment, int endPosition) + { + assertTrue("startSegment " + startSegment + " is less than or equal to endSegment " + endSegment, startSegment <= endSegment); + assertTrue("startPosition " + startPosition + " is less than or equal to endPosition " + endPosition, startPosition <= endPosition); + return new IntervalSet<>(new CommitLogPosition(startSegment, startPosition), + new CommitLogPosition(endSegment, endPosition)); + } + + private static SSTableReader unrepaired(IntervalSet intervals) + { + return stub(intervals, false); + } + + private static SSTableReader repaired(IntervalSet intervals) + { + return stub(intervals, true); + } + + private static SSTableReader stub(IntervalSet intervals, boolean isRepaired) + { + return sstableWithRepairSupplier(intervals, () -> isRepaired); + } + + private static SSTableReader sstableWithRepairSupplier(IntervalSet intervals, + BooleanSupplier isRepairedSupplier) + { + SSTableReader reader = Mockito.mock(SSTableReader.class); + Mockito.when(reader.isRepaired()).thenAnswer(ref -> isRepairedSupplier.getAsBoolean()); + Mockito.when(reader.getSSTableMetadata()).thenReturn(stats(intervals)); + return reader; + } + + private static StatsMetadata stats(IntervalSet intervals) + { + return new StatsMetadata(new EstimatedHistogram(155), // estimatedPartitionSize + new EstimatedHistogram(118), // estimatedCellPerPartitionCount + intervals, // commitLogIntervals + 0L, // minTimestamp + 0L, // maxTimestamp + Cell.NO_DELETION_TIME, // minLocalDeletionTime + Cell.NO_DELETION_TIME, // maxLocalDeletionTime + Cell.NO_TTL, // minTTL + Cell.NO_TTL, // maxTTL + -1.0, // compressionRatio + TombstoneHistogram.createDefault(), + 0, // sstableLevel + List.of(), // clusteringTypes + Slice.ALL, // coveredClustering + false, // hasLegacyCounterShards + ActiveRepairService.UNREPAIRED_SSTABLE, + 0L, // totalColumnsSet + 0L, // totalRows + Double.NaN, // tokenSpaceCoverage + null, // originatingHostId + ActiveRepairService.NO_PENDING_REPAIR, + false, // hasPartitionLevelDeletions + ImmutableCoordinatorLogOffsets.NONE, + ByteBufferUtil.EMPTY_BYTE_BUFFER, + ByteBufferUtil.EMPTY_BYTE_BUFFER); + } +}