Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/main/java/com/cinchapi/runway/RetryExhaustedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2013-2026 Cinchapi Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.cinchapi.runway;

/**
* A {@link RetryExhaustedException} is thrown by an atomic read-modify-write
* operation ({@code findAndEdit}, {@code findUniqueAndEdit}, and
* {@code findFirstAndEdit}) when it cannot commit because it lost the
* write-conflict race on every attempt up to the bounded retry limit.
* <p>
* This is semantically distinct from a {@code null} or empty result, which
* means no record matched. A {@link RetryExhaustedException} means matching
* record(s) existed but persistent contention prevented this caller from
* committing its edit; the caller may back off and retry.
*
* @author Javier Lores
*/
@SuppressWarnings("serial")
public class RetryExhaustedException extends RunwayException {

/**
* The number of attempts that were made before giving up.
*/
private final int attempts;

/**
* Construct a new instance.
*
* @param attempts the number of attempts that were made before giving up
*/
public RetryExhaustedException(int attempts) {
super("Failed to atomically commit after " + attempts
+ " attempts due to persistent write contention");
this.attempts = attempts;
}

/**
* Return the number of attempts that were made before giving up.
*
* @return the attempt count
*/
public int attempts() {
return attempts;
}

}
290 changes: 290 additions & 0 deletions src/main/java/com/cinchapi/runway/Runway.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -370,6 +371,12 @@ private static void restore(Map<Record, Record.Snapshot> snapshot) {
*/
private static final int MAX_SPURIOUS_SAVE_RETRIES = 5;

/**
* The base interval, in milliseconds, for the jittered exponential backoff
* between atomic read-modify-write retries.
*/
private static final long RETRY_BACKOFF_BASE_MILLIS = 10;

/**
* The development {@link Version} sentinel; a server reporting this version
* is treated as providing every feature, regardless of the {@link Version}
Expand Down Expand Up @@ -964,6 +971,289 @@ public boolean save(Record... records) {
return save(false, records);
}

/**
* Atomically find every {@link Record} of type {@code clazz} that matches
* the {@code criteria}, apply the {@code consumer} to each, and persist all
* of the edits as a single transaction.
* <p>
* The find, the {@code consumer} application, and the save run as one
* transaction: either every edit commits or none do. When no record
* matches, the {@code consumer} is never invoked, nothing is committed, and
* an empty {@link Set} is returned. The returned {@link Set} preserves the
* iteration order of the underlying find.
* <p>
* On a write conflict the whole cycle (re-find, re-apply, re-save) is
* retried with jittered backoff up to a bounded number of attempts, so the
* {@code consumer} may run more than once and must be safe to do so; it
* must mutate the {@link Record} it is handed rather than one captured
* earlier. The {@code consumer} runs before the save's validation, so an
* edit that violates a {@code Required}/{@code Unique} constraint surfaces
* from the save path.
*
* @param clazz the {@link Record} type to find
* @param criteria the {@link Criteria} the records must match
* @param consumer the mutation to apply to each matching {@link Record}
* @return the {@link Set} of edited {@link Record Records}, empty when none
* matched
* @throws RetryExhaustedException if the edit cannot commit within the
* bounded number of attempts due to persistent contention
*/
public <T extends Record> Set<T> findAndEdit(Class<T> clazz,
Criteria criteria, Consumer<T> consumer) {
return editWithinTransaction(clazz, criteria, null, null, consumer,
found -> {});
}

/**
* Atomically find the one {@link Record} of type {@code clazz} that matches
* the {@code criteria}, apply the {@code consumer} to it, and persist the
* edit as a single transaction.
* <p>
* Returns the edited {@link Record}, or {@code null} when nothing matches
* (in which case the {@code consumer} is never invoked and nothing is
* committed). Throws {@link DuplicateEntryException} when more than one
* record matches, consistent with
* {@link DatabaseInterface#findUnique(Class, Criteria) findUnique}; the
* duplicate check happens inside the transaction before the
* {@code consumer} runs, so a violation neither mutates nor commits.
* <p>
* On a write conflict the whole cycle is retried with jittered backoff up
* to a bounded number of attempts, so the {@code consumer} may run more
* than once and must be safe to do so; it must mutate the {@link Record} it
* is handed rather than one captured earlier.
*
* @param clazz the {@link Record} type to find
* @param criteria the {@link Criteria} the record must match
* @param consumer the mutation to apply to the matching {@link Record}
* @return the edited {@link Record}, or {@code null} if none matches
* @throws DuplicateEntryException if more than one record matches
* @throws RetryExhaustedException if the edit cannot commit within the
* bounded number of attempts due to persistent contention
*/
public <T extends Record> T findUniqueAndEdit(Class<T> clazz,
Criteria criteria, Consumer<T> consumer) {
Set<T> edited = editWithinTransaction(clazz, criteria, null,
DatabaseInterface.UNIQUE_PAGINATION, consumer, found -> {
if(found.size() > 1) {
throw duplicateEntryException(
"Multiple records match {} in {}", criteria,
clazz);
}
});
return Iterables.getFirst(edited, null);
}

/**
* Atomically find the first {@link Record} of type {@code clazz} that
* matches the {@code criteria} under the supplied {@code order}, apply the
* {@code consumer} to it, and persist the edit as a single transaction.
* <p>
* "First" is defined entirely by {@code order}, which is required. Returns
* the edited {@link Record}, or {@code null} when nothing matches (in which
* case the {@code consumer} is never invoked and nothing is committed).
* <p>
* This is the claim-and-update primitive: concurrent callers contending for
* the same record are serialized by the database, so at most one commits
* and the rest are preempted and retried. On a write conflict the whole
* cycle (re-find under {@code order}, re-apply, re-save) is retried with
* jittered backoff up to a bounded number of attempts, so the
* {@code consumer} may run more than once and must be safe to do so; it
* must mutate the {@link Record} it is handed rather than one captured
* earlier.
*
* @param clazz the {@link Record} type to find
* @param criteria the {@link Criteria} the record must match
* @param order the {@link Order} that defines "first"
* @param consumer the mutation to apply to the matching {@link Record}
* @return the edited {@link Record}, or {@code null} if none matches
* @throws RetryExhaustedException if the edit cannot commit within the
* bounded number of attempts due to persistent contention
*/
public <T extends Record> T findFirstAndEdit(Class<T> clazz,
Criteria criteria, Order order, Consumer<T> consumer) {
Preconditions.checkNotNull(order, "findFirstAndEdit requires an Order");
Set<T> edited = editWithinTransaction(clazz, criteria, order,
Page.limit(1), consumer, found -> {});
return Iterables.getFirst(edited, null);
}

/**
* Perform an atomic find-modify-save: within a single staged transaction,
* find the records of type {@code clazz} matching {@code criteria} (under
* the optional {@code order} and {@code page}), let {@code validator}
* inspect the freshly read {@link Set} before any edit, apply
* {@code consumer} to each record, save, and commit. The find's read and
* the save's write share one transaction (and therefore one set of locks),
* which is what gives concurrent callers true mutual exclusion.
* <p>
* On a {@link TransactionException} the cycle is aborted and retried with
* jittered backoff up to {@link #MAX_SPURIOUS_SAVE_RETRIES} attempts, each
* re-finding fresh records so the {@code consumer} always observes current
* state. When the attempt budget is exhausted a
* {@link RetryExhaustedException} is thrown rather than returning a
* non-committed result.
*
* @param clazz the {@link Record} type to find
* @param criteria the {@link Criteria} the records must match
* @param order the sort {@link Order}, or {@code null}
* @param page the {@link Page} limit, or {@code null} for all matches
* @param consumer the mutation applied to each matching {@link Record}
* @param validator a hook that inspects the freshly read {@link Set} inside
* the transaction before any edit (e.g. to enforce uniqueness)
* @return the {@link Set} of edited and committed {@link Record Records}
* @throws RetryExhaustedException if no attempt commits within the bound
*/
private <T extends Record> Set<T> editWithinTransaction(Class<T> clazz,
Criteria criteria, @Nullable Order order, @Nullable Page page,
Consumer<T> consumer, Consumer<Set<T>> validator) {
// NOTE: Unlike #save, every TransactionException is retried here
// regardless of #spuriousSaveFailureStrategy and without a stale-data
// check. These primitives are built for write-conflict contention (the
// claim use case), where a lost commit race is exactly the condition a
// caller wants retried; staleness checks are off because the read is
// re-issued fresh inside each attempt's transaction.
Concourse concourse = connections.request();
try {
// NOTE: The connection is held across the bounded backoff sleeps
// rather than released between attempts. The staged transaction
// lives on this connection, and re-requesting per attempt would
// churn the pool; the backoff is capped at a few hundred millis so
// the idle occupancy is brief.
int attempts = 0;
while (true) {
// NOTE: The incremental path is used unconditionally (even when
// #supportsBulkCommands) because BatchSaver defers the
// server-side STAGE until commit time, which would place the
// find's read outside the transaction. IncrementalSaver stages
// synchronously, so the find's read and the save's write share
// one transaction (and one set of locks) &mdash; the property
// that gives concurrent callers mutual exclusion.
Saver saver = new IncrementalSaver(concourse);
try {
saver.stage();
// NOTE: A legacy server cannot sort or paginate
// server-side, so the order/page are withheld from the
// find and applied client-side below. The find still reads
// (and therefore locks) every match inside the
// transaction, so the atomicity and mutual-exclusion
// guarantees are preserved.
boolean nativeOrderAndPage = hasNativeSortingAndPagination
|| doesNotRequireSortingOrPagination(order, page);
Set<T> found;
// NOTE: The Reader is bound to the same staged connection
// as the Saver so the find reads inside the transaction;
// this deliberately bypasses the result cache used by the
// pooled #select path.
try (Reader reader = new IncrementalReader(concourse)) {
Read read = enqueueRead(reader, false, clazz, criteria,
nativeOrderAndPage ? order : null,
nativeOrderAndPage ? page : null);
AtomicReference<Set<T>> ref = new AtomicReference<>();
read.data
.then($data -> read.navigated
.then($navigated -> resolveLinkTargets(
reader, $data, $navigated))
.map($targets -> instantiateAll(clazz,
false, $data, $targets)))
.onResolve(ref::set);
reader.drain();
found = ref.get();
}
if(!nativeOrderAndPage) {
found = sortAndPage(found, order, page);
}
validator.accept(found);
if(found.isEmpty()) {
saver.abort();
return found;
}
Map<Record, Boolean> seen = new HashMap<>();
Map<Record, Snapshot> snapshots = new HashMap<>();
for (T record : found) {
consumer.accept(record);
record.assign(this);
record.saveWithinTransaction(saver, seen, snapshots,
false);
}
if(saver.commit()) {
seen.entrySet().stream().filter(Entry::getValue)
.map(Entry::getKey).forEach(record -> {
enqueueSaveNotification(record);
record.checkpoint();
});
return found;
}
else {
// Trigger the retry path below.
throw new TransactionException();
}
}
catch (TransactionException e) {
saver.abort();
if(++attempts > MAX_SPURIOUS_SAVE_RETRIES) {
throw new RetryExhaustedException(attempts);
}
backoffWithJitter(attempts);
continue;
}
catch (Throwable t) {
// A non-transaction failure (e.g. a duplicate-entry or
// constraint violation) is terminal: abort and propagate
// without retrying so nothing is committed or mutated.
saver.abort();
throw t;
}
}
}
finally {
connections.release(concourse);
}
}

/**
* Sleep for a jittered, exponentially growing interval before the next
* atomic read-modify-write {@code attempt}, dispersing contending callers
* so they do not collide again in lockstep.
*
* @param attempt the attempt number that just failed (1-based)
*/
private void backoffWithJitter(int attempt) {
long ceiling = RETRY_BACKOFF_BASE_MILLIS * (1L << (attempt - 1));
long delay = ThreadLocalRandom.current().nextLong(ceiling + 1);
try {
Thread.sleep(delay);
}
catch (InterruptedException e) {
// Honor the interrupt by restoring the flag and abandoning the
// retry loop rather than silently continuing to contend.
Thread.currentThread().interrupt();
throw CheckedExceptions.throwAsRuntimeException(e);
}
}

/**
* Apply {@code order} and {@code page} to {@code records} client-side, for
* the legacy-server path where the find could not sort or paginate
* server-side.
*
* @param records the records read for the match, in find order
* @param order the sort {@link Order} to apply, or {@code null}
* @param page the {@link Page} to apply, or {@code null} for all records
* @return the sorted and paginated {@link Set}, preserving iteration order
*/
private <T extends Record> Set<T> sortAndPage(Set<T> records,
@Nullable Order order, @Nullable Page page) {
if(order != null) {
records = DatabaseInterface.sort(records,
backwardsCompatible(order));
}
if(page != null) {
records = records.stream().skip(page.skip()).limit(page.limit())
.collect(Collectors.toCollection(LinkedHashSet::new));
}
return records;
}

/**
* Search for records in {@code clazz} that match the search {@query} across
* any of the provided {@code keys}.
Expand Down
Loading