Add *AndEdit atomic read-modify-write methods (findAndEdit, findUniqueAndEdit, findFirstAndEdit) #140

@jtnelson

Summary

I propose a new *AndEdit family of atomic read-modify-write methods on the
Runway database interface: findAndEdit, findUniqueAndEdit, and
findFirstAndEdit. Each one takes the usual find arguments (a Class, a
Criteria, and, where applicable, an Order) plus a consumer that mutates the
matched record(s). The method finds the record(s), applies the consumer, saves,
and commits as a single atomic unit, retrying on write conflict.

Runway has no read-modify-write or compare-and-set primitive today. The only
transactional primitive is
Runway.save(boolean preventStaleWrites, Record...)
(src/main/java/com/cinchapi/runway/Runway.java:842), which makes the
write atomic, but the read (the find/load that produced the records) happens
in a separate, earlier transaction. A caller who wants
"find the matching record, change it, and persist that change without anyone
else slipping in between" has to assemble that themselves, and Runway gives
them no way to make the read and the write one transaction.

The driving use case comes from a new data-sync capability in cinchapi-server,
a load-balanced service that scales across Kubernetes pods with no external
coordinator. Connector configuration (sources, credentials, schedule, and a
lock field) lives in a shared Concourse instance. Each service instance runs a
loop that atomically claims the first available connector (one whose lock is
unset, or whose lock is stale because the previous holder died), records the
lock, runs the sync, then releases the lock. The hard requirement is mutual
exclusion: two instances must never claim the same connector. findFirstAndEdit
over a candidate set, with a real single-transaction guarantee, is the
primitive that makes that safe. This ticket adds only the Runway primitives;
cinchapi-server composes the lock condition (unset or stale), the ordering, and
the timeout window itself (see Scope and dependencies).

Proposed API / Syntax

I propose adding these methods. The requester asked specifically for
Consumer<Record>; I recommend the typed Consumer<T> instead, so the consumer
receives the record already typed as T and can call its domain methods without
a cast. Consumer<Record> would work as a baseline, but it forces every call
site to downcast inside the lambda; Consumer<T> is more convenient and no less
expressive.

// Edit every matching record; all edits commit in one transaction.
<T extends Record> Set<T> findAndEdit(
        Class<T> clazz, Criteria criteria, Consumer<T> consumer);

// Edit the single matching record; throw if more than one matches.
<T extends Record> T findUniqueAndEdit(
        Class<T> clazz, Criteria criteria, Consumer<T> consumer);

// Edit the first matching record in the given order.
<T extends Record> T findFirstAndEdit(
        Class<T> clazz, Criteria criteria, Order order,
        Consumer<T> consumer);

Semantics:

  • findAndEdit returns the Set of edited records, all saved in one
    transaction (all-or-nothing: either every edit commits or none do). When no
    record matches, it returns an empty set, the consumer is never invoked, and
    no transaction is committed. The returned set preserves the iteration order
    of the underlying find.
  • findUniqueAndEdit returns the edited record, or null when nothing
    matches. It throws DuplicateEntryException when more than one record
    matches, consistent with findUnique
    (src/main/java/com/cinchapi/runway/DatabaseInterface.java:1106). The
    duplicate check happens inside the transaction, before the consumer runs, so
    a violation neither mutates nor commits.
  • findFirstAndEdit returns the first matching record under order, or
    null when nothing matches. Runway has no server-side first primitive (the
    driver exposes find(Criteria, Order) at
    concourse-driver-java/.../Concourse.java:1002 but no findFirst), so
    "first" means the first element of the ordered match set. order is
    required; the variant is meaningless without a deterministic ordering, and
    the claim-the-oldest-lock use case depends on it.
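
The return contracts above can be modeled in memory. The following is only a sketch of the shapes, under loud assumptions: AndEditContracts is a hypothetical class, a List stands in for the database, a Predicate for Criteria, a Comparator for Order, and IllegalStateException for DuplicateEntryException. It involves no transactions and is not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical in-memory model of the three return contracts. */
final class AndEditContracts {

    static <T> Set<T> findAndEdit(List<T> store, Predicate<T> criteria,
            Consumer<T> consumer) {
        // LinkedHashSet keeps the iteration order of the underlying find
        Set<T> edited = new LinkedHashSet<>();
        for (T record : store) {
            if(criteria.test(record)) {
                consumer.accept(record);
                edited.add(record);
            }
        }
        return edited; // empty when nothing matched; consumer never ran
    }

    static <T> T findUniqueAndEdit(List<T> store, Predicate<T> criteria,
            Consumer<T> consumer) {
        List<T> matches = new ArrayList<>();
        for (T record : store) {
            if(criteria.test(record)) {
                matches.add(record);
            }
        }
        if(matches.size() > 1) {
            // Stand-in for DuplicateEntryException. The check happens before
            // the consumer runs, so a violation mutates nothing.
            throw new IllegalStateException("more than one match");
        }
        if(matches.isEmpty()) {
            return null;
        }
        T only = matches.get(0);
        consumer.accept(only);
        return only;
    }

    static <T> T findFirstAndEdit(List<T> store, Predicate<T> criteria,
            Comparator<T> order, Consumer<T> consumer) {
        Objects.requireNonNull(order, "order is required");
        // "First" is the first element of the ordered match set
        T first = store.stream().filter(criteria).min(order).orElse(null);
        if(first != null) {
            consumer.accept(first);
        }
        return first;
    }
}
```

In this model the duplicate check and the null/empty returns are the only behavior shown; atomicity and retry are deliberately absent.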

Atomicity and contention (applies to all three):

  • The find, the consumer application, and the save run as one transaction.
    Concurrent callers contending for the same record(s) are serialized by
    Concourse's just-in-time read locking plus version-change preemption: the
    read taken during the find conflicts with a competing write, so at most one
    contender commits and the rest are preempted.
  • On a write conflict, the method aborts and retries the whole cycle
    (re-find, re-apply, re-save) up to a bounded number of attempts with jittered
    backoff. The find is re-run on every attempt so the consumer always observes
    fresh state. When the attempt budget is exhausted without a successful
    commit, the method surfaces the failure rather than silently returning a
    non-committed record (the exact exhaustion signal is an open decision; see
    Implementation pointers).
  • Because records are not identity-mapped (every find returns a new instance,
    documented at
    src/main/java/com/cinchapi/runway/DatabaseInterface.java:1149), the consumer
    must mutate the instance it is handed; it must not close over a record loaded
    earlier. On abort, in-memory field edits are not rolled back, which is
    exactly why each retry re-finds a fresh instance instead of reusing the
    mutated one.
  • The consumer's contract is that the persisted edit is a pure function of
    the record it is handed. It may run more than once (on retry), so it must
    be safe to re-run and should avoid external side effects. It runs before
    the save's validation, so an edit that violates a Required/Unique
    constraint surfaces as an exception from the save path, not from the
    consumer.
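
To make the re-find rule concrete, here is a small sketch of why each retry must hand the consumer a fresh instance. Everything in it is hypothetical: FreshInstanceRetry and Counter are illustration-only names, and "aborted attempts" are simulated by a count rather than by real transaction conflicts.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

final class FreshInstanceRetry {

    /** Hypothetical mutable record with a single field. */
    static final class Counter {
        int value;

        Counter(int value) {
            this.value = value;
        }
    }

    /**
     * Run the consumer once per attempt. The first abortedAttempts attempts
     * are treated as aborted, and the instance from the final attempt is the
     * one that "commits". In-memory edits made during aborted attempts are
     * never rolled back, so only a fresh instance per attempt keeps them from
     * leaking into the committed state.
     */
    static Counter edit(Supplier<Counter> find, Consumer<Counter> consumer,
            int abortedAttempts) {
        Counter current = null;
        for (int attempt = 0; attempt <= abortedAttempts; ++attempt) {
            current = find.get();     // re-find on every attempt
            consumer.accept(current); // the consumer runs again on retry
        }
        return current;
    }
}
```

With a stored value of 10 and two aborted attempts, a finder that returns a new Counter(10) each time yields 11. A finder that keeps returning one shared instance yields 13, because the edits from the aborted attempts pile up on the reused object.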

Examples

Claim-and-lock the first available connector, the cinchapi-server use case.
cinchapi-server builds the candidate criteria (the canonical form is
locked[<cutoff>~] = null, combining the modification-timestamp binding in
cinchapi/concourse#790 with the null value sentinel in cinchapi/concourse#791)
and the ordering (oldest lastRun first); the Runway primitive guarantees only
one instance wins:

String instanceId = ...; // this pod's identity
Criteria available = ...; // composed by cinchapi-server: unlocked OR stale
Order oldestFirst = Order.by("lastRun").ascending().build();

Connector claimed = runway.findFirstAndEdit(
        Connector.class, available, oldestFirst,
        connector -> connector.set("lockedBy", instanceId));

if(claimed != null) {
    try {
        runSync(claimed);
    }
    finally {
        // release; another instance may now claim it
        runway.findFirstAndEdit(Connector.class,
                Criteria.where().key("id").operator(Operator.EQUALS)
                        .value(claimed.id()).build(),
                Order.by("id").build(),
                connector -> connector.set("lockedBy", null));
    }
}
else {
    // nothing available right now
}

Two pods racing on the same candidate set: each call stages its own
transaction, both read the row, both try to write lockedBy; Concourse lets
exactly one commit and preempts the other, which retries, re-runs the find,
finds the row no longer available, and (depending on the now-narrower candidate
set) either claims a different connector or returns null. Neither pod ever
runs the sync for a connector the other pod holds.

Edit every matching record atomically:

Set<Invoice> reopened = runway.findAndEdit(
        Invoice.class,
        Criteria.where().key("status").operator(Operator.EQUALS)
                .value("VOID").build(),
        invoice -> invoice.set("status", "OPEN"));

Implementation pointers

This is a planning ticket; the following is the recommended approach with the
relevant seams.

I present two designs and recommend (A).

(A) True single-transaction read-modify-write (recommended). Request one
connection from the pool, stage it, run the find on that same staged
connection, apply the consumer, save within that same transaction, commit, and
on conflict abort and retry. This is the only design that delivers real mutual
exclusion, because the find's read and the save's write share one transaction
and therefore one set of Concourse locks. It mirrors how the Concourse server
composes findOrAddKeyValue and verifyOrSet server-side, but assembled at the
client level via Concourse.stage(), which does not auto-retry (a failed
transaction surfaces as TransactionException), so Runway must own the retry
loop.

The seams that make (A) feasible already exist:

  • The save loop at src/main/java/com/cinchapi/runway/Runway.java:842 is the
    template for the retry/abort structure: it requests one Concourse from
    connections (Runway.java:414), builds a Saver
    (BatchSaver when supportsBulkCommands, else IncrementalSaver,
    Runway.java:858), stage()s, calls
    Record.saveWithinTransaction(...) (Record.java:2253) per root, commit()s,
    abort()s on throwable, retries TransactionException up to
    MAX_SPURIOUS_SAVE_RETRIES (Runway.java:371), and releases the connection
    in finally.
  • The crucial enabling fact: a Reader can bind to a caller-owned, already
    staged connection. AbstractReader(Concourse connection)
    (src/main/java/com/cinchapi/runway/db/AbstractReader.java:86) and
    IncrementalReader(Concourse connection)
    (src/main/java/com/cinchapi/runway/db/IncrementalReader.java:59) take a
    connection and do not stage it; only the Saver stages. So the new code
    path can: request a connection, build a Saver over it and call stage()
    (IncrementalSaver.stage() calls concourse.stage(),
    src/main/java/com/cinchapi/runway/db/IncrementalSaver.java:63), then bind an
    IncrementalReader to that same connection to run the find inside the
    staged transaction, apply the consumer to the resulting record(s), call
    saveWithinTransaction on the same Saver, then commit(). This removes the
    blocker that the normal find path requests its own pool connection: here the
    Reader and Saver are deliberately bound to one connection.
  • For the typed consumer and the find/findUnique/findFirst shaping, reuse
    the existing resolution rather than reimplementing it: find returns a
    Set<T> (DatabaseInterface.java:330), findUnique returns T and raises
    DuplicateEntryException (DatabaseInterface.java:1106), and ordering flows
    through the FindSelection resolution (Runway.java:1532). findFirst is the
    first element of the ordered set.
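
The control flow of a single attempt in design (A) could look roughly like the sketch below. It is written against hypothetical stand-ins (StagedConnection, ConnectionPool, SingleTransactionEdit), not against the real Concourse, Reader, or Saver types, and it leaves the retry loop out. The only point it makes is that the read and the write go through the same staged connection, which is released in finally.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical minimal view of a stageable connection. */
interface StagedConnection {
    void stage();

    void commit();

    void abort();
}

/** Hypothetical pool; stands in for Runway's connection pool. */
interface ConnectionPool {
    StagedConnection request();

    void release(StagedConnection connection);
}

final class SingleTransactionEdit {

    /**
     * One attempt of find -> consumer -> save -> commit, with the reader
     * (find) and the saver (save) both bound to the same staged connection.
     */
    static <T> T attempt(ConnectionPool pool,
            Function<StagedConnection, T> find, Consumer<T> consumer,
            BiConsumer<StagedConnection, T> save) {
        StagedConnection connection = pool.request();
        try {
            connection.stage();                // the transaction starts here
            T record = find.apply(connection); // read inside the transaction
            if(record == null) {
                connection.abort();            // nothing to edit or commit
                return null;
            }
            consumer.accept(record);           // edit the fresh instance
            save.accept(connection, record);   // write on the same connection
            connection.commit();
            return record;
        }
        catch (RuntimeException e) {
            connection.abort();                // discard the staged work
            throw e;
        }
        finally {
            pool.release(connection);
        }
    }
}
```

In the real code path the find would be an IncrementalReader bound to the staged connection and the save would be saveWithinTransaction on the Saver that staged it; the lambdas here only mark where those calls would sit.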

Retry policy: bound the attempts and add jittered backoff. The Concourse server's
internal supply-with-retry loops unbounded and without backoff; an unbounded
client retry on a single hot record would create a retry storm. Claiming over a
set of candidate records naturally disperses contention (each loser re-finds
and tends to pick a different row), so the bound primarily guards the
degenerate single-hot-record case. Decide and document the exhaustion signal
(an exception versus a sentinel return); I lean toward an exception so a failed
claim is never mistaken for "nothing matched."
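
As a sketch of the retry policy, the loop below bounds the attempts, sleeps a uniformly random time under an exponentially growing ceiling between attempts (one common jitter scheme, chosen here for illustration), and throws on exhaustion so that a spent budget is distinguishable from a null "nothing matched" result. WriteConflictException, RetryBudgetExhaustedException, and BoundedRetry are hypothetical names, and the base and cap values are placeholders, not proposed defaults.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/** Hypothetical stand-in for a transaction conflict; not a Runway type. */
class WriteConflictException extends RuntimeException {}

/** Hypothetical exhaustion signal, distinct from a null "no match". */
class RetryBudgetExhaustedException extends RuntimeException {
    RetryBudgetExhaustedException(int attempts, Throwable cause) {
        super("no commit after " + attempts + " attempts", cause);
    }
}

final class BoundedRetry {

    /** Upper bound of the sleep before the given retry (1-based), capped. */
    static long ceilingMillis(int retry, long baseMillis, long capMillis) {
        int shift = Math.min(retry - 1, 20); // keep the shift small
        return Math.min(capMillis, baseMillis << shift);
    }

    /** Run one full find-edit-save cycle per attempt until one commits. */
    static <T> T run(Supplier<T> cycle, int maxAttempts, long baseMillis,
            long capMillis) {
        WriteConflictException last = null;
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            try {
                return cycle.get(); // may legitimately return null (no match)
            }
            catch (WriteConflictException e) {
                last = e;
                if(attempt < maxAttempts) {
                    long ceiling = ceilingMillis(attempt, baseMillis,
                            capMillis);
                    // Sleep a uniform random time in [0, ceiling]
                    sleep(ThreadLocalRandom.current().nextLong(ceiling + 1));
                }
            }
        }
        throw new RetryBudgetExhaustedException(maxAttempts, last);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }
}
```

Because the whole cycle is passed in as one Supplier, the find is re-run on every attempt by construction.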

(B) Simpler fallback (not recommended as the primary path). Call find, then
consumer.accept, then save(true, records). This reuses
save(...) unchanged: the write is atomic and stale writes are rejected via
the existing preventStaleWrites audit, which throws StaleDataException
(src/main/java/com/cinchapi/runway/StaleDataException.java) when a record
changed since it was loaded. But the read is a prior, separate transaction, so
a competing claim is only detected after the fact, at save time; the
*AndEdit method must then catch StaleDataException, re-run the find, and
retry. This widens the contention window (read and write are not jointly locked)
and is strictly weaker than (A) for the mutual-exclusion use case. Document it as
the fallback if the single-connection binding in (A) proves problematic on some
server version.
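
The shape of design (B) can be sketched with an in-memory versioned value. This is an analogy under stated assumptions, not Runway code: OptimisticEdit and Versioned are hypothetical, StaleRecordException stands in for StaleDataException, and a compare-and-set on an AtomicReference stands in for the preventStaleWrites audit. It shows the read and the write as separate steps, with the conflict visible only at save time.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for Runway's StaleDataException. */
class StaleRecordException extends RuntimeException {}

final class OptimisticEdit {

    /** Immutable snapshot of a value plus the version it was read at. */
    static final class Versioned {
        final String value;
        final long version;

        Versioned(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    /** Save that rejects the write if the record changed since the load. */
    static void save(AtomicReference<Versioned> store, Versioned loaded,
            String newValue) {
        Versioned next = new Versioned(newValue, loaded.version + 1);
        if(!store.compareAndSet(loaded, next)) {
            throw new StaleRecordException();
        }
    }

    /** find -> edit -> save, re-reading whenever the save reports staleness. */
    static String findEditSave(AtomicReference<Versioned> store,
            UnaryOperator<String> edit, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            Versioned loaded = store.get();           // the read is its own step
            String edited = edit.apply(loaded.value); // others can write here
            try {
                save(store, loaded, edited);          // conflict seen only now
                return edited;
            }
            catch (StaleRecordException e) {
                // fall through: re-find and try again
            }
        }
        throw new IllegalStateException(
                "still stale after " + maxAttempts + " attempts");
    }
}
```

The gap between store.get() and save(...) is the widened contention window the paragraph above describes; design (A) closes it by holding both steps inside one transaction.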

Note on consumer timing across the two designs: in (A) the consumer runs inside
the transaction but still before the save's validation; in (B) it runs entirely
outside the save transaction. In both, beforeSave()/validators execute as part
of the save, so a consumer edit that violates Required/Unique surfaces from
the save, not from the consumer.

The three public methods belong alongside the existing find/save methods.
Whether they live as concrete methods on Runway (next to save) or as
defaults on DatabaseInterface depends on where the staged-connection seam can
be reached; the connection pool and Saver/Reader wiring live on Runway, so
the concrete implementation most naturally sits there with thin delegating
overloads.

Performance

  • Each successful call is one find plus one save: the same server round
    trips as a find followed by a save. Design (A) runs both in a single
    transaction with no reads beyond the find; design (B) adds only the reads
    that preventStaleWrites already performs.
  • The find must push the Criteria (and Order for findFirstAndEdit) down to
    the server and must not degrade into a full scan plus client-side filter. Use
    the existing criteria/order resolution (Runway.java:1532) so indexed lookups
    and native sorting are preserved; do not load the class and filter in memory.
  • findFirstAndEdit should request only what it needs. Where the server
    supports native ordering and pagination, the implementation can fetch the
    first match rather than materializing the full ordered set; where it does not,
    it falls back to ordering the match set client-side (mirroring the legacy path
    at Runway.java:1606). Either way it edits exactly one record.
  • Retries are the main performance risk. Bounding attempts caps the round
    trips a single call can spend on a contended record, and jittered backoff
    keeps contenders from retrying in lockstep. Contending over a candidate set
    keeps expected retries low because losers disperse to different rows; a
    single hot record is the worst case and is precisely what the bound caps.
  • findAndEdit over a large match set commits all edits in one transaction;
    this is intentional (all-or-nothing) but means transaction size grows with the
    match set. Callers who want to edit very large sets should scope the criteria
    accordingly. There is no O(n^2) behavior in the edit loop: it is one pass over
    the matched records.

Testing

In this repo, tests are written but not run (a live Concourse server is
required). Add tests under src/test/java/com/cinchapi/runway/ following the
existing conventions:

  • Extend the project base test class as the existing Runway tests do
    (AbstractRunwayTest, which extends RunwayBaseClientServerTest).
  • Parameterize over both saver paths with
    @RunWith(Parameterized.class) and a bulkCommands parameter, exactly as
    PreventStaleWriteTest
    (src/test/java/com/cinchapi/runway/PreventStaleWriteTest.java) does, so both
    BatchSaver and IncrementalSaver are exercised.
  • Give every @Test the mandatory Goal / Start state / Workflow / Expected
    Javadoc block.

Behavior tests to add:

  • findUniqueAndEdit returns null when nothing matches and the consumer never
    runs; returns the edited record on a single match; throws
    DuplicateEntryException on more than one match without committing or
    mutating.
  • findFirstAndEdit edits exactly the first record under the given Order,
    returns null on no match, and requires an order.
  • findAndEdit edits all matches and persists them atomically; assert that a
    consumer that throws (or an edit that violates a constraint) leaves no
    record changed (all-or-nothing rollback).
  • Persistence check: after each call, re-load the record(s) from a fresh
    instance and assert the edit is durable.

Concurrency tests (the core guarantee), using CountDownLatch and
AtomicBoolean/AtomicReference to coordinate, as existing concurrency tests
in this repo do (e.g. SpuriousSaveFailureTest):

  • Two threads call findFirstAndEdit against the same single candidate record
    simultaneously; assert exactly one observes a successful claim and the other
    observes "not claimed" (or claims a different record), and that the stored
    lock field reflects exactly one winner. join() both threads before
    asserting; set a timeout to catch deadlock.
  • Two threads racing over a small candidate set each end up with a distinct
    record; no record is claimed twice.
  • A retry-exhaustion test that pins contention on one record and asserts the
    bounded-retry behavior and the chosen exhaustion signal.
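
The coordination pattern for the first concurrency test might be sketched as follows. This is a stdlib-only stand-in, not a Runway test: ClaimRace is a hypothetical name, an AtomicReference plays the stored lock field, and compareAndSet plays findFirstAndEdit. A real test would extend the project base test class and call the actual method against a live server.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class ClaimRace {

    /**
     * Race the given number of threads for one lock slot and return how many
     * of them claimed it.
     */
    static int race(int threads, AtomicReference<String> lockedBy) {
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; ++i) {
            String id = "pod-" + i;
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // hold every thread at the gate
                    if(lockedBy.compareAndSet(null, id)) {
                        winners.incrementAndGet();
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown(); // release all contenders at once
        for (Thread worker : workers) {
            try {
                // A bounded join turns a deadlock into a test failure
                worker.join(TimeUnit.SECONDS.toMillis(5));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
            if(worker.isAlive()) {
                throw new IllegalStateException("worker did not finish");
            }
        }
        return winners.get();
    }
}
```

The assertions then mirror the bullet above: the winner count is exactly one, and the stored lock value names exactly one contender.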

Per repo conventions, write these tests but do not run ./gradlew test or
./gradlew build; run ./gradlew spotlessApply for formatting only.

Acceptance criteria

  • findAndEdit, findUniqueAndEdit, and findFirstAndEdit are added with
    Consumer<T extends Record> consumers.
  • Each method performs find, consumer application, and save as one atomic
    unit and retries on write conflict with a bounded attempt count and
    jittered backoff.
  • Design (A) (true single-transaction read-modify-write via a Reader and
    Saver bound to one staged connection) is implemented, or, if (A) is not
    viable on a supported server, (B) is implemented and the weaker contention
    guarantee is documented.
  • Return-type and edge-case contract holds: findAndEdit returns the edited
    set (empty when no match, consumer not run); findUniqueAndEdit returns
    the edited record or null, and throws DuplicateEntryException on more
    than one match without committing; findFirstAndEdit returns the first
    edited record under the required Order or null.
  • Each retry re-runs the find against a fresh record instance; in-memory
    edits from an aborted attempt are never persisted.
  • The exhaustion behavior when the retry budget is spent is defined and
    documented.
  • The find pushes criteria and order to the server and does not fall back to
    a full scan where native resolution is available.
  • Tests are written (not run) per repo conventions: parameterized over
    BatchSaver/IncrementalSaver, with the four-section Javadoc, including
    concurrency tests proving two threads cannot both claim the same record.
  • ./gradlew spotlessApply has been run and the code conforms to the
    project style.
  • Javadoc on every new method documents the atomicity, retry, null/empty,
    and consumer-re-execution contract without describing implementation
    mechanics.

Scope and dependencies

In scope for a first cut: the three *AndEdit methods as Runway primitives,
the single-transaction read-modify-write path, bounded jittered retry, and the
tests above.

Out of scope: any higher-level lock, lease, or claim abstraction. Runway stays
at the primitive level. cinchapi-server composes the lock condition (lock unset,
or lock value added longer ago than the timeout window, with stale-lock
detection evaluated at query time rather than by mutating stored state), the
lastRun ordering, and the timeout window itself, and calls these primitives.
Convenience overloads (for example a findAndEdit that accepts an Order, or
realm-scoped variants) can follow once the core three land.

Dependencies: findFirstAndEdit depends on the separate findFirst
feature in this same repository (cinchapi/runway) for the first-match variant
and its Order handling; the *AndEdit work should build on that rather than
duplicate first-match resolution. The consuming feature is the data-sync
connector-claiming capability in cinchapi-server, which is the motivating
caller but is not part of this ticket.

Related

Part of the connector data-sync locking initiative (cinchapi-server).
