Skip to content

Pending writes queue#4309

Open
ohadzeliger wants to merge 7 commits into
FoundationDB:mainfrom
ohadzeliger:pending-writes-wueue
Open

Pending writes queue#4309
ohadzeliger wants to merge 7 commits into
FoundationDB:mainfrom
ohadzeliger:pending-writes-wueue

Conversation

@ohadzeliger

@ohadzeliger ohadzeliger commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Create a write-pending-queue for use with the indexer when in write-only mode.
The created queue provides the following guarantees:

  • Conflict free enqueue and clear
  • Strict insertion order traversal (for the same incarnation)
  • Incarnation support for use after moving to a different FDB cluster
  • Conflict-free size getter
  • Transactionally consistant ensureEmpty operation
  • Generic payload extensible by caller (caller controls payload creation)
  • Configurable maximum size

Testing involves 3 test classes:

  • Correctness tests - ensure guarantees on conflicts and ordering are preserved
  • Size tests - ensure size getter matches queue size
  • Concurrency tests - random threads performing operations while preserving consistency

Resolves #4311

@jjezra jjezra added relational issues related to relational FDB indexer Online Indexer issues labels Jun 29, 2026
@ohadzeliger ohadzeliger self-assigned this Jun 29, 2026
@ohadzeliger ohadzeliger added the enhancement New feature or request label Jun 29, 2026
@jjezra jjezra marked this pull request as ready for review June 30, 2026 15:31

@jjezra jjezra left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this queue is unused, I'll be happy to have it in the code as a foundation to the write_only_with_queue development while adding checks and improvements.

Comment on lines +58 to +59
* <p>The queue is intended to hold pending operations that arrive while a background indexer
* (or any other background worker) is running. Front-end transactions enqueue items

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about -
The queue is intended to hold pending foreground operations that arrive while a background worker is running and possibly holding resources.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/**
* Default maximum queue size; protects against unbounded growth on persistent failure.
*/
public static final int DEFAULT_MAX_QUEUE_SIZE = 10_000;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this default be increased to 100K?
Since the consequences are severe (either re-indexing the whole record store or rejecting user IO) the buffer should be big enough to absorb occasional indexing/merging failures that may keep an active queue for a while.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used by one test, perhaps it should be moved there, rather than here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Changed to 100K. Assuming that all callers will actually tune on a per-case basis.

* read and the commit
*/
@Nonnull
public CompletableFuture<Boolean> ensureQueueEmpty(@Nonnull FDBRecordContext context) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isQueueEmpty might be a better name.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


try (FDBRecordContext txA = openContext()) {
// TX_A asserts the queue is empty
assertTrue(queue.ensureQueueEmpty(txA).join());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test where ensureQueueEmpty tx succeeds and the add item fails?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will not fail as such - the caller should provide another flag to guarantee it (since the enqueue does not read the range)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment for the class

* versionstamped-key + atomic-counter idioms, not from anything queue-specific.</p>
*/
@Test
void testDrainScanAndClearDoesNotConflictWithConcurrentEnqueue() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that it's a "nice to have" :)

* a way readers must distinguish. Readers reject entries whose stored version exceeds this
* constant.
*/
public static final int CURRENT_VERSION = 0;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: IIUC this means that the caller can't use the version field to adjust what they write, that must be within the payload.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Payload versioning does not come for free with this field, though a specific payload provider can affect the overall version for certain use cases if needed.

/**
* Default maximum queue size; protects against unbounded growth on persistent failure.
*/
public static final int DEFAULT_MAX_QUEUE_SIZE = 10_000;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used by one test, perhaps it should be moved there, rather than here.

Comment on lines +89 to +91
* <li><b>Capacity.</b> A configurable maximum queue size; {@link #enqueue} fails with
* {@link PendingWritesQueueTooLargeException} once the size counter reaches the limit.
* A value of {@code 0} disables the cap.</li>

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps worth noting that this is approximate (since it is written at snapshot, you can end up with a bigger queue by the amount written by concurrent transactions)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* @param entry the entry to clear
*/
public void clearEntry(@Nonnull FDBRecordContext context, @Nonnull PendingWritesQueueEntry<T> entry) {
SplitHelper.deleteSplit(context, queueSubspace, entry.getKeyTuple(), true, false, false, null);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does deleteSplit result in a read-conflict on the entry? Will two transactions conflict if they both try to clear the same entry? (other modifications aren't allowed, anyways)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, two clear operations wouldn't conflict. Discussed offline and agreed that the correct system behavior is that they should. Added read conflict for the clearEntry and modified the test.

Comment on lines +251 to +253
* @throws com.apple.foundationdb.record.provider.foundationdb.FDBExceptions.FDBStoreTransactionConflictException
* via the transaction's commit if another transaction enqueued into the queue between this
* read and the commit

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is misleading. This method does not throw this exception, nor does the future. IMO the explanation in the paragraph should be sufficient

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, done.

RecordCursor<PendingWritesQueueEntry<TestQueuePayload>> cursor =
queue.getQueueCursor(context, scanProps, continuation);
RecordCursorResult<PendingWritesQueueEntry<TestQueuePayload>> last = null;
while (true) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use cursor.forEachResult to simplify this code a bit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

last = next;
break;
}
collected.add(next.get());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would probably be good to assert that this is 7, or at least that we actually apply a limit, and don't just read it all at once, but maybe the limiting is already tested below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added assertion on the number of continuations hit

try (FDBRecordContext ctxA = openContext();
FDBRecordContext ctxB = openContext()) {
for (int i = 0; i < 4; i++) {
queue.enqueue(ctxA, payload("A-" + i), 0).join();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you force a getReadVersion call?
I think it's possible that if these are all blind-writes, then GRV doesn't happen until commit, in which case they are not actually overlapping.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enqueue reads the queue size (via SNAPSHOT) so I think we should be ok

}

// While the drain transaction is still open, another transaction enqueues a fresh
// entry. It must commit cleanly because enqueue never conflicts with anything.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// entry. It must commit cleanly because enqueue never conflicts with anything.
// entry. It must commit cleanly because enqueue only conflicts with ensureQueueEmpty.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

assertScanThrows(readerQueue, RecordCoreStorageException.class);
}

private <T extends com.google.protobuf.Message> void assertScanThrows(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can Message be imported rather than fully-qualified.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually replaced by ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request indexer Online Indexer issues relational issues related to relational FDB

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Create a generic pending writes queue

3 participants