diff --git a/src/main/java/com/cinchapi/runway/RetryExhaustedException.java b/src/main/java/com/cinchapi/runway/RetryExhaustedException.java new file mode 100644 index 0000000..2e81972 --- /dev/null +++ b/src/main/java/com/cinchapi/runway/RetryExhaustedException.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +/** + * A {@link RetryExhaustedException} is thrown by an atomic read-modify-write + * operation ({@code findAndEdit}, {@code findUniqueAndEdit}, and + * {@code findFirstAndEdit}) when it cannot commit because it lost the + * write-conflict race on every attempt up to the bounded retry limit. + *

+ * This is semantically distinct from a {@code null} or empty result, which + * means no record matched. A {@link RetryExhaustedException} means matching + * record(s) existed but persistent contention prevented this caller from + * committing its edit; the caller may back off and retry. + * + * @author Javier Lores + */ +@SuppressWarnings("serial") +public class RetryExhaustedException extends RunwayException { + + /** + * The number of attempts that were made before giving up. + */ + private final int attempts; + + /** + * Construct a new instance. + * + * @param attempts the number of attempts that were made before giving up + */ + public RetryExhaustedException(int attempts) { + super("Failed to atomically commit after " + attempts + + " attempts due to persistent write contention"); + this.attempts = attempts; + } + + /** + * Return the number of attempts that were made before giving up. + * + * @return the attempt count + */ + public int attempts() { + return attempts; + } + +} diff --git a/src/main/java/com/cinchapi/runway/Runway.java b/src/main/java/com/cinchapi/runway/Runway.java index bede6dc..1b5c331 100644 --- a/src/main/java/com/cinchapi/runway/Runway.java +++ b/src/main/java/com/cinchapi/runway/Runway.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -370,6 +371,12 @@ private static void restore(Map snapshot) { */ private static final int MAX_SPURIOUS_SAVE_RETRIES = 5; + /** + * The base interval, in milliseconds, for the jittered exponential backoff + * between atomic read-modify-write retries. + */ + private static final long RETRY_BACKOFF_BASE_MILLIS = 10; + /** * The development {@link Version} sentinel; a server reporting this version * is treated as providing every feature, regardless of the {@link Version} @@ -964,6 +971,289 @@ public boolean save(Record... records) { return save(false, records); } + /** + * Atomically find every {@link Record} of type {@code clazz} that matches + * the {@code criteria}, apply the {@code consumer} to each, and persist all + * of the edits as a single transaction. + *

+ * The find, the {@code consumer} application, and the save run as one + * transaction: either every edit commits or none do. When no record + * matches, the {@code consumer} is never invoked, nothing is committed, and + * an empty {@link Set} is returned. The returned {@link Set} preserves the + * iteration order of the underlying find. + *

+ * On a write conflict the whole cycle (re-find, re-apply, re-save) is + * retried with jittered backoff up to a bounded number of attempts, so the + * {@code consumer} may run more than once and must be safe to do so; it + * must mutate the {@link Record} it is handed rather than one captured + * earlier. The {@code consumer} runs before the save's validation, so an + * edit that violates a {@code Required}/{@code Unique} constraint surfaces + * from the save path. + * + * @param clazz the {@link Record} type to find + * @param criteria the {@link Criteria} the records must match + * @param consumer the mutation to apply to each matching {@link Record} + * @return the {@link Set} of edited {@link Record Records}, empty when none + * matched + * @throws RetryExhaustedException if the edit cannot commit within the + * bounded number of attempts due to persistent contention + */ + public Set findAndEdit(Class clazz, + Criteria criteria, Consumer consumer) { + return editWithinTransaction(clazz, criteria, null, null, consumer, + found -> {}); + } + + /** + * Atomically find the one {@link Record} of type {@code clazz} that matches + * the {@code criteria}, apply the {@code consumer} to it, and persist the + * edit as a single transaction. + *

+ * Returns the edited {@link Record}, or {@code null} when nothing matches + * (in which case the {@code consumer} is never invoked and nothing is + * committed). Throws {@link DuplicateEntryException} when more than one + * record matches, consistent with + * {@link DatabaseInterface#findUnique(Class, Criteria) findUnique}; the + * duplicate check happens inside the transaction before the + * {@code consumer} runs, so a violation neither mutates nor commits. + *

+ * On a write conflict the whole cycle is retried with jittered backoff up + * to a bounded number of attempts, so the {@code consumer} may run more + * than once and must be safe to do so; it must mutate the {@link Record} it + * is handed rather than one captured earlier. + * + * @param clazz the {@link Record} type to find + * @param criteria the {@link Criteria} the record must match + * @param consumer the mutation to apply to the matching {@link Record} + * @return the edited {@link Record}, or {@code null} if none matches + * @throws DuplicateEntryException if more than one record matches + * @throws RetryExhaustedException if the edit cannot commit within the + * bounded number of attempts due to persistent contention + */ + public T findUniqueAndEdit(Class clazz, + Criteria criteria, Consumer consumer) { + Set edited = editWithinTransaction(clazz, criteria, null, + DatabaseInterface.UNIQUE_PAGINATION, consumer, found -> { + if(found.size() > 1) { + throw duplicateEntryException( + "Multiple records match {} in {}", criteria, + clazz); + } + }); + return Iterables.getFirst(edited, null); + } + + /** + * Atomically find the first {@link Record} of type {@code clazz} that + * matches the {@code criteria} under the supplied {@code order}, apply the + * {@code consumer} to it, and persist the edit as a single transaction. + *

+ * "First" is defined entirely by {@code order}, which is required. Returns + * the edited {@link Record}, or {@code null} when nothing matches (in which + * case the {@code consumer} is never invoked and nothing is committed). + *

+ * This is the claim-and-update primitive: concurrent callers contending for + * the same record are serialized by the database, so at most one commits + * and the rest are preempted and retried. On a write conflict the whole + * cycle (re-find under {@code order}, re-apply, re-save) is retried with + * jittered backoff up to a bounded number of attempts, so the + * {@code consumer} may run more than once and must be safe to do so; it + * must mutate the {@link Record} it is handed rather than one captured + * earlier. + * + * @param clazz the {@link Record} type to find + * @param criteria the {@link Criteria} the record must match + * @param order the {@link Order} that defines "first" + * @param consumer the mutation to apply to the matching {@link Record} + * @return the edited {@link Record}, or {@code null} if none matches + * @throws RetryExhaustedException if the edit cannot commit within the + * bounded number of attempts due to persistent contention + */ + public T findFirstAndEdit(Class clazz, + Criteria criteria, Order order, Consumer consumer) { + Preconditions.checkNotNull(order, "findFirstAndEdit requires an Order"); + Set edited = editWithinTransaction(clazz, criteria, order, + Page.limit(1), consumer, found -> {}); + return Iterables.getFirst(edited, null); + } + + /** + * Perform an atomic find-modify-save: within a single staged transaction, + * find the records of type {@code clazz} matching {@code criteria} (under + * the optional {@code order} and {@code page}), let {@code validator} + * inspect the freshly read {@link Set} before any edit, apply + * {@code consumer} to each record, save, and commit. The find's read and + * the save's write share one transaction (and therefore one set of locks), + * which is what gives concurrent callers true mutual exclusion. + *

+ * On a {@link TransactionException} the cycle is aborted and retried with + * jittered backoff up to {@link #MAX_SPURIOUS_SAVE_RETRIES} attempts, each + * re-finding fresh records so the {@code consumer} always observes current + * state. When the attempt budget is exhausted a + * {@link RetryExhaustedException} is thrown rather than returning a + * non-committed result. + * + * @param clazz the {@link Record} type to find + * @param criteria the {@link Criteria} the records must match + * @param order the sort {@link Order}, or {@code null} + * @param page the {@link Page} limit, or {@code null} for all matches + * @param consumer the mutation applied to each matching {@link Record} + * @param validator a hook that inspects the freshly read {@link Set} inside + * the transaction before any edit (e.g. to enforce uniqueness) + * @return the {@link Set} of edited and committed {@link Record Records} + * @throws RetryExhaustedException if no attempt commits within the bound + */ + private Set editWithinTransaction(Class clazz, + Criteria criteria, @Nullable Order order, @Nullable Page page, + Consumer consumer, Consumer> validator) { + // NOTE: Unlike #save, every TransactionException is retried here + // regardless of #spuriousSaveFailureStrategy and without a stale-data + // check. These primitives are built for write-conflict contention (the + // claim use case), where a lost commit race is exactly the condition a + // caller wants retried; staleness checks are off because the read is + // re-issued fresh inside each attempt's transaction. + Concourse concourse = connections.request(); + try { + // NOTE: The connection is held across the bounded backoff sleeps + // rather than released between attempts. The staged transaction + // lives on this connection, and re-requesting per attempt would + // churn the pool; the backoff is capped at a few hundred millis so + // the idle occupancy is brief. + int attempts = 0; + while (true) { + // NOTE: The incremental path is used unconditionally (even when + // #supportsBulkCommands) because BatchSaver defers the + // server-side STAGE until commit time, which would place the + // find's read outside the transaction. IncrementalSaver stages + // synchronously, so the find's read and the save's write share + // one transaction (and one set of locks) — the property + // that gives concurrent callers mutual exclusion. + Saver saver = new IncrementalSaver(concourse); + try { + saver.stage(); + // NOTE: A legacy server cannot sort or paginate + // server-side, so the order/page are withheld from the + // find and applied client-side below. The find still reads + // (and therefore locks) every match inside the + // transaction, so the atomicity and mutual-exclusion + // guarantees are preserved. + boolean nativeOrderAndPage = hasNativeSortingAndPagination + || doesNotRequireSortingOrPagination(order, page); + Set found; + // NOTE: The Reader is bound to the same staged connection + // as the Saver so the find reads inside the transaction; + // this deliberately bypasses the result cache used by the + // pooled #select path. + try (Reader reader = new IncrementalReader(concourse)) { + Read read = enqueueRead(reader, false, clazz, criteria, + nativeOrderAndPage ? order : null, + nativeOrderAndPage ? page : null); + AtomicReference> ref = new AtomicReference<>(); + read.data + .then($data -> read.navigated + .then($navigated -> resolveLinkTargets( + reader, $data, $navigated)) + .map($targets -> instantiateAll(clazz, + false, $data, $targets))) + .onResolve(ref::set); + reader.drain(); + found = ref.get(); + } + if(!nativeOrderAndPage) { + found = sortAndPage(found, order, page); + } + validator.accept(found); + if(found.isEmpty()) { + saver.abort(); + return found; + } + Map seen = new HashMap<>(); + Map snapshots = new HashMap<>(); + for (T record : found) { + consumer.accept(record); + record.assign(this); + record.saveWithinTransaction(saver, seen, snapshots, + false); + } + if(saver.commit()) { + seen.entrySet().stream().filter(Entry::getValue) + .map(Entry::getKey).forEach(record -> { + enqueueSaveNotification(record); + record.checkpoint(); + }); + return found; + } + else { + // Trigger the retry path below. + throw new TransactionException(); + } + } + catch (TransactionException e) { + saver.abort(); + if(++attempts > MAX_SPURIOUS_SAVE_RETRIES) { + throw new RetryExhaustedException(attempts); + } + backoffWithJitter(attempts); + continue; + } + catch (Throwable t) { + // A non-transaction failure (e.g. a duplicate-entry or + // constraint violation) is terminal: abort and propagate + // without retrying so nothing is committed or mutated. + saver.abort(); + throw t; + } + } + } + finally { + connections.release(concourse); + } + } + + /** + * Sleep for a jittered, exponentially growing interval before the next + * atomic read-modify-write {@code attempt}, dispersing contending callers + * so they do not collide again in lockstep. + * + * @param attempt the attempt number that just failed (1-based) + */ + private void backoffWithJitter(int attempt) { + long ceiling = RETRY_BACKOFF_BASE_MILLIS * (1L << (attempt - 1)); + long delay = ThreadLocalRandom.current().nextLong(ceiling + 1); + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + // Honor the interrupt by restoring the flag and abandoning the + // retry loop rather than silently continuing to contend. + Thread.currentThread().interrupt(); + throw CheckedExceptions.throwAsRuntimeException(e); + } + } + + /** + * Apply {@code order} and {@code page} to {@code records} client-side, for + * the legacy-server path where the find could not sort or paginate + * server-side. + * + * @param records the records read for the match, in find order + * @param order the sort {@link Order} to apply, or {@code null} + * @param page the {@link Page} to apply, or {@code null} for all records + * @return the sorted and paginated {@link Set}, preserving iteration order + */ + private Set sortAndPage(Set records, + @Nullable Order order, @Nullable Page page) { + if(order != null) { + records = DatabaseInterface.sort(records, + backwardsCompatible(order)); + } + if(page != null) { + records = records.stream().skip(page.skip()).limit(page.limit()) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + return records; + } + /** * Search for records in {@code clazz} that match the search {@query} across * any of the provided {@code keys}. diff --git a/src/test/java/com/cinchapi/runway/FindAndEditTest.java b/src/test/java/com/cinchapi/runway/FindAndEditTest.java new file mode 100644 index 0000000..c923402 --- /dev/null +++ b/src/test/java/com/cinchapi/runway/FindAndEditTest.java @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.cinchapi.common.reflect.Reflection; +import com.cinchapi.concourse.lang.Criteria; +import com.cinchapi.concourse.thrift.Operator; + +/** + * Tests for + * {@link Runway#findAndEdit(Class, Criteria, java.util.function.Consumer) + * findAndEdit}. Each test runs under both Command-API modes (bulk enabled and + * disabled); the atomic edit itself always uses the incremental, + * synchronously-staged transaction path, so the matrix additionally guards the + * surrounding save and load operations under both modes. + * + * @author Javier Lores + */ +@RunWith(Parameterized.class) +public class FindAndEditTest extends RunwayBaseClientServerTest { + + /** + * Return the parameter matrix that drives each test once per Command-API + * capability. + * + * @return one row with bulk commands enabled and one with it disabled + */ + @Parameters(name = "bulkCommands={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + private final boolean useBulkCommands; + + /** + * Construct a new instance. + * + * @param useBulkCommands {@code true} to exercise the bulk Command-API read + * path; {@code false} for the incremental path + */ + public FindAndEditTest(boolean useBulkCommands) { + this.useBulkCommands = useBulkCommands; + } + + @Override + protected void beforeTestRun() { + super.beforeTestRun(); + Reflection.set("supportsBulkCommands", useBulkCommands, runway); // (authorized) + } + + /** + * Goal: Verify that {@code findAndEdit} edits every + * matching record and durably persists all of the edits. + *

+ * Start state: Three {@link Doc Docs} that all match the + * criteria. + *

+ * Workflow: + *

+ *

+ * Expected: Three {@link Doc Docs} are returned, each with + * {@code owner == "worker"}, and every re-loaded {@link Doc} shows the + * persisted {@code owner}. + */ + @Test + public void testFindAndEditEditsAllMatchesAndPersists() { + runway.save(new Doc(1), new Doc(2), new Doc(3)); + Set edited = runway.findAndEdit(Doc.class, rankPositive(), + doc -> doc.owner = "worker"); + Assert.assertEquals(3, edited.size()); + for (Doc doc : edited) { + Assert.assertEquals("worker", doc.owner); + Assert.assertEquals("worker", + runway.load(Doc.class, doc.id()).owner); + } + } + + /** + * Goal: Verify that {@code findAndEdit} returns an empty + * {@link Set} and never invokes the consumer when no record matches. + *

+ * Start state: Three {@link Doc Docs} none of which + * matches the criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Set} is empty and the + * consumer never ran. + */ + @Test + public void testFindAndEditReturnsEmptyAndSkipsConsumerWhenNoMatch() { + runway.save(new Doc(1), new Doc(2), new Doc(3)); + AtomicBoolean consumerRan = new AtomicBoolean(false); + Set edited = runway.findAndEdit( + Doc.class, Criteria.where().key("rank") + .operator(Operator.GREATER_THAN).value(100).build(), + doc -> consumerRan.set(true)); + Assert.assertTrue(edited.isEmpty()); + Assert.assertFalse(consumerRan.get()); + } + + /** + * Goal: Verify the all-or-nothing guarantee: when the + * consumer throws partway through the match set, no record's edit is + * persisted. + *

+ * Start state: Three {@link Doc Docs} that all match the + * criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The exception propagates and none of the + * re-loaded {@link Doc Docs} has an {@code owner} set. + */ + @Test + public void testFindAndEditIsAllOrNothingWhenConsumerThrows() { + Doc one = new Doc(1); + Doc two = new Doc(2); + Doc three = new Doc(3); + runway.save(one, two, three); + boolean threw = false; + try { + runway.findAndEdit(Doc.class, rankPositive(), doc -> { + if(doc.rank == 2) { + throw new IllegalStateException("boom"); + } + doc.owner = "worker"; + }); + } + catch (IllegalStateException e) { + threw = true; + } + Assert.assertTrue(threw); + Assert.assertNull(runway.load(Doc.class, one.id()).owner); + Assert.assertNull(runway.load(Doc.class, two.id()).owner); + Assert.assertNull(runway.load(Doc.class, three.id()).owner); + } + + /** + * Return a {@link Criteria} matching every {@link Doc} whose {@code rank} + * is positive. + * + * @return the {@code rank > 0} {@link Criteria} + */ + private static Criteria rankPositive() { + return Criteria.where().key("rank").operator(Operator.GREATER_THAN) + .value(0).build(); + } + + /** + * A {@link Record} with a queryable {@code rank} and an editable + * {@code owner}. + * + * @author Javier Lores + */ + public static class Doc extends Record { + + /** + * The queryable rank. + */ + int rank; + + /** + * The editable owner, or {@code null} when unset. + */ + String owner; + + /** + * Construct a new instance. + * + * @param rank the queryable rank + */ + public Doc(int rank) { + this.rank = rank; + } + } + +} diff --git a/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java b/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java new file mode 100644 index 0000000..b444120 --- /dev/null +++ b/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java @@ -0,0 +1,420 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.cinchapi.common.reflect.Reflection; +import com.cinchapi.concourse.Concourse; +import com.cinchapi.concourse.lang.Criteria; +import com.cinchapi.concourse.lang.sort.Order; +import com.cinchapi.concourse.thrift.Operator; + +/** + * Tests for + * {@link Runway#findFirstAndEdit(Class, Criteria, Order, java.util.function.Consumer) + * findFirstAndEdit}, the atomic claim-and-update primitive. Each test runs + * under both Command-API modes (bulk enabled and disabled); the atomic edit + * itself always uses the incremental, synchronously-staged transaction path, so + * the matrix additionally guards the surrounding save and load operations under + * both modes. + * + * @author Javier Lores + */ +@RunWith(Parameterized.class) +public class FindFirstAndEditTest extends RunwayBaseClientServerTest { + + /** + * Return the parameter matrix that drives each test once per Command-API + * capability. + * + * @return one row with bulk commands enabled and one with it disabled + */ + @Parameters(name = "bulkCommands={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + private final boolean useBulkCommands; + + /** + * Construct a new instance. + * + * @param useBulkCommands {@code true} to exercise the bulk Command-API read + * path; {@code false} for the incremental path + */ + public FindFirstAndEditTest(boolean useBulkCommands) { + this.useBulkCommands = useBulkCommands; + } + + @Override + protected void beforeTestRun() { + super.beforeTestRun(); + Reflection.set("supportsBulkCommands", useBulkCommands, runway); // (authorized) + } + + /** + * Goal: Verify that {@code findFirstAndEdit} edits exactly + * the record that sorts first under the {@link Order} and durably persists + * the edit. + *

+ * Start state: Three unclaimed {@link Task Tasks} with + * ranks 3, 2, and 1 saved in non-sorted insertion order. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Task} has rank 1 and + * {@code owner == "worker"}, and the re-loaded {@link Task} shows the same + * persisted {@code owner}. + */ + @Test + public void testFindFirstAndEditEditsFirstUnderOrderAndPersists() { + runway.save(new Task(3), new Task(2), new Task(1)); + Task first = runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), task -> task.owner = "worker"); + Assert.assertNotNull(first); + Assert.assertEquals(1, first.rank); + Assert.assertEquals("worker", first.owner); + Task reloaded = runway.load(Task.class, first.id()); + Assert.assertEquals("worker", reloaded.owner); + } + + /** + * Goal: Verify that {@code findFirstAndEdit} applies the + * {@link Order} client-side and still edits exactly the record that sorts + * first when the server cannot sort or paginate natively. + *

+ * Start state: A {@link Runway} forced onto the legacy + * path (no native sorting/pagination) with three unclaimed {@link Task + * Tasks} of ranks 3, 2, and 1 saved in non-sorted insertion order. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Task} has rank 1 and + * {@code owner == "worker"}, and the re-loaded {@link Task} shows the same + * persisted {@code owner}. + */ + @Test + public void testFindFirstAndEditAppliesOrderClientSideOnLegacyServer() { + Reflection.set("hasNativeSortingAndPagination", false, runway); // (authorized) + runway.save(new Task(3), new Task(2), new Task(1)); + Task first = runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), task -> task.owner = "worker"); + Assert.assertNotNull(first); + Assert.assertEquals(1, first.rank); + Assert.assertEquals("worker", first.owner); + Task reloaded = runway.load(Task.class, first.id()); + Assert.assertEquals("worker", reloaded.owner); + } + + /** + * Goal: Verify that {@code findFirstAndEdit} returns + * {@code null} and never invokes the consumer when no record matches. + *

+ * Start state: Three {@link Task Tasks} whose ranks are + * all below the criteria threshold. + *

+ * Workflow: + *

+ *

+ * Expected: The result is {@code null} and the consumer + * never ran. + */ + @Test + public void testFindFirstAndEditReturnsNullAndSkipsConsumerWhenNoMatch() { + runway.save(new Task(1), new Task(2), new Task(3)); + AtomicBoolean consumerRan = new AtomicBoolean(false); + Task first = runway.findFirstAndEdit(Task.class, + Criteria.where().key("rank").operator(Operator.GREATER_THAN) + .value(100).build(), + Order.by("rank").ascending(), task -> consumerRan.set(true)); + Assert.assertNull(first); + Assert.assertFalse(consumerRan.get()); + } + + /** + * Goal: Verify that {@code findFirstAndEdit} rejects a + * {@code null} {@link Order}, since "first" is undefined without one. + *

+ * Start state: A single unclaimed {@link Task}. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link NullPointerException} is thrown. + */ + @Test(expected = NullPointerException.class) + public void testFindFirstAndEditRequiresOrder() { + runway.save(new Task(1)); + runway.findFirstAndEdit(Task.class, unclaimed(), null, + task -> task.owner = "worker"); + } + + /** + * Goal: Verify the core mutual-exclusion guarantee: two + * threads racing to claim the same single candidate cannot both succeed. + *

+ * Start state: One unclaimed {@link Task}. + *

+ * Workflow: + *

+ *

+ * Expected: Exactly one thread receives a non-null claim + * and the other receives {@code null}; the persisted {@code owner} equals + * the single winner's id. + */ + @Test + public void testFindFirstAndEditClaimsExactlyOneUnderConcurrency() + throws InterruptedException { + Task task = new Task(1); + runway.save(task); + long id = task.id(); + CountDownLatch ready = new CountDownLatch(2); + CountDownLatch go = new CountDownLatch(1); + AtomicReference claim1 = new AtomicReference<>(); + AtomicReference claim2 = new AtomicReference<>(); + Thread t1 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim1.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "one"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + Thread t2 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim2.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "two"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t1.start(); + t2.start(); + ready.await(5, TimeUnit.SECONDS); + go.countDown(); + t1.join(10000); + t2.join(10000); + boolean oneWon = claim1.get() != null; + boolean twoWon = claim2.get() != null; + Assert.assertTrue("Exactly one thread must claim the task", + oneWon ^ twoWon); + Task reloaded = runway.load(Task.class, id); + Assert.assertTrue(reloaded.claimed); + Assert.assertEquals(oneWon ? "one" : "two", reloaded.owner); + } + + /** + * Goal: Verify that two threads racing over a candidate + * set of two records each claim a distinct record, so no record is claimed + * twice. + *

+ * Start state: Two unclaimed {@link Task Tasks} with ranks + * 1 and 2. + *

+ * Workflow: + *

+ *

+ * Expected: Both threads receive a non-null claim and the + * two claims are distinct {@link Task Tasks} (different ids). + */ + @Test + public void testFindFirstAndEditConcurrentClaimsTakeDistinctRecords() + throws InterruptedException { + runway.save(new Task(1), new Task(2)); + CountDownLatch ready = new CountDownLatch(2); + CountDownLatch go = new CountDownLatch(1); + AtomicReference claim1 = new AtomicReference<>(); + AtomicReference claim2 = new AtomicReference<>(); + Thread t1 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim1.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "one"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + Thread t2 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim2.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "two"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t1.start(); + t2.start(); + ready.await(5, TimeUnit.SECONDS); + go.countDown(); + t1.join(10000); + t2.join(10000); + Assert.assertNotNull(claim1.get()); + Assert.assertNotNull(claim2.get()); + Assert.assertNotEquals(claim1.get().id(), claim2.get().id()); + } + + /** + * Goal: Verify that {@code findFirstAndEdit} throws + * {@link RetryExhaustedException} — rather than returning + * {@code null} — when it loses the commit race on every attempt. + *

+ * Start state: One unclaimed {@link Task}. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link RetryExhaustedException} is thrown + * and the persisted {@code owner} was never set by this caller. + */ + @Test(expected = RetryExhaustedException.class) + public void testFindFirstAndEditThrowsRetryExhaustedUnderPersistentContention() { + Task task = new Task(1); + runway.save(task); + long id = task.id(); + runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + // Force a conflicting external write on every attempt so + // the staged transaction can never commit. The nested + // request borrows a second connection while the edit holds + // its own; this is safe because Runway uses an expandable + // cached pool that grows on demand rather than blocking. + Concourse other = runway.connections.request(); + try { + other.set("rank", t.rank + 1, id); + } + finally { + runway.connections.release(other); + } + t.owner = "worker"; + }); + } + + /** + * Return a {@link Criteria} matching every unclaimed {@link Task}, i.e. one + * whose {@code claimed} flag is {@code false}. + * + * @return the {@code claimed == false} {@link Criteria} + */ + private static Criteria unclaimed() { + return Criteria.where().key("claimed").operator(Operator.EQUALS) + .value(false).build(); + } + + /** + * A claimable unit of work with an orderable {@code rank}, a + * {@code claimed} flag, and an {@code owner} that records the claimant. + * + * @author Javier Lores + */ + public static class Task extends Record { + + /** + * The orderable rank; lowest is claimed first under ascending order. + */ + int rank; + + /** + * Whether this task has been claimed. + */ + boolean claimed; + + /** + * The id of the claimant, or {@code null} when unclaimed. + */ + String owner; + + /** + * Construct a new, unclaimed instance. + * + * @param rank the orderable rank + */ + public Task(int rank) { + this.rank = rank; + this.claimed = false; + } + } + +} diff --git a/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java b/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java new file mode 100644 index 0000000..7937ddd --- /dev/null +++ b/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.cinchapi.common.reflect.Reflection; +import com.cinchapi.concourse.DuplicateEntryException; +import com.cinchapi.concourse.lang.Criteria; +import com.cinchapi.concourse.thrift.Operator; + +/** + * Tests for + * {@link Runway#findUniqueAndEdit(Class, Criteria, java.util.function.Consumer) + * findUniqueAndEdit}. Each test runs under both Command-API modes (bulk enabled + * and disabled); the atomic edit itself always uses the incremental, + * synchronously-staged transaction path, so the matrix additionally guards the + * surrounding save and load operations under both modes. + * + * @author Javier Lores + */ +@RunWith(Parameterized.class) +public class FindUniqueAndEditTest extends RunwayBaseClientServerTest { + + /** + * Return the parameter matrix that drives each test once per Command-API + * capability. + * + * @return one row with bulk commands enabled and one with it disabled + */ + @Parameters(name = "bulkCommands={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + private final boolean useBulkCommands; + + /** + * Construct a new instance. + * + * @param useBulkCommands {@code true} to exercise the bulk Command-API read + * path; {@code false} for the incremental path + */ + public FindUniqueAndEditTest(boolean useBulkCommands) { + this.useBulkCommands = useBulkCommands; + } + + @Override + protected void beforeTestRun() { + super.beforeTestRun(); + Reflection.set("supportsBulkCommands", useBulkCommands, runway); // (authorized) + } + + /** + * Goal: Verify that {@code findUniqueAndEdit} edits the + * sole matching record and durably persists the edit. + *

+ * Start state: Three {@link Item Items} with distinct + * codes, exactly one of which matches the criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Item} has code 2 and + * {@code owner == "worker"}, and the re-loaded {@link Item} shows the same + * persisted {@code owner}. + */ + @Test + public void testFindUniqueAndEditEditsSoleMatchAndPersists() { + runway.save(new Item(1), new Item(2), new Item(3)); + Item item = runway.findUniqueAndEdit(Item.class, code(2), + i -> i.owner = "worker"); + Assert.assertNotNull(item); + Assert.assertEquals(2, item.code); + Assert.assertEquals("worker", item.owner); + Item reloaded = runway.load(Item.class, item.id()); + Assert.assertEquals("worker", reloaded.owner); + } + + /** + * Goal: Verify that {@code findUniqueAndEdit} returns + * {@code null} and never invokes the consumer when no record matches. + *

+ * Start state: Three {@link Item Items} none of which + * matches the criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The result is {@code null} and the consumer + * never ran. + */ + @Test + public void testFindUniqueAndEditReturnsNullAndSkipsConsumerWhenNoMatch() { + runway.save(new Item(1), new Item(2), new Item(3)); + AtomicBoolean consumerRan = new AtomicBoolean(false); + Item item = runway.findUniqueAndEdit(Item.class, code(99), + i -> consumerRan.set(true)); + Assert.assertNull(item); + Assert.assertFalse(consumerRan.get()); + } + + /** + * Goal: Verify that {@code findUniqueAndEdit} throws + * {@link DuplicateEntryException} when more than one record matches, and + * that neither the mutation nor a commit occurs. + *

+ * Start state: Two {@link Item Items} that share the same + * code. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link DuplicateEntryException} is thrown + * and neither re-loaded {@link Item} has an {@code owner} set. + */ + @Test + public void testFindUniqueAndEditThrowsOnDuplicateWithoutEditing() { + Item one = new Item(7); + Item two = new Item(7); + runway.save(one, two); + boolean threw = false; + try { + runway.findUniqueAndEdit(Item.class, code(7), + i -> i.owner = "worker"); + } + catch (DuplicateEntryException e) { + threw = true; + } + Assert.assertTrue(threw); + Assert.assertNull(runway.load(Item.class, one.id()).owner); + Assert.assertNull(runway.load(Item.class, two.id()).owner); + } + + /** + * Goal: Verify that {@code findUniqueAndEdit} still + * detects a duplicate match when the server cannot paginate natively, so + * the unique guard does not depend on server-side pagination. + *

+ * Start state: A {@link Runway} forced onto the legacy + * path (no native sorting/pagination) with two {@link Item Items} that + * share the same code. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link DuplicateEntryException} is thrown + * and neither re-loaded {@link Item} has an {@code owner} set. + */ + @Test + public void testFindUniqueAndEditThrowsOnDuplicateOnLegacyServer() { + Reflection.set("hasNativeSortingAndPagination", false, runway); // (authorized) + Item one = new Item(7); + Item two = new Item(7); + runway.save(one, two); + boolean threw = false; + try { + runway.findUniqueAndEdit(Item.class, code(7), + i -> i.owner = "worker"); + } + catch (DuplicateEntryException e) { + threw = true; + } + Assert.assertTrue(threw); + Assert.assertNull(runway.load(Item.class, one.id()).owner); + Assert.assertNull(runway.load(Item.class, two.id()).owner); + } + + /** + * Return a {@link Criteria} matching every {@link Item} whose {@code code} + * equals the given {@code value}. + * + * @param value the code to match + * @return the {@code code == value} {@link Criteria} + */ + private static Criteria code(int value) { + return Criteria.where().key("code").operator(Operator.EQUALS) + .value(value).build(); + } + + /** + * A {@link Record} with a queryable {@code code} and an editable + * {@code owner}. + * + * @author Javier Lores + */ + public static class Item extends Record { + + /** + * The queryable code. + */ + int code; + + /** + * The editable owner, or {@code null} when unset. + */ + String owner; + + /** + * Construct a new instance. + * + * @param code the queryable code + */ + public Item(int code) { + this.code = code; + } + } + +}