Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,15 @@ public ResultMessage executeInternalWithoutCondition(QueryState queryState, Quer
String keyspaceName = mutation.getKeyspaceName();
Token token = mutation.key().getToken();
MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token);
mutation = mutation.withMutationId(id);
mutation.apply();
try
{
mutation = mutation.withMutationId(id);
mutation.apply();
}
finally
{
MutationTrackingService.instance().completeLocalWrite(id);
}
}

for (IMutation mutation : routed.untrackedMutations)
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/db/CounterMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public Mutation applyCounterMutation(MutationId mutationId) throws WriteTimeoutE
}
}

@Override
public void apply()
{
applyCounterMutation();
Expand Down
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,11 @@ private SystemKeyspace()
+ "keyspace_name text,"
+ "range_start text,"
+ "range_end text,"
+ "since_epoch bigint,"
+ "participants frozen<set<int>>,"
+ "PRIMARY KEY ((keyspace_name, range_start, range_end)))")
+ "sealed_participants frozen<set<int>>,"
+ "state text,"
+ "PRIMARY KEY ((keyspace_name, range_start, range_end, since_epoch)))")
.build();

private static final TableMetadata CoordinatorLogs =
Expand All @@ -575,12 +578,13 @@ private SystemKeyspace()
+ "keyspace_name text,"
+ "range_start text,"
+ "range_end text,"
+ "since_epoch bigint,"
+ "host_id int,"
+ "host_log_id int,"
+ "participants frozen<set<int>>,"
+ "witnessed_offsets map<int, frozen<list<int>>>,"
+ "persisted_offsets map<int, frozen<list<int>>>,"
+ "PRIMARY KEY ((keyspace_name, range_start, range_end), host_id, host_log_id))")
+ "PRIMARY KEY ((keyspace_name, range_start, range_end, since_epoch), host_id, host_log_id))")
.build();

@Deprecated(since = "4.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
Expand All @@ -45,9 +43,6 @@
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.replication.Offsets;
import org.apache.cassandra.replication.ReconciledKeyspaceOffsets;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.OutgoingStream;
Expand Down Expand Up @@ -92,34 +87,8 @@ public StreamReceiver createStreamReceiver(StreamSession session, List<Range<Tok
return new CassandraStreamReceiver(cfs, session, ranges, totalStreams);
}

public Predicate<SSTableReader> getSSTablePredicateForKeyspaceRanges(ReconciledKeyspaceOffsets reconciledKeyspaceOffsets)
{
if (reconciledKeyspaceOffsets == null)
return sstable -> true;

return sstable -> {
if (sstable.isRepaired())
return false;

ImmutableCoordinatorLogOffsets sstableOffsets = sstable.getSSTableMetadata().coordinatorLogOffsets;

// if it's not repaired and there are no offsets, it was probably written before the table was using
// mutation tracking and therefore should be considered unreconciled
if (sstableOffsets.isEmpty())
return true;

for (Map.Entry<Long, Offsets.Immutable> entry : sstableOffsets.entries())
{
if (!reconciledKeyspaceOffsets.isFullyReconciled(entry.getKey(), entry.getValue()))
return true;
}

return false;
};
}

@Override
public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, TimeUUID pendingRepair, PreviewKind previewKind, ReconciledKeyspaceOffsets reconciledKeyspaceOffsets)
public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, TimeUUID pendingRepair, PreviewKind previewKind)
{
Refs<SSTableReader> refs = new Refs<>();
try
Expand All @@ -131,15 +100,7 @@ public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, R
Set<SSTableReader> sstables = Sets.newHashSet();
SSTableIntervalTree intervalTree = buildSSTableIntervalTree(ImmutableList.copyOf(view.select(SSTableSet.CANONICAL)));
Predicate<SSTableReader> predicate;
// reconciledKeyspaceOffsets are only included when mutation logs are streamed, since we include logs
// for all unreconciled mutations, and SSTables for all reconciled mutations
if (reconciledKeyspaceOffsets != null)
{
Preconditions.checkArgument(previewKind == PreviewKind.NONE);
Preconditions.checkArgument(pendingRepair == ActiveRepairService.NO_PENDING_REPAIR);
predicate = getSSTablePredicateForKeyspaceRanges(reconciledKeyspaceOffsets);
}
else if (previewKind.isPreview())
if (previewKind.isPreview())
{
predicate = previewKind.predicate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.virtual;

import java.util.Collection;
Expand All @@ -29,6 +28,7 @@
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.journal.ActiveSegment;
Expand Down Expand Up @@ -66,7 +66,7 @@ public static final class MutationJournalTable extends AbstractVirtualTable
private static final String FSYNCED_TO = "fsynced_to";
private static final String NEEDS_REPLAY = "needs_replay";
private static final String FILE_PATH = "file_path";

MutationJournalTable(String keyspace)
{
super(TableMetadata.builder(keyspace, MUTATION_JOURNAL)
Expand All @@ -83,12 +83,12 @@ public static final class MutationJournalTable extends AbstractVirtualTable
.addRegularColumn(FILE_PATH, UTF8Type.instance)
.build());
}

@Override
public DataSet data()
{
SimpleDataSet result = new SimpleDataSet(metadata());

for (Segment<ShortMutationId, Mutation> segment : MutationJournal.instance().getAllSegments())
{
result.row(segment.id())
Expand All @@ -100,39 +100,48 @@ public DataSet data()
.column(NEEDS_REPLAY, segment.metadata().needsReplay())
.column(FILE_PATH, segment.filePath());
}

return result;
}
}

// TODO (expected): fix the types
// TODO (expected): fix perf WTFs
// TODO (expected): split out shards and coordinator logs tables
public static class MutationTrackingShardsTable extends AbstractVirtualTable
{
private static final String KEYSPACE = "keyspace";
private static final String LOG_ID = "log_id";
private static final String SINCE_EPOCH = "since_epoch";
private static final String RANGE_START = "range_start";
private static final String RANGE_END = "range_end";
private static final String LOCAL_NODE_ID = "local_node_id";
private static final String PARTICIPANTS = "participants";
private static final String WITNESSED_OFFSETS = "witnessed_offsets";
private static final String RECONCILED_OFFSETS = "reconciled_offsets";
private static final String PERSISTED_OFFSETS = "persisted_offsets";

MutationTrackingShardsTable(String keyspace) {

private static final SetType<Integer> FROZEN_INT_SET = SetType.getInstance(Int32Type.instance, false);

MutationTrackingShardsTable(String keyspace)
{
super(TableMetadata.builder(keyspace, MUTATION_TRACKING_SHARDS)
.comment("mutation tracking shards and their offset information")
.kind(TableMetadata.Kind.VIRTUAL).partitioner(new LocalPartitioner(UTF8Type.instance))
.addPartitionKeyColumn(KEYSPACE, UTF8Type.instance)
.addClusteringColumn(LOG_ID, UTF8Type.instance)
.addClusteringColumn(SINCE_EPOCH, LongType.instance)
.addClusteringColumn(RANGE_START, UTF8Type.instance)
.addClusteringColumn(RANGE_END, UTF8Type.instance)
.addRegularColumn(LOCAL_NODE_ID, Int32Type.instance)
.addRegularColumn(PARTICIPANTS, UTF8Type.instance)
.addRegularColumn(PARTICIPANTS, FROZEN_INT_SET)
// TODO (expected): change offsets columns to structured types
.addRegularColumn(WITNESSED_OFFSETS, UTF8Type.instance)
.addRegularColumn(RECONCILED_OFFSETS, UTF8Type.instance)
.addRegularColumn(PERSISTED_OFFSETS, UTF8Type.instance)
.build());
}

private void addShardRows(Shard shard, SimpleDataSet result)
{
Shard.DebugInfo shardDebugInfo = shard.getDebugInfo();
Expand All @@ -142,44 +151,35 @@ private void addShardRows(Shard shard, SimpleDataSet result)
CoordinatorLog.DebugInfo logDebugInfo = entry.getValue();
result.row(shardDebugInfo.keyspace,
logId.toString(),
shardDebugInfo.sinceEpoch,
shardDebugInfo.range.left.toString(),
shardDebugInfo.range.right.toString())
.column(LOCAL_NODE_ID, shardDebugInfo.localNodeId)
.column(PARTICIPANTS, shardDebugInfo.participants.toString())
.column(PARTICIPANTS, shardDebugInfo.participants.asSet())
.column(WITNESSED_OFFSETS, logDebugInfo.witnessedOffsets)
.column(RECONCILED_OFFSETS, logDebugInfo.reconciledOffsets)
.column(PERSISTED_OFFSETS, logDebugInfo.persistedOffsets);
}
}

@Override
public DataSet data()
{
SimpleDataSet result = new SimpleDataSet(metadata());

for (Shard shard : MutationTrackingService.instance().getShards())
{
addShardRows(shard, result);
}

return result;
}

@Override
public DataSet data(DecoratedKey key)
{
String keyspaceName = UTF8Type.instance.compose(key.getKey());
SimpleDataSet result = new SimpleDataSet(metadata());

for (Shard shard : MutationTrackingService.instance().getShards())
{
Shard.DebugInfo debugInfo = shard.getDebugInfo();
if (!debugInfo.keyspace.equals(keyspaceName))
continue;

addShardRows(shard, result);
}

if (shard.keyspace.equals(keyspaceName))
addShardRows(shard, result);
return result;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/dht/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ public static int compareRightToken(Token a, Token b)
if (a.isMinimum())
return 1;
if (b.isMinimum())
return 0;
return -1;
return a.compareTo(b);
}
}
Loading