From 497d80e257a72f8a7e6304e62cfd8f3fe321b7b0 Mon Sep 17 00:00:00 2001 From: Javier Lores Date: Fri, 26 Jun 2026 10:41:38 -0400 Subject: [PATCH 1/2] feat: [GH-140] Add *AndEdit atomic read-modify-write methods Add findAndEdit, findUniqueAndEdit, and findFirstAndEdit to Runway as true single-transaction read-modify-write primitives: each binds a Reader and Saver to one staged connection so the find's read and the save's write share one transaction (and one set of locks), giving concurrent callers mutual exclusion. On write conflict the cycle is retried with jittered backoff up to the bounded attempt limit; on exhaustion a RetryExhaustedException is thrown so a contended claim is never mistaken for 'nothing matched'. The incremental path is used unconditionally because BatchSaver defers the server-side STAGE until commit time, which would place the read outside the transaction. Stacked on feature/GH-139 (findFirst). Tests written (not run) per repo policy; spotlessApply clean. --- .../runway/RetryExhaustedException.java | 59 +++ src/main/java/com/cinchapi/runway/Runway.java | 243 +++++++++++ .../com/cinchapi/runway/FindAndEditTest.java | 219 ++++++++++ .../cinchapi/runway/FindFirstAndEditTest.java | 382 ++++++++++++++++++ .../runway/FindUniqueAndEditTest.java | 211 ++++++++++ 5 files changed, 1114 insertions(+) create mode 100644 src/main/java/com/cinchapi/runway/RetryExhaustedException.java create mode 100644 src/test/java/com/cinchapi/runway/FindAndEditTest.java create mode 100644 src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java create mode 100644 src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java diff --git a/src/main/java/com/cinchapi/runway/RetryExhaustedException.java b/src/main/java/com/cinchapi/runway/RetryExhaustedException.java new file mode 100644 index 0000000..2e81972 --- /dev/null +++ b/src/main/java/com/cinchapi/runway/RetryExhaustedException.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +/** + * A {@link RetryExhaustedException} is thrown by an atomic read-modify-write + * operation ({@code findAndEdit}, {@code findUniqueAndEdit}, and + * {@code findFirstAndEdit}) when it cannot commit because it lost the + * write-conflict race on every attempt up to the bounded retry limit. + *

+ * This is semantically distinct from a {@code null} or empty result, which + * means no record matched. A {@link RetryExhaustedException} means matching + * record(s) existed but persistent contention prevented this caller from + * committing its edit; the caller may back off and retry. + * + * @author Javier Lores + */ +@SuppressWarnings("serial") +public class RetryExhaustedException extends RunwayException { + + /** + * The number of attempts that were made before giving up. + */ + private final int attempts; + + /** + * Construct a new instance. + * + * @param attempts the number of attempts that were made before giving up + */ + public RetryExhaustedException(int attempts) { + super("Failed to atomically commit after " + attempts + + " attempts due to persistent write contention"); + this.attempts = attempts; + } + + /** + * Return the number of attempts that were made before giving up. + * + * @return the attempt count + */ + public int attempts() { + return attempts; + } + +} diff --git a/src/main/java/com/cinchapi/runway/Runway.java b/src/main/java/com/cinchapi/runway/Runway.java index bede6dc..d029675 100644 --- a/src/main/java/com/cinchapi/runway/Runway.java +++ b/src/main/java/com/cinchapi/runway/Runway.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -370,6 +371,12 @@ private static void restore(Map snapshot) { */ private static final int MAX_SPURIOUS_SAVE_RETRIES = 5; + /** + * The base interval, in milliseconds, for the jittered exponential backoff + * between atomic read-modify-write retries. + */ + private static final long RETRY_BACKOFF_BASE_MILLIS = 10; + /** * The development {@link Version} sentinel; a server reporting this version * is treated as providing every feature, regardless of the {@link Version} @@ -964,6 +971,242 @@ public boolean save(Record... records) { return save(false, records); } + /** + * Atomically find every {@link Record} of type {@code clazz} that matches + * the {@code criteria}, apply the {@code consumer} to each, and persist all + * of the edits as a single transaction. + *

+ * The find, the {@code consumer} application, and the save run as one + * transaction: either every edit commits or none do. When no record + * matches, the {@code consumer} is never invoked, nothing is committed, and + * an empty {@link Set} is returned. The returned {@link Set} preserves the + * iteration order of the underlying find. + *

+ * On a write conflict the whole cycle (re-find, re-apply, re-save) is + * retried with jittered backoff up to a bounded number of attempts, so the + * {@code consumer} may run more than once and must be safe to do so; it + * must mutate the {@link Record} it is handed rather than one captured + * earlier. The {@code consumer} runs before the save's validation, so an + * edit that violates a {@code Required}/{@code Unique} constraint surfaces + * from the save path. + * + * @param clazz the {@link Record} type to find + * @param criteria the {@link Criteria} the records must match + * @param consumer the mutation to apply to each matching {@link Record} + * @return the {@link Set} of edited {@link Record Records}, empty when none + * matched + * @throws RetryExhaustedException if the edit cannot commit within the + * bounded number of attempts due to persistent contention + */ + public Set findAndEdit(Class clazz, + Criteria criteria, Consumer consumer) { + return editWithinTransaction(clazz, false, criteria, null, null, + consumer, found -> {}); + } + + /** + * Atomically find the one {@link Record} of type {@code clazz} that matches + * the {@code criteria}, apply the {@code consumer} to it, and persist the + * edit as a single transaction. + *

+ * Returns the edited {@link Record}, or {@code null} when nothing matches + * (in which case the {@code consumer} is never invoked and nothing is + * committed). Throws {@link DuplicateEntryException} when more than one + * record matches, consistent with + * {@link DatabaseInterface#findUnique(Class, Criteria) findUnique}; the + * duplicate check happens inside the transaction before the + * {@code consumer} runs, so a violation neither mutates nor commits. + *

+ * On a write conflict the whole cycle is retried with jittered backoff up + * to a bounded number of attempts, so the {@code consumer} may run more + * than once and must be safe to do so; it must mutate the {@link Record} it + * is handed rather than one captured earlier. + * + * @param clazz the {@link Record} type to find + * @param criteria the {@link Criteria} the record must match + * @param consumer the mutation to apply to the matching {@link Record} + * @return the edited {@link Record}, or {@code null} if none matches + * @throws DuplicateEntryException if more than one record matches + * @throws RetryExhaustedException if the edit cannot commit within the + * bounded number of attempts due to persistent contention + */ + public T findUniqueAndEdit(Class clazz, + Criteria criteria, Consumer consumer) { + Set edited = editWithinTransaction(clazz, false, criteria, null, + DatabaseInterface.UNIQUE_PAGINATION, consumer, found -> { + if(found.size() > 1) { + throw duplicateEntryException( + "Multiple records match {} in {}", criteria, + clazz); + } + }); + return Iterables.getFirst(edited, null); + } + + /** + * Atomically find the first {@link Record} of type {@code clazz} that + * matches the {@code criteria} under the supplied {@code order}, apply the + * {@code consumer} to it, and persist the edit as a single transaction. + *

+ * "First" is defined entirely by {@code order}, which is required. Returns + * the edited {@link Record}, or {@code null} when nothing matches (in which + * case the {@code consumer} is never invoked and nothing is committed). + *

+ * This is the claim-and-update primitive: concurrent callers contending for + * the same record are serialized by the database, so at most one commits + * and the rest are preempted and retried. On a write conflict the whole + * cycle (re-find under {@code order}, re-apply, re-save) is retried with + * jittered backoff up to a bounded number of attempts, so the + * {@code consumer} may run more than once and must be safe to do so; it + * must mutate the {@link Record} it is handed rather than one captured + * earlier. + * + * @param clazz the {@link Record} type to find + * @param criteria the {@link Criteria} the record must match + * @param order the {@link Order} that defines "first" + * @param consumer the mutation to apply to the matching {@link Record} + * @return the edited {@link Record}, or {@code null} if none matches + * @throws RetryExhaustedException if the edit cannot commit within the + * bounded number of attempts due to persistent contention + */ + public T findFirstAndEdit(Class clazz, + Criteria criteria, Order order, Consumer consumer) { + Preconditions.checkNotNull(order, "findFirstAndEdit requires an Order"); + Set edited = editWithinTransaction(clazz, false, criteria, order, + Page.limit(1), consumer, found -> {}); + return Iterables.getFirst(edited, null); + } + + /** + * Perform an atomic find-modify-save: within a single staged transaction, + * find the records of type {@code clazz} matching {@code criteria} (under + * the optional {@code order} and {@code page}), let {@code validator} + * inspect the freshly read {@link Set} before any edit, apply + * {@code consumer} to each record, save, and commit. The find's read and + * the save's write share one transaction (and therefore one set of locks), + * which is what gives concurrent callers true mutual exclusion. + *

+ * On a {@link TransactionException} the cycle is aborted and retried with + * jittered backoff up to {@link #MAX_SPURIOUS_SAVE_RETRIES} attempts, each + * re-finding fresh records so the {@code consumer} always observes current + * state. When the attempt budget is exhausted a + * {@link RetryExhaustedException} is thrown rather than returning a + * non-committed result. + * + * @param clazz the {@link Record} type to find + * @param any whether to include the {@code clazz} hierarchy + * @param criteria the {@link Criteria} the records must match + * @param order the sort {@link Order}, or {@code null} + * @param page the {@link Page} limit, or {@code null} for all matches + * @param consumer the mutation applied to each matching {@link Record} + * @param validator a hook that inspects the freshly read {@link Set} inside + * the transaction before any edit (e.g. to enforce uniqueness) + * @return the {@link Set} of edited and committed {@link Record Records} + * @throws RetryExhaustedException if no attempt commits within the bound + */ + private Set editWithinTransaction(Class clazz, + boolean any, Criteria criteria, @Nullable Order order, + @Nullable Page page, Consumer consumer, + Consumer> validator) { + Concourse concourse = connections.request(); + try { + int attempts = 0; + while (true) { + // NOTE: The incremental path is used unconditionally (even when + // #supportsBulkCommands) because BatchSaver defers the + // server-side STAGE until commit time, which would place the + // find's read outside the transaction. IncrementalSaver stages + // synchronously, so the find's read and the save's write share + // one transaction (and one set of locks) — the property + // that gives concurrent callers mutual exclusion. + Saver saver = new IncrementalSaver(concourse); + try { + saver.stage(); + Set found; + // NOTE: The Reader is bound to the same staged connection + // as the Saver so the find reads inside the transaction; + // this deliberately bypasses the result cache used by the + // pooled #select path. + try (Reader reader = new IncrementalReader(concourse)) { + Read read = enqueueRead(reader, any, clazz, criteria, + order, page); + AtomicReference> ref = new AtomicReference<>(); + read.data + .then($data -> read.navigated + .then($navigated -> resolveLinkTargets( + reader, $data, $navigated)) + .map($targets -> instantiateAll(clazz, + any, $data, $targets))) + .onResolve(ref::set); + reader.drain(); + found = ref.get(); + } + validator.accept(found); + if(found.isEmpty()) { + saver.abort(); + return found; + } + Map seen = new HashMap<>(); + Map snapshots = new HashMap<>(); + for (T record : found) { + consumer.accept(record); + record.assign(this); + record.saveWithinTransaction(saver, seen, snapshots, + false); + } + if(saver.commit()) { + seen.entrySet().stream().filter(Entry::getValue) + .map(Entry::getKey).forEach(record -> { + enqueueSaveNotification(record); + record.checkpoint(); + }); + return found; + } + else { + // Trigger the retry path below. + throw new TransactionException(); + } + } + catch (TransactionException e) { + saver.abort(); + if(++attempts > MAX_SPURIOUS_SAVE_RETRIES) { + throw new RetryExhaustedException(attempts); + } + backoffWithJitter(attempts); + continue; + } + catch (Throwable t) { + // A non-transaction failure (e.g. a duplicate-entry or + // constraint violation) is terminal: abort and propagate + // without retrying so nothing is committed or mutated. + saver.abort(); + throw t; + } + } + } + finally { + connections.release(concourse); + } + } + + /** + * Sleep for a jittered, exponentially growing interval before the next + * atomic read-modify-write {@code attempt}, dispersing contending callers + * so they do not collide again in lockstep. + * + * @param attempt the attempt number that just failed (1-based) + */ + private void backoffWithJitter(int attempt) { + long ceiling = RETRY_BACKOFF_BASE_MILLIS * (1L << (attempt - 1)); + long delay = ThreadLocalRandom.current().nextLong(ceiling + 1); + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + /** * Search for records in {@code clazz} that match the search {@query} across * any of the provided {@code keys}. diff --git a/src/test/java/com/cinchapi/runway/FindAndEditTest.java b/src/test/java/com/cinchapi/runway/FindAndEditTest.java new file mode 100644 index 0000000..c923402 --- /dev/null +++ b/src/test/java/com/cinchapi/runway/FindAndEditTest.java @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.cinchapi.common.reflect.Reflection; +import com.cinchapi.concourse.lang.Criteria; +import com.cinchapi.concourse.thrift.Operator; + +/** + * Tests for + * {@link Runway#findAndEdit(Class, Criteria, java.util.function.Consumer) + * findAndEdit}. Each test runs under both Command-API modes (bulk enabled and + * disabled); the atomic edit itself always uses the incremental, + * synchronously-staged transaction path, so the matrix additionally guards the + * surrounding save and load operations under both modes. + * + * @author Javier Lores + */ +@RunWith(Parameterized.class) +public class FindAndEditTest extends RunwayBaseClientServerTest { + + /** + * Return the parameter matrix that drives each test once per Command-API + * capability. + * + * @return one row with bulk commands enabled and one with it disabled + */ + @Parameters(name = "bulkCommands={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + private final boolean useBulkCommands; + + /** + * Construct a new instance. + * + * @param useBulkCommands {@code true} to exercise the bulk Command-API read + * path; {@code false} for the incremental path + */ + public FindAndEditTest(boolean useBulkCommands) { + this.useBulkCommands = useBulkCommands; + } + + @Override + protected void beforeTestRun() { + super.beforeTestRun(); + Reflection.set("supportsBulkCommands", useBulkCommands, runway); // (authorized) + } + + /** + * Goal: Verify that {@code findAndEdit} edits every + * matching record and durably persists all of the edits. + *

+ * Start state: Three {@link Doc Docs} that all match the + * criteria. + *

+ * Workflow: + *

+ *

+ * Expected: Three {@link Doc Docs} are returned, each with + * {@code owner == "worker"}, and every re-loaded {@link Doc} shows the + * persisted {@code owner}. + */ + @Test + public void testFindAndEditEditsAllMatchesAndPersists() { + runway.save(new Doc(1), new Doc(2), new Doc(3)); + Set edited = runway.findAndEdit(Doc.class, rankPositive(), + doc -> doc.owner = "worker"); + Assert.assertEquals(3, edited.size()); + for (Doc doc : edited) { + Assert.assertEquals("worker", doc.owner); + Assert.assertEquals("worker", + runway.load(Doc.class, doc.id()).owner); + } + } + + /** + * Goal: Verify that {@code findAndEdit} returns an empty + * {@link Set} and never invokes the consumer when no record matches. + *

+ * Start state: Three {@link Doc Docs} none of which + * matches the criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Set} is empty and the + * consumer never ran. + */ + @Test + public void testFindAndEditReturnsEmptyAndSkipsConsumerWhenNoMatch() { + runway.save(new Doc(1), new Doc(2), new Doc(3)); + AtomicBoolean consumerRan = new AtomicBoolean(false); + Set edited = runway.findAndEdit( + Doc.class, Criteria.where().key("rank") + .operator(Operator.GREATER_THAN).value(100).build(), + doc -> consumerRan.set(true)); + Assert.assertTrue(edited.isEmpty()); + Assert.assertFalse(consumerRan.get()); + } + + /** + * Goal: Verify the all-or-nothing guarantee: when the + * consumer throws partway through the match set, no record's edit is + * persisted. + *

+ * Start state: Three {@link Doc Docs} that all match the + * criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The exception propagates and none of the + * re-loaded {@link Doc Docs} has an {@code owner} set. + */ + @Test + public void testFindAndEditIsAllOrNothingWhenConsumerThrows() { + Doc one = new Doc(1); + Doc two = new Doc(2); + Doc three = new Doc(3); + runway.save(one, two, three); + boolean threw = false; + try { + runway.findAndEdit(Doc.class, rankPositive(), doc -> { + if(doc.rank == 2) { + throw new IllegalStateException("boom"); + } + doc.owner = "worker"; + }); + } + catch (IllegalStateException e) { + threw = true; + } + Assert.assertTrue(threw); + Assert.assertNull(runway.load(Doc.class, one.id()).owner); + Assert.assertNull(runway.load(Doc.class, two.id()).owner); + Assert.assertNull(runway.load(Doc.class, three.id()).owner); + } + + /** + * Return a {@link Criteria} matching every {@link Doc} whose {@code rank} + * is positive. + * + * @return the {@code rank > 0} {@link Criteria} + */ + private static Criteria rankPositive() { + return Criteria.where().key("rank").operator(Operator.GREATER_THAN) + .value(0).build(); + } + + /** + * A {@link Record} with a queryable {@code rank} and an editable + * {@code owner}. + * + * @author Javier Lores + */ + public static class Doc extends Record { + + /** + * The queryable rank. + */ + int rank; + + /** + * The editable owner, or {@code null} when unset. + */ + String owner; + + /** + * Construct a new instance. + * + * @param rank the queryable rank + */ + public Doc(int rank) { + this.rank = rank; + } + } + +} diff --git a/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java b/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java new file mode 100644 index 0000000..041fe41 --- /dev/null +++ b/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java @@ -0,0 +1,382 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.cinchapi.common.reflect.Reflection; +import com.cinchapi.concourse.Concourse; +import com.cinchapi.concourse.lang.Criteria; +import com.cinchapi.concourse.lang.sort.Order; +import com.cinchapi.concourse.thrift.Operator; + +/** + * Tests for + * {@link Runway#findFirstAndEdit(Class, Criteria, Order, java.util.function.Consumer) + * findFirstAndEdit}, the atomic claim-and-update primitive. Each test runs + * under both Command-API modes (bulk enabled and disabled); the atomic edit + * itself always uses the incremental, synchronously-staged transaction path, so + * the matrix additionally guards the surrounding save and load operations under + * both modes. + * + * @author Javier Lores + */ +@RunWith(Parameterized.class) +public class FindFirstAndEditTest extends RunwayBaseClientServerTest { + + /** + * Return the parameter matrix that drives each test once per Command-API + * capability. + * + * @return one row with bulk commands enabled and one with it disabled + */ + @Parameters(name = "bulkCommands={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + private final boolean useBulkCommands; + + /** + * Construct a new instance. + * + * @param useBulkCommands {@code true} to exercise the bulk Command-API read + * path; {@code false} for the incremental path + */ + public FindFirstAndEditTest(boolean useBulkCommands) { + this.useBulkCommands = useBulkCommands; + } + + @Override + protected void beforeTestRun() { + super.beforeTestRun(); + Reflection.set("supportsBulkCommands", useBulkCommands, runway); // (authorized) + } + + /** + * Goal: Verify that {@code findFirstAndEdit} edits exactly + * the record that sorts first under the {@link Order} and durably persists + * the edit. + *

+ * Start state: Three unclaimed {@link Task Tasks} with + * ranks 3, 2, and 1 saved in non-sorted insertion order. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Task} has rank 1 and + * {@code owner == "worker"}, and the re-loaded {@link Task} shows the same + * persisted {@code owner}. + */ + @Test + public void testFindFirstAndEditEditsFirstUnderOrderAndPersists() { + runway.save(new Task(3), new Task(2), new Task(1)); + Task first = runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), task -> task.owner = "worker"); + Assert.assertNotNull(first); + Assert.assertEquals(1, first.rank); + Assert.assertEquals("worker", first.owner); + Task reloaded = runway.load(Task.class, first.id()); + Assert.assertEquals("worker", reloaded.owner); + } + + /** + * Goal: Verify that {@code findFirstAndEdit} returns + * {@code null} and never invokes the consumer when no record matches. + *

+ * Start state: Three {@link Task Tasks} whose ranks are + * all below the criteria threshold. + *

+ * Workflow: + *

+ *

+ * Expected: The result is {@code null} and the consumer + * never ran. + */ + @Test + public void testFindFirstAndEditReturnsNullAndSkipsConsumerWhenNoMatch() { + runway.save(new Task(1), new Task(2), new Task(3)); + AtomicBoolean consumerRan = new AtomicBoolean(false); + Task first = runway.findFirstAndEdit(Task.class, + Criteria.where().key("rank").operator(Operator.GREATER_THAN) + .value(100).build(), + Order.by("rank").ascending(), task -> consumerRan.set(true)); + Assert.assertNull(first); + Assert.assertFalse(consumerRan.get()); + } + + /** + * Goal: Verify that {@code findFirstAndEdit} rejects a + * {@code null} {@link Order}, since "first" is undefined without one. + *

+ * Start state: A single unclaimed {@link Task}. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link NullPointerException} is thrown. + */ + @Test(expected = NullPointerException.class) + public void testFindFirstAndEditRequiresOrder() { + runway.save(new Task(1)); + runway.findFirstAndEdit(Task.class, unclaimed(), null, + task -> task.owner = "worker"); + } + + /** + * Goal: Verify the core mutual-exclusion guarantee: two + * threads racing to claim the same single candidate cannot both succeed. + *

+ * Start state: One unclaimed {@link Task}. + *

+ * Workflow: + *

+ *

+ * Expected: Exactly one thread receives a non-null claim + * and the other receives {@code null}; the persisted {@code owner} equals + * the single winner's id. + */ + @Test + public void testFindFirstAndEditClaimsExactlyOneUnderConcurrency() + throws InterruptedException { + Task task = new Task(1); + runway.save(task); + long id = task.id(); + CountDownLatch ready = new CountDownLatch(2); + CountDownLatch go = new CountDownLatch(1); + AtomicReference claim1 = new AtomicReference<>(); + AtomicReference claim2 = new AtomicReference<>(); + Thread t1 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim1.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "one"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + Thread t2 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim2.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "two"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t1.start(); + t2.start(); + ready.await(5, TimeUnit.SECONDS); + go.countDown(); + t1.join(10000); + t2.join(10000); + boolean oneWon = claim1.get() != null; + boolean twoWon = claim2.get() != null; + Assert.assertTrue("Exactly one thread must claim the task", + oneWon ^ twoWon); + Task reloaded = runway.load(Task.class, id); + Assert.assertTrue(reloaded.claimed); + Assert.assertEquals(oneWon ? "one" : "two", reloaded.owner); + } + + /** + * Goal: Verify that two threads racing over a candidate + * set of two records each claim a distinct record, so no record is claimed + * twice. + *

+ * Start state: Two unclaimed {@link Task Tasks} with ranks + * 1 and 2. + *

+ * Workflow: + *

+ *

+ * Expected: Both threads receive a non-null claim and the + * two claims are distinct {@link Task Tasks} (different ids). + */ + @Test + public void testFindFirstAndEditConcurrentClaimsTakeDistinctRecords() + throws InterruptedException { + runway.save(new Task(1), new Task(2)); + CountDownLatch ready = new CountDownLatch(2); + CountDownLatch go = new CountDownLatch(1); + AtomicReference claim1 = new AtomicReference<>(); + AtomicReference claim2 = new AtomicReference<>(); + Thread t1 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim1.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "one"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + Thread t2 = new Thread(() -> { + ready.countDown(); + try { + go.await(); + claim2.set(runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + t.claimed = true; + t.owner = "two"; + })); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t1.start(); + t2.start(); + ready.await(5, TimeUnit.SECONDS); + go.countDown(); + t1.join(10000); + t2.join(10000); + Assert.assertNotNull(claim1.get()); + Assert.assertNotNull(claim2.get()); + Assert.assertNotEquals(claim1.get().id(), claim2.get().id()); + } + + /** + * Goal: Verify that {@code findFirstAndEdit} throws + * {@link RetryExhaustedException} — rather than returning + * {@code null} — when it loses the commit race on every attempt. + *

+ * Start state: One unclaimed {@link Task}. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link RetryExhaustedException} is thrown + * and the persisted {@code owner} was never set by this caller. + */ + @Test(expected = RetryExhaustedException.class) + public void testFindFirstAndEditThrowsRetryExhaustedUnderPersistentContention() { + Task task = new Task(1); + runway.save(task); + long id = task.id(); + runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), t -> { + // Force a conflicting external write on every attempt so + // the staged transaction can never commit. + Concourse other = runway.connections.request(); + try { + other.set("rank", t.rank + 1, id); + } + finally { + runway.connections.release(other); + } + t.owner = "worker"; + }); + } + + /** + * Return a {@link Criteria} matching every unclaimed {@link Task}, i.e. one + * whose {@code claimed} flag is {@code false}. + * + * @return the {@code claimed == false} {@link Criteria} + */ + private static Criteria unclaimed() { + return Criteria.where().key("claimed").operator(Operator.EQUALS) + .value(false).build(); + } + + /** + * A claimable unit of work with an orderable {@code rank}, a + * {@code claimed} flag, and an {@code owner} that records the claimant. + * + * @author Javier Lores + */ + public static class Task extends Record { + + /** + * The orderable rank; lowest is claimed first under ascending order. + */ + int rank; + + /** + * Whether this task has been claimed. + */ + boolean claimed; + + /** + * The id of the claimant, or {@code null} when unclaimed. + */ + String owner; + + /** + * Construct a new, unclaimed instance. + * + * @param rank the orderable rank + */ + public Task(int rank) { + this.rank = rank; + this.claimed = false; + } + } + +} diff --git a/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java b/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java new file mode 100644 index 0000000..2179812 --- /dev/null +++ b/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2013-2026 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.cinchapi.runway; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.cinchapi.common.reflect.Reflection; +import com.cinchapi.concourse.DuplicateEntryException; +import com.cinchapi.concourse.lang.Criteria; +import com.cinchapi.concourse.thrift.Operator; + +/** + * Tests for + * {@link Runway#findUniqueAndEdit(Class, Criteria, java.util.function.Consumer) + * findUniqueAndEdit}. Each test runs under both Command-API modes (bulk enabled + * and disabled); the atomic edit itself always uses the incremental, + * synchronously-staged transaction path, so the matrix additionally guards the + * surrounding save and load operations under both modes. + * + * @author Javier Lores + */ +@RunWith(Parameterized.class) +public class FindUniqueAndEditTest extends RunwayBaseClientServerTest { + + /** + * Return the parameter matrix that drives each test once per Command-API + * capability. + * + * @return one row with bulk commands enabled and one with it disabled + */ + @Parameters(name = "bulkCommands={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { { true }, { false } }); + } + + private final boolean useBulkCommands; + + /** + * Construct a new instance. + * + * @param useBulkCommands {@code true} to exercise the bulk Command-API read + * path; {@code false} for the incremental path + */ + public FindUniqueAndEditTest(boolean useBulkCommands) { + this.useBulkCommands = useBulkCommands; + } + + @Override + protected void beforeTestRun() { + super.beforeTestRun(); + Reflection.set("supportsBulkCommands", useBulkCommands, runway); // (authorized) + } + + /** + * Goal: Verify that {@code findUniqueAndEdit} edits the + * sole matching record and durably persists the edit. + *

+ * Start state: Three {@link Item Items} with distinct + * codes, exactly one of which matches the criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Item} has code 2 and + * {@code owner == "worker"}, and the re-loaded {@link Item} shows the same + * persisted {@code owner}. + */ + @Test + public void testFindUniqueAndEditEditsSoleMatchAndPersists() { + runway.save(new Item(1), new Item(2), new Item(3)); + Item item = runway.findUniqueAndEdit(Item.class, code(2), + i -> i.owner = "worker"); + Assert.assertNotNull(item); + Assert.assertEquals(2, item.code); + Assert.assertEquals("worker", item.owner); + Item reloaded = runway.load(Item.class, item.id()); + Assert.assertEquals("worker", reloaded.owner); + } + + /** + * Goal: Verify that {@code findUniqueAndEdit} returns + * {@code null} and never invokes the consumer when no record matches. + *

+ * Start state: Three {@link Item Items} none of which + * matches the criteria. + *

+ * Workflow: + *

+ *

+ * Expected: The result is {@code null} and the consumer + * never ran. + */ + @Test + public void testFindUniqueAndEditReturnsNullAndSkipsConsumerWhenNoMatch() { + runway.save(new Item(1), new Item(2), new Item(3)); + AtomicBoolean consumerRan = new AtomicBoolean(false); + Item item = runway.findUniqueAndEdit(Item.class, code(99), + i -> consumerRan.set(true)); + Assert.assertNull(item); + Assert.assertFalse(consumerRan.get()); + } + + /** + * Goal: Verify that {@code findUniqueAndEdit} throws + * {@link DuplicateEntryException} when more than one record matches, and + * that neither the mutation nor a commit occurs. + *

+ * Start state: Two {@link Item Items} that share the same + * code. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link DuplicateEntryException} is thrown + * and neither re-loaded {@link Item} has an {@code owner} set. + */ + @Test + public void testFindUniqueAndEditThrowsOnDuplicateWithoutEditing() { + Item one = new Item(7); + Item two = new Item(7); + runway.save(one, two); + boolean threw = false; + try { + runway.findUniqueAndEdit(Item.class, code(7), + i -> i.owner = "worker"); + } + catch (DuplicateEntryException e) { + threw = true; + } + Assert.assertTrue(threw); + Assert.assertNull(runway.load(Item.class, one.id()).owner); + Assert.assertNull(runway.load(Item.class, two.id()).owner); + } + + /** + * Return a {@link Criteria} matching every {@link Item} whose {@code code} + * equals the given {@code value}. + * + * @param value the code to match + * @return the {@code code == value} {@link Criteria} + */ + private static Criteria code(int value) { + return Criteria.where().key("code").operator(Operator.EQUALS) + .value(value).build(); + } + + /** + * A {@link Record} with a queryable {@code code} and an editable + * {@code owner}. + * + * @author Javier Lores + */ + public static class Item extends Record { + + /** + * The queryable code. + */ + int code; + + /** + * The editable owner, or {@code null} when unset. + */ + String owner; + + /** + * Construct a new instance. + * + * @param code the queryable code + */ + public Item(int code) { + this.code = code; + } + } + +} From 3bdaae2771092ac3a3e30b7abf3604757b88bf84 Mon Sep 17 00:00:00 2001 From: Javier Lores Date: Mon, 29 Jun 2026 18:10:14 -0400 Subject: [PATCH 2/2] fix: [GH-140] Support legacy servers in the *AndEdit family + cleanups Apply Order/Page client-side in editWithinTransaction when the connected server cannot sort or paginate natively, mirroring the read-path fallback. The in-transaction find still reads (and therefore locks) every match, so atomicity and mutual exclusion are preserved; only the ordering/limit are applied client-side afterward via a new sortAndPage helper. This keeps findFirstAndEdit and the paginated guards correct on legacy servers, not just native-capable ones. Also: - Drop the unused `any` (hierarchy) parameter from editWithinTransaction; every call site passed false. - Honor interrupts during the retry backoff: restore the interrupt flag and abandon the retry loop instead of silently continuing to contend. - Add NOTE comments documenting the retry-on-every-TransactionException semantics and the connection-held-across-backoff choice. Tests: legacy-path coverage (hasNativeSortingAndPagination=false) for findFirstAndEdit client-side ordering and findUniqueAndEdit duplicate detection without native pagination. --- src/main/java/com/cinchapi/runway/Runway.java | 69 ++++++++++++++++--- .../cinchapi/runway/FindFirstAndEditTest.java | 40 ++++++++++- .../runway/FindUniqueAndEditTest.java | 41 +++++++++++ 3 files changed, 138 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/cinchapi/runway/Runway.java b/src/main/java/com/cinchapi/runway/Runway.java index d029675..1b5c331 100644 --- a/src/main/java/com/cinchapi/runway/Runway.java +++ b/src/main/java/com/cinchapi/runway/Runway.java @@ -1000,8 +1000,8 @@ public boolean save(Record... records) { */ public Set findAndEdit(Class clazz, Criteria criteria, Consumer consumer) { - return editWithinTransaction(clazz, false, criteria, null, null, - consumer, found -> {}); + return editWithinTransaction(clazz, criteria, null, null, consumer, + found -> {}); } /** @@ -1032,7 +1032,7 @@ public Set findAndEdit(Class clazz, */ public T findUniqueAndEdit(Class clazz, Criteria criteria, Consumer consumer) { - Set edited = editWithinTransaction(clazz, false, criteria, null, + Set edited = editWithinTransaction(clazz, criteria, null, DatabaseInterface.UNIQUE_PAGINATION, consumer, found -> { if(found.size() > 1) { throw duplicateEntryException( @@ -1072,7 +1072,7 @@ public T findUniqueAndEdit(Class clazz, public T findFirstAndEdit(Class clazz, Criteria criteria, Order order, Consumer consumer) { Preconditions.checkNotNull(order, "findFirstAndEdit requires an Order"); - Set edited = editWithinTransaction(clazz, false, criteria, order, + Set edited = editWithinTransaction(clazz, criteria, order, Page.limit(1), consumer, found -> {}); return Iterables.getFirst(edited, null); } @@ -1094,7 +1094,6 @@ public T findFirstAndEdit(Class clazz, * non-committed result. * * @param clazz the {@link Record} type to find - * @param any whether to include the {@code clazz} hierarchy * @param criteria the {@link Criteria} the records must match * @param order the sort {@link Order}, or {@code null} * @param page the {@link Page} limit, or {@code null} for all matches @@ -1105,11 +1104,21 @@ public T findFirstAndEdit(Class clazz, * @throws RetryExhaustedException if no attempt commits within the bound */ private Set editWithinTransaction(Class clazz, - boolean any, Criteria criteria, @Nullable Order order, - @Nullable Page page, Consumer consumer, - Consumer> validator) { + Criteria criteria, @Nullable Order order, @Nullable Page page, + Consumer consumer, Consumer> validator) { + // NOTE: Unlike #save, every TransactionException is retried here + // regardless of #spuriousSaveFailureStrategy and without a stale-data + // check. These primitives are built for write-conflict contention (the + // claim use case), where a lost commit race is exactly the condition a + // caller wants retried; staleness checks are off because the read is + // re-issued fresh inside each attempt's transaction. Concourse concourse = connections.request(); try { + // NOTE: The connection is held across the bounded backoff sleeps + // rather than released between attempts. The staged transaction + // lives on this connection, and re-requesting per attempt would + // churn the pool; the backoff is capped at a few hundred millis so + // the idle occupancy is brief. int attempts = 0; while (true) { // NOTE: The incremental path is used unconditionally (even when @@ -1122,25 +1131,37 @@ private Set editWithinTransaction(Class clazz, Saver saver = new IncrementalSaver(concourse); try { saver.stage(); + // NOTE: A legacy server cannot sort or paginate + // server-side, so the order/page are withheld from the + // find and applied client-side below. The find still reads + // (and therefore locks) every match inside the + // transaction, so the atomicity and mutual-exclusion + // guarantees are preserved. + boolean nativeOrderAndPage = hasNativeSortingAndPagination + || doesNotRequireSortingOrPagination(order, page); Set found; // NOTE: The Reader is bound to the same staged connection // as the Saver so the find reads inside the transaction; // this deliberately bypasses the result cache used by the // pooled #select path. try (Reader reader = new IncrementalReader(concourse)) { - Read read = enqueueRead(reader, any, clazz, criteria, - order, page); + Read read = enqueueRead(reader, false, clazz, criteria, + nativeOrderAndPage ? order : null, + nativeOrderAndPage ? page : null); AtomicReference> ref = new AtomicReference<>(); read.data .then($data -> read.navigated .then($navigated -> resolveLinkTargets( reader, $data, $navigated)) .map($targets -> instantiateAll(clazz, - any, $data, $targets))) + false, $data, $targets))) .onResolve(ref::set); reader.drain(); found = ref.get(); } + if(!nativeOrderAndPage) { + found = sortAndPage(found, order, page); + } validator.accept(found); if(found.isEmpty()) { saver.abort(); @@ -1203,8 +1224,34 @@ private void backoffWithJitter(int attempt) { Thread.sleep(delay); } catch (InterruptedException e) { + // Honor the interrupt by restoring the flag and abandoning the + // retry loop rather than silently continuing to contend. Thread.currentThread().interrupt(); + throw CheckedExceptions.throwAsRuntimeException(e); + } + } + + /** + * Apply {@code order} and {@code page} to {@code records} client-side, for + * the legacy-server path where the find could not sort or paginate + * server-side. + * + * @param records the records read for the match, in find order + * @param order the sort {@link Order} to apply, or {@code null} + * @param page the {@link Page} to apply, or {@code null} for all records + * @return the sorted and paginated {@link Set}, preserving iteration order + */ + private Set sortAndPage(Set records, + @Nullable Order order, @Nullable Page page) { + if(order != null) { + records = DatabaseInterface.sort(records, + backwardsCompatible(order)); + } + if(page != null) { + records = records.stream().skip(page.skip()).limit(page.limit()) + .collect(Collectors.toCollection(LinkedHashSet::new)); } + return records; } /** diff --git a/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java b/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java index 041fe41..b444120 100644 --- a/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java +++ b/src/test/java/com/cinchapi/runway/FindFirstAndEditTest.java @@ -109,6 +109,41 @@ public void testFindFirstAndEditEditsFirstUnderOrderAndPersists() { Assert.assertEquals("worker", reloaded.owner); } + /** + * Goal: Verify that {@code findFirstAndEdit} applies the + * {@link Order} client-side and still edits exactly the record that sorts + * first when the server cannot sort or paginate natively. + *

+ * Start state: A {@link Runway} forced onto the legacy + * path (no native sorting/pagination) with three unclaimed {@link Task + * Tasks} of ranks 3, 2, and 1 saved in non-sorted insertion order. + *

+ * Workflow: + *

+ *

+ * Expected: The returned {@link Task} has rank 1 and + * {@code owner == "worker"}, and the re-loaded {@link Task} shows the same + * persisted {@code owner}. + */ + @Test + public void testFindFirstAndEditAppliesOrderClientSideOnLegacyServer() { + Reflection.set("hasNativeSortingAndPagination", false, runway); // (authorized) + runway.save(new Task(3), new Task(2), new Task(1)); + Task first = runway.findFirstAndEdit(Task.class, unclaimed(), + Order.by("rank").ascending(), task -> task.owner = "worker"); + Assert.assertNotNull(first); + Assert.assertEquals(1, first.rank); + Assert.assertEquals("worker", first.owner); + Task reloaded = runway.load(Task.class, first.id()); + Assert.assertEquals("worker", reloaded.owner); + } + /** * Goal: Verify that {@code findFirstAndEdit} returns * {@code null} and never invokes the consumer when no record matches. @@ -322,7 +357,10 @@ public void testFindFirstAndEditThrowsRetryExhaustedUnderPersistentContention() runway.findFirstAndEdit(Task.class, unclaimed(), Order.by("rank").ascending(), t -> { // Force a conflicting external write on every attempt so - // the staged transaction can never commit. + // the staged transaction can never commit. The nested + // request borrows a second connection while the edit holds + // its own; this is safe because Runway uses an expandable + // cached pool that grows on demand rather than blocking. Concourse other = runway.connections.request(); try { other.set("rank", t.rank + 1, id); diff --git a/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java b/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java index 2179812..7937ddd 100644 --- a/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java +++ b/src/test/java/com/cinchapi/runway/FindUniqueAndEditTest.java @@ -168,6 +168,47 @@ public void testFindUniqueAndEditThrowsOnDuplicateWithoutEditing() { Assert.assertNull(runway.load(Item.class, two.id()).owner); } + /** + * Goal: Verify that {@code findUniqueAndEdit} still + * detects a duplicate match when the server cannot paginate natively, so + * the unique guard does not depend on server-side pagination. + *

+ * Start state: A {@link Runway} forced onto the legacy + * path (no native sorting/pagination) with two {@link Item Items} that + * share the same code. + *

+ * Workflow: + *

+ *

+ * Expected: A {@link DuplicateEntryException} is thrown + * and neither re-loaded {@link Item} has an {@code owner} set. + */ + @Test + public void testFindUniqueAndEditThrowsOnDuplicateOnLegacyServer() { + Reflection.set("hasNativeSortingAndPagination", false, runway); // (authorized) + Item one = new Item(7); + Item two = new Item(7); + runway.save(one, two); + boolean threw = false; + try { + runway.findUniqueAndEdit(Item.class, code(7), + i -> i.owner = "worker"); + } + catch (DuplicateEntryException e) { + threw = true; + } + Assert.assertTrue(threw); + Assert.assertNull(runway.load(Item.class, one.id()).owner); + Assert.assertNull(runway.load(Item.class, two.id()).owner); + } + /** * Return a {@link Criteria} matching every {@link Item} whose {@code code} * equals the given {@code value}.