Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,20 @@ public ColumnFamilyStore(Keyspace keyspace,
data.subscribe(StorageService.instance.sstablesTracker);
data.subscribe(SnapshotManager.instance);

// Tracked tables hold mutation journal segments alive while their unrepaired sstables reference them
// (CASSANDRA-21406). Subscribe before initial sstables load so the InitialSSTableAddedNotification
// populates the segment refcount on startup.
//
// TODO: this gates on replicationType().isTracked() at CFS init time. A keyspace that migrates from
// untracked to tracked at runtime (ALTER KEYSPACE) does not re-init its CFSes, so the subscription
// never happens for the live process. The refcount self-heals on the next restart via
// InitialSSTableAddedNotification, but during the live migration segments could be dropped while
// unrepaired tracked sstables still reference them. A runtime subscription hook on the migration
// transition (likely in MutationTrackingService.maybeUpdateKeyspaceShards on MIGRATE_TO/CREATE) is
// tracked as a follow-up to CASSANDRA-21406.
if (DatabaseDescriptor.getMutationTrackingEnabled() && metadata().replicationType().isTracked())
data.subscribe(MutationJournal.instance().segmentReferenceTracker());

Collection<SSTableReader> sstables = null;
// scan for sstables corresponding to this cf and load them
if (data.loadsstables)
Expand Down
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/replication/CoordinatorLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,6 @@ void collectRemotelyMissingMutations(Offsets localOffsets, IntArrayList remoteNo
}
}

/**
 * Adds this log's {@code reconciledPersistedOffsets} to the supplied accumulator.
 *
 * NOTE(review): the neighbouring {@code isDurablyReconciled} acquires {@code lock.readLock()}, whereas this
 * method reads {@code reconciledPersistedOffsets} without taking the lock - confirm that this is intentional.
 *
 * @param into mutable offsets map that receives this log's offsets
 */
void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
{
into.add(reconciledPersistedOffsets);
}

boolean isDurablyReconciled(ShortMutationId id)
{
lock.readLock().lock();
Expand Down
44 changes: 14 additions & 30 deletions src/java/org/apache/cassandra/replication/MutationJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.google.common.collect.ImmutableSet;

import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -102,6 +101,7 @@ public PendingClearReplay(ImmutableSet<Long> segments)

private final Journal<ShortMutationId, Mutation> journal;
private final Map<Long, SegmentStateTracker> segmentStateTrackers;
private final SegmentReferenceTracker segmentReferenceTracker;

// Static segments awaiting durable cleanup of their needsReplay=false metadata.
private final Set<Long> pendingClearReplay = ConcurrentHashMap.newKeySet();
Expand Down Expand Up @@ -189,6 +189,7 @@ protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<ShortMutationId,
});
}
};
segmentReferenceTracker = new SegmentReferenceTracker();
segmentStateTrackers = new NonBlockingHashMapLong<>();
}

Expand Down Expand Up @@ -441,12 +442,19 @@ public void replay(DeserializedRecordConsumer<ShortMutationId, Mutation> replayO
}

@VisibleForTesting
public int dropReconciledSegments(Log2OffsetsMap<?> reconciledOffsets)
int dropUnreferencedSegments()
{
return journal.dropStaticSegments((segment) -> {
StaticOffsetRanges ranges = (StaticOffsetRanges) segment.keyStats();
return ranges.isFullyCovered(reconciledOffsets) && !segment.metadata().needsReplay();
});
return journal.dropStaticSegments(segment -> !segment.metadata().needsReplay()
&& !segmentReferenceTracker.isReferenced(segment.id()));
}

/**
 * Exposes the journal's {@link SegmentReferenceTracker}: the listener that records how many unrepaired
 * sstables of tracked tables reference each static segment.
 * <p>
 * {@link org.apache.cassandra.db.ColumnFamilyStore} subscribes this tracker to its sstable tracker during
 * construction, but only when mutation tracking is enabled and the table's replication type is tracked.
 * Segment dropping consults the same instance (via {@code isReferenced(segment.id())}) before removing a
 * static segment.
 *
 * @return the single tracker instance owned by this journal
 */
public SegmentReferenceTracker segmentReferenceTracker()
{
    return this.segmentReferenceTracker;
}

public void readAll(RecordConsumer<ShortMutationId> consumer)
Expand Down Expand Up @@ -802,30 +810,6 @@ static StaticOffsetRanges read(DataInputPlus in) throws IOException
Crc.validate(crc, in.readInt());
return new StaticOffsetRanges(ranges);
}

/**
 * Checks this segment's per-log offset ranges against a map of (durably reconciled) offsets.
 *
 * @param durablyReconciled reconciled offsets, looked up by log id
 * @return {@code true} only if every log id recorded in this segment has an entry in the map whose offsets
 *         contain the segment's whole min..max offset range for that log; {@code false} as soon as one log
 *         is missing from the map or is not completely contained
 */
boolean isFullyCovered(Log2OffsetsMap<?> durablyReconciled)
{
    Long2ObjectHashMap<Offsets> reconciledByLog = ((Log2OffsetsMap<Offsets>) durablyReconciled).asMap();
    Long2LongHashMap.EntryIterator entries = ranges.entrySet().iterator();
    while (entries.hasNext())
    {
        // advance first: key/value accessors read the entry the iterator is currently positioned on
        entries.next();

        long logId = entries.getLongKey();
        long packedRange = entries.getLongValue();
        int lowest = minOffset(packedRange);
        int highest = maxOffset(packedRange);

        Offsets reconciled = reconciledByLog.get(logId);
        boolean covered = reconciled != null && reconciled.containsRange(lowest, highest);
        if (!covered)
            return false;
    }
    return true;
}
}

static final class OffsetRangesFactory implements KeyStats.Factory<ShortMutationId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,19 +1057,7 @@ private void onNewLog(Shard shard, CoordinatorLog log)

private void truncateMutationJournal()
{
Log2OffsetsMap.Mutable reconciledOffsets = new Log2OffsetsMap.Mutable();
collectDurablyReconciledOffsets(reconciledOffsets);
MutationJournal.instance().dropReconciledSegments(reconciledOffsets);
}

/**
* Collect every log's durably reconciled offsets. Every mutation covered
* by these offsets can be compacted away by the journal, assuming that all
* relevant memtables had been flushed to disk.
*/
private void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
{
forEachKeyspace(keyspace -> keyspace.collectDurablyReconciledOffsets(into));
MutationJournal.instance().dropUnreferencedSegments();
}

public SyncTasks alignToShardBoundaries(Keyspace keyspace, List<SyncTask> tasks)
Expand Down Expand Up @@ -1380,11 +1368,6 @@ void recordFullyReconciledOffsets(ReconciledKeyspaceOffsets keyspaceOffsets)
});
}

/**
 * Asks every shard visited by {@code forEachShard} to add its durably reconciled offsets to the
 * supplied accumulator.
 *
 * @param into mutable offsets map handed to each shard in turn
 */
void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
{
    Consumer<Shard> collectFromShard = shard -> shard.collectDurablyReconciledOffsets(into);
    forEachShard(collectFromShard);
}

void forEachShard(Consumer<Shard> consumer)
{
for (Shard shard : shards.values())
Expand Down
Loading