Add commit retry and concurrency validation for writes#3320
Add commit retry and concurrency validation for writes#3320lawofcycles wants to merge 13 commits into
Conversation
Add automatic retry with exponential backoff when catalog commits fail due to concurrent transactions (CommitFailedException), and integrate the existing validation functions from validate.py into the write path to detect incompatible concurrent modifications (ValidationException). The retry loop is placed in Transaction.commit_transaction(). On each retry attempt, table metadata is refreshed, registered snapshot producers are re-executed to regenerate manifests, and data conflict validation is run. Uncommitted manifests from failed attempts are cleaned up after a successful commit. Validation is performed for _OverwriteFiles and _DeleteFiles based on the table's isolation level (serializable/snapshot). _FastAppendFiles and _MergeAppendFiles do not require validation since appends never conflict. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Skip _validate_no_new_delete_files and _validate_deleted_data_files when conflict_detection_filter is None, matching Java's BaseOverwriteFiles.validate() behavior for rowFilter == AlwaysFalse(). Route isolation level property based on the calling operation. Transaction.delete() uses write.delete.isolation-level (default). Transaction.overwrite(), dynamic_partition_overwrite(), and upsert() use write.update.isolation-level via _isolation_level_property on the snapshot producer. Remove unused WRITE_MERGE_ISOLATION_LEVEL constant.` Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Use Operation enum instead of string literals for producer construction. Use .value for IsolationLevel string comparison to avoid unreachable statement warning. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Fix _build_delete_files_partition_predicate overwriting _case_sensitive to True by passing the current value to delete_by_predicate. This caused case-insensitive deletes to fail when _OverwriteFiles was used with a user-specified predicate. Move import random/time to file top level. Add total timeout (commit.retry.total-timeout-ms) to the retry loop. Add comments for intentional validation duplication and cached_property clearing. Stabilize test_commit_retry_on_commit_failed by removing flaky patch.object assertion. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
In CI, pyiceberg.table module is loaded twice, creating two distinct Transaction class objects. patch.object on the test-imported Transaction does not affect the runtime Transaction used by Table.append(). Fix by resolving Transaction from pyiceberg.table module at runtime. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Benchmark resultsThis PR brings three capabilities to PyIceberg's write path.
To validate (3), I benchmarked concurrent appends using the NYC Yellow Taxi dataset (2024-01, 2.9M rows, 19 columns) with Glue Data Catalog + S3. Before vs AfterWithout this PR, concurrent appends fail immediately with
(N workers x 10 batches x 1K rows, Internal retry vs user-side retryCompared the internal retry (this PR) against a user-side retry that catches
(3 batches per worker, ~370K-1.5M rows per batch depending on worker count) Internal retry is faster because it reuses data files already written to S3 and only regenerates manifests on retry. User-side retry rewrites Parquet files on every attempt. Interestingly, internal retry actually performs more retries than user-side retry (88 vs 50 total retries at 8 workers), because the shorter retry window increases commit attempt density. Despite more retries, the total time is lower because each retry is much cheaper. Tuning
|
| min-wait-ms | Total time | Total retries |
|---|---|---|
| 100 | 158s | 78 |
| 500 | 126s | 115 |
| 1000 | 238s | 67 |
| 2000 | 235s | 63 |
| 3000 | 206s | 41 |
The default (100ms, matching Java Iceberg) works reasonably well, but 500ms is optimal for Glue. Too short causes contention storms, too long wastes time waiting. The optimal value depends on the catalog's commit latency.
qzyu999
left a comment
There was a problem hiding this comment.
Hi @lawofcycles, thanks so much for this amazing PR. I took a look and saw two spaces so far where there are some minor gaps that can be easily patched.
The first is regarding AssertTableUUID, where I notice a pattern of repetitively adding/removing it inside the retry loop for commit_transaction(). I believe this can be resolved simply by moving the addition part outside the for-loop.
The second is also regarding commit_transaction(), where in the case of an abort (e.g., ValidationException), there will be some orphaned manifest files. This can be easily fixed by adding a try/except around the for-loop itself, making sure upon failure that both _uncommitted_manifests and _written_manifests are cleared.
Thanks again for the great work, I look forward to #3320 merging so that I may integrate the changes into #3131, PTAL!
| for attempt in range(num_retries + 1): | ||
| try: | ||
| self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),) |
There was a problem hiding this comment.
suggestion: Here AssertTableUUID is appended to self._requirements within each retry loop, but below in _rebuild_snapshot_updates it's removed again with:
self._requirements = tuple(r for r in self._requirements if not isinstance(r, (AssertRefSnapshotId, AssertTableUUID)))
This can be simplified by moving self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),) outside the for-loop and updating the line in _rebuild_snapshot_updates to simply:
self._requirements = tuple(r for r in self._requirements if not isinstance(r, AssertRefSnapshotId))
The reason being is that AssertTableUUID would remain constant the whole time, so we're simply adding and removing it within each retry.
There was a problem hiding this comment.
Thanks for the suggestion. I moved it outside the loop and removed the AssertTableUUID filter from _rebuild_snapshot_updates.
| def _cleanup_uncommitted(self) -> None: | ||
| """Delete manifest files from failed retry attempts.""" | ||
| for path in self._uncommitted_manifests: | ||
| try: | ||
| self._io.delete(path) | ||
| except Exception: | ||
| logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True) | ||
| self._uncommitted_manifests.clear() |
There was a problem hiding this comment.
suggestion: We could also add a second similar function as follows:
def _clean_all_uncommitted(self) -> None:
"""Clean up all manifests written during this producer's lifecycle on abort."""
for path in itertools.chain(self._uncommitted_manifests, self._written_manifests):
try:
self._io.delete(path)
except Exception:
logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
self._uncommitted_manifests.clear()
self._written_manifests.clear()then in Transaciton.commit_transaction(), we can add a try/except to the for-loop as follows:
try:
for attempt in range(num_retries + 1):
try:
self._table._do_commit(...)
self._cleanup_uncommitted_manifests()
break
except CommitFailedException:
... # retry logic
except Exception:
# Catch ValidationException or retry exhaustion
for producer in self._snapshot_producers:
producer._clean_all_uncommitted()
raisethis would then allow the PyIceberg implementation to mirror the cleanAll() method in Java. In the current implementation, the for-loop for retrying will only clear out the _uncommitted_manifests from the previous failed retries, but we can extend this with _clean_all_uncommitted which will clear out that and _written_manifests from the current attempt in the case of a permanent abort. This would fix the gap for orphaned manifests from ValidationException (or other permanent failures) that are not cleaned up. I also think it's worth mentioning that this fix could be cleanly added to this PR without waiting for a full Delete orphaned files implementation in PyIceberg. WDYT about adding this into the current PR?
There was a problem hiding this comment.
Good catch. I added _clean_all_uncommitted() that cleans up both _uncommitted_manifests and _written_manifests, and wrapped the retry loop with try/except so it gets called on any permanent failure (ValidationException, retry exhaustion, etc.).
|
Any update on this? We are currently facing a production issue that this PR would solve. |
AssertTableUUID is constant across retries, so add it once before the loop instead of adding/removing on each iteration. Add _clean_all_uncommitted() that deletes both _uncommitted_manifests and _written_manifests on permanent failure, fixing orphaned manifests from the last attempt. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
qzyu999
left a comment
There was a problem hiding this comment.
Hi @lawofcycles, thank you so much for accepting the suggested changes. I reviewed those and they all look correct.
|
Thanks for writing this! This is such a useful feature and there's a ton of nuance. I had Claude write up a short Gist explaining this issue that I'm seeing. Code is often easier to parse than writing. We try to commit and there's a conflict. Now, while we're waiting to retry, a second conflicting commit comes in. We're now two commits behind. We have to make sure that there's no issues against both of these commits. We should have this example as a test. As it stands, we're ignoring one of the commits. |
| conflict_detection_filter = self._predicate if self._predicate != AlwaysFalse() else None | ||
|
|
||
| if isolation_level == IsolationLevel.SERIALIZABLE: | ||
| _validate_added_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot) |
There was a problem hiding this comment.
Why are both of these parent_snapshot?
There was a problem hiding this comment.
It shouldn't be. This was a bug. _parent_snapshot_id gets updated on each retry, so passing it for both collapsed the validation window to zero. Fixed by introducing _starting_snapshot_id that stays fixed across retries.
The concurrency validation was using parent_snapshot (current head) for both the starting point and ending point of the validation window. When multiple concurrent commits occur during retry sleep, the validation would only inspect the latest head and miss conflicting commits below it. Introduce _starting_snapshot_id that is fixed at operation init time and does not change on retry. Also fix _validation_history to use exclusive semantics for from_snapshot, matching Java Iceberg's ancestorsBetween. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
@rambleraptor Thanks for the repro, this made the issue very clear. Confirmed and fixed. The validation now pins the original base snapshot at init time so the window covers all concurrent commits, not just the latest head. |
Closes #3319
Closes #819
Closes #269
Rationale for this change
PyIceberg currently fails immediately with
CommitFailedExceptionwhen a concurrent transaction commits first, regardless of whether the writes actually conflict. Java Iceberg handles this transparently through its retry loop inSnapshotProducer.commit().This PR adds automatic commit retry with exponential backoff and data conflict validation to PyIceberg, matching Java Iceberg's behavior. On
CommitFailedException, the retry loop refreshes table metadata, re-runs validation, and regenerates manifests. If validation detects a real data conflict, the operation aborts withValidationExceptioninstead of retrying.The retry loop is placed in
Transaction.commit_transaction()rather than in individual snapshot producers. This is necessary becauseTransaction.delete()uses two producers (_DeleteFiles+_OverwriteFiles) that must be committed atomically. Retrying at the producer level would break this atomicity.Validation behavior follows Java's
BaseOverwriteFiles.validate(), using the existing validation functions fromvalidate.pythat were contributed through #1935, #1938, #2050, and #3049.Are these changes tested?
Yes. Unit tests and integration tests covering retry success,
ValidationExceptionabort, retry exhaustion, isolation levels, partition-level conflict detection, manifest cleanup, and producer state reset.Are there any user-facing changes?
Yes. Previously, all concurrent write conflicts resulted in
CommitFailedException.Now:
ValidationExceptioninstead ofCommitFailedExceptionThe following new table properties are supported.
commit.retry.num-retries(default: 4)commit.retry.min-wait-ms(default: 100)commit.retry.max-wait-ms(default: 60000)write.delete.isolation-level(default: serializable)write.update.isolation-level(default: serializable)