From 32ffaf75bce3f96e8d19d6be7d127ef4d3b62de9 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 10:39:35 -0700 Subject: [PATCH 01/14] test(tables): deterministic H2 repro for stale-base snapshot loss (incident-12185) Single-JVM, no-threads, no-docker reproduction of the BaseTransaction.applyUpdates silent-rebase lost-update. A stale writer stages its L1 snapshot view + COMMIT_KEY=L1 in a held transaction; a racing writer advances the catalog to L2; committing the stale transaction rebases the stale payload onto L2 so doCommit would subtract the racing snapshot. Asserts the racing snapshot survives. Passes on main (with the #612 stale-base abort CAS); reproduces the silent drop on the pre-fix tree. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../e2e/h2/StaleBaseLostUpdateTest.java | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java new file mode 100644 index 000000000..491fce00e --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -0,0 +1,188 @@ +package com.linkedin.openhouse.tables.e2e.h2; + +import static com.linkedin.openhouse.common.api.validator.ValidatorConstants.INITIAL_TABLE_VERSION; +import static com.linkedin.openhouse.tables.model.TableModelConstants.TABLE_DTO; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import com.linkedin.openhouse.common.test.cluster.PropertyOverrideContextInitializer; +import com.linkedin.openhouse.internal.catalog.CatalogConstants; +import com.linkedin.openhouse.internal.catalog.OpenHouseInternalTableOperations; +import com.linkedin.openhouse.internal.catalog.SnapshotsUtil; +import com.linkedin.openhouse.tables.model.IcebergSnapshotsModelTestUtilities; +import com.linkedin.openhouse.tables.model.TableDto; +import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; +import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; + +/** + * Deterministic H2 reproduction of the incident-12185 stale-base silent snapshot drop. A stale + * writer holds an uncommitted transaction opened at base L1; a racing writer advances the catalog + * L1 -> L2 (adding S2); the stale transaction then commits, and {@code BaseTransaction.applyUpdates} + * silently rebases the stale {S1} view + COMMIT_KEY=L1 onto L2. Without the PR #612 abort, S2 is + * silently dropped. + */ +@SpringBootTest +@ContextConfiguration(initializers = PropertyOverrideContextInitializer.class) +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +public class StaleBaseLostUpdateTest { + + private static final Logger LOG = LoggerFactory.getLogger(StaleBaseLostUpdateTest.class); + + @Autowired OpenHouseInternalRepository openHouseInternalRepository; + + @Autowired Catalog catalog; + + @Test + void testLostUpdateViaStagedTransactionConflict() throws Exception { + Setup s = createTableWithOneSnapshot("conflict_staged"); + + // Stale writer opens a transaction at L1 and stages its real L1 view ({S1}) -- exactly what + // OpenHouseInternalRepositoryImpl.save does internally, but left uncommitted so a racing commit + // can land first. + Table staleHandle = catalog.loadTable(s.id); + List staleView = Lists.newArrayList(staleHandle.snapshots()); + Assertions.assertEquals(1, staleView.size(), "stale writer's base L1 must hold exactly {S1}"); + Transaction staleTxn = staleHandle.newTransaction(); + staleTxn + .updateProperties() + .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(staleView)) + .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(s.s1))) + .set(CatalogConstants.COMMIT_KEY, s.l1.getTableLocation()) + .set("foo", "bar") + .commit(); + + // Racing writer appends S2 via the repository path -> catalog advances L1 -> L2. + Snapshot s2 = catalog.loadTable(s.id).newAppend().appendFile(dummyDataFile()).apply(); + openHouseInternalRepository.save( + s.l1 + .toBuilder() + .tableVersion(s.l1.getTableLocation()) + .jsonSnapshots(Arrays.asList(SnapshotParser.toJson(s.s1), SnapshotParser.toJson(s2))) + .snapshotRefs(refs(s2)) + .build()); + + clearPerJvmRetryCache(); + + // Commit the held transaction: applyUpdates sees base L1 != catalog L2, rebases, and re-stamps + // the now-stale {S1} list + COMMIT_KEY=L1 onto L2. With #612 this aborts; without it, S2 is + // silently dropped. + try { + staleTxn.commitTransaction(); + } catch (Exception expected) { + // A clean rejection is acceptable and version-dependent: + // - D (#612): CommitFailedException from the stale-base CAS. + // - A (pre-#406): BadRequestException from SnapshotInspector.validateSnapshotsUpdate + // ("Cannot delete the latest snapshot"). + // The invariant asserted below is the same either way: racing S2 must survive. + LOG.info("commitTransaction rejected the stale commit: {}", expected.toString()); + } + + List remaining = Lists.newArrayList(catalog.loadTable(s.id).snapshots()); + LOG.info( + "staged-conflict result: remaining snapshots={}", + remaining.stream() + .map(Snapshot::snapshotId) + .collect(java.util.stream.Collectors.toList())); + Assertions.assertEquals( + 2, remaining.size(), "S2 (" + s2.snapshotId() + ") must not be silently dropped from H2"); + Assertions.assertTrue( + remaining.stream().anyMatch(x -> x.snapshotId() == s2.snapshotId()), + "racing snapshot S2 must still be present after the conflicting stale commit"); + + cleanup(s.id); + } + + /** Holds the post-setup state of a freshly created table with a single snapshot S1 at L1. */ + private static final class Setup { + final TableIdentifier id; + final TableDto l1; + final Snapshot s1; + + Setup(TableIdentifier id, TableDto l1, Snapshot s1) { + this.id = id; + this.l1 = l1; + this.s1 = s1; + } + } + + private Setup createTableWithOneSnapshot(String tableId) throws Exception { + Schema schema = + new Schema( + required(1, "id", Types.StringType.get()), optional(2, "data", Types.StringType.get())); + TableDto l0 = + openHouseInternalRepository.save( + TABLE_DTO + .toBuilder() + .tableId(tableId) + .schema(SchemaParser.toJson(schema, false)) + .timePartitioning(null) + .clustering(null) + .tableVersion(INITIAL_TABLE_VERSION) + .build()); + TableIdentifier id = TableIdentifier.of(l0.getDatabaseId(), tableId); + Snapshot s1 = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + TableDto l1 = + openHouseInternalRepository.save( + l0.toBuilder() + .tableVersion(l0.getTableLocation()) + .jsonSnapshots(Collections.singletonList(SnapshotParser.toJson(s1))) + .snapshotRefs(refs(s1)) + .build()); + return new Setup(id, l1, s1); + } + + private DataFile dummyDataFile() throws Exception { + return IcebergSnapshotsModelTestUtilities.createDummyDataFile( + Files.createTempFile("incident-12185-", ".orc").toString(), PartitionSpec.unpartitioned()); + } + + private static Map refs(Snapshot snapshot) { + return IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(snapshot)); + } + + private void cleanup(TableIdentifier id) { + openHouseInternalRepository.deleteById( + TableDtoPrimaryKey.builder() + .databaseId(id.namespace().toString()) + .tableId(id.name()) + .build()); + } + + /** + * Invalidates the per-JVM static retry cache used by {@code failIfRetryUpdate} so the test models + * a stale commit reaching a replica that never cached the prior {@code COMMIT_KEY}. Without this, + * the single-JVM cache hit would short-circuit the second committer before #612 is exercised. + */ + private static void clearPerJvmRetryCache() throws Exception { + Field cacheField = OpenHouseInternalTableOperations.class.getDeclaredField("CACHE"); + cacheField.setAccessible(true); + ((com.google.common.cache.Cache) cacheField.get(null)).invalidateAll(); + } +} From 0b513ac1a0ff12ae05eff28055e79171588e56bd Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 10:45:43 -0700 Subject: [PATCH 02/14] test(tables): clean up repro; add prior-data-snapshot variant Factor the staged-conflict concurrency into a shared core and add a second test where the table already holds committed data history (two snapshots) before the race begins. Both assert the racing snapshot survives. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../e2e/h2/StaleBaseLostUpdateTest.java | 147 +++++++++++------- 1 file changed, 88 insertions(+), 59 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 491fce00e..8732d12d4 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -15,10 +15,10 @@ import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; import java.lang.reflect.Field; import java.nio.file.Files; -import java.util.Arrays; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -41,11 +41,16 @@ import org.springframework.test.context.ContextConfiguration; /** - * Deterministic H2 reproduction of the incident-12185 stale-base silent snapshot drop. A stale - * writer holds an uncommitted transaction opened at base L1; a racing writer advances the catalog - * L1 -> L2 (adding S2); the stale transaction then commits, and {@code BaseTransaction.applyUpdates} - * silently rebases the stale {S1} view + COMMIT_KEY=L1 onto L2. Without the PR #612 abort, S2 is - * silently dropped. + * Deterministic H2 reproduction of the incident-12185 stale-base silent snapshot drop. Single-JVM, + * no threads, no docker. + * + *

A stale writer opens an Iceberg transaction at base L1 and stages its real L1 snapshot view + * (the writer's {@code SNAPSHOTS_JSON} payload) plus {@code COMMIT_KEY=L1}, but holds it + * uncommitted. A racing writer appends a new snapshot via the repository path, advancing the + * catalog L1 -> L2. When the held transaction is committed, {@code BaseTransaction.applyUpdates} + * rebases the stale payload + {@code COMMIT_KEY=L1} onto L2, so {@code doCommit} would subtract the + * racing snapshot. The invariant asserted is that the racing snapshot must survive (PR #612's + * stale-base CAS aborts the commit; without it the racing snapshot is silently dropped). */ @SpringBootTest @ContextConfiguration(initializers = PropertyOverrideContextInitializer.class) @@ -58,84 +63,97 @@ public class StaleBaseLostUpdateTest { @Autowired Catalog catalog; + /** Base table holds a single committed data snapshot when the race begins. */ @Test void testLostUpdateViaStagedTransactionConflict() throws Exception { - Setup s = createTableWithOneSnapshot("conflict_staged"); + TableDto l1 = createTableWithCommittedDataSnapshots("conflict_staged", 1); + assertRacingSnapshotSurvivesStaleStagedCommit(l1); + } + + /** + * Same scenario, but the table already has prior committed data history (two snapshots) before + * the race begins — the stale writer's base is a non-trivial, data-bearing table. + */ + @Test + void testLostUpdateViaStagedTransactionConflictWithPriorDataSnapshot() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("conflict_staged_prior_data", 2); + assertRacingSnapshotSurvivesStaleStagedCommit(l1); + } - // Stale writer opens a transaction at L1 and stages its real L1 view ({S1}) -- exactly what - // OpenHouseInternalRepositoryImpl.save does internally, but left uncommitted so a racing commit - // can land first. - Table staleHandle = catalog.loadTable(s.id); + /** + * Core reproduction. Given a table whose latest committed version is {@code l1}, stages a stale + * commit at L1, lands a racing append (L1 -> L2), then commits the stale transaction and + * asserts the racing snapshot is not dropped. + */ + private void assertRacingSnapshotSurvivesStaleStagedCommit(TableDto l1) throws Exception { + TableIdentifier id = idOf(l1); + + // Stale writer opens a transaction at L1 and stages its real L1 view + COMMIT_KEY=L1 (exactly + // what OpenHouseInternalRepositoryImpl.save stamps), held uncommitted so a racing commit lands + // first. + Table staleHandle = catalog.loadTable(id); List staleView = Lists.newArrayList(staleHandle.snapshots()); - Assertions.assertEquals(1, staleView.size(), "stale writer's base L1 must hold exactly {S1}"); + int priorSnapshotCount = staleView.size(); + Snapshot staleLatest = staleHandle.currentSnapshot(); Transaction staleTxn = staleHandle.newTransaction(); staleTxn .updateProperties() .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(staleView)) - .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(s.s1))) - .set(CatalogConstants.COMMIT_KEY, s.l1.getTableLocation()) + .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(staleLatest))) + .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) .set("foo", "bar") .commit(); // Racing writer appends S2 via the repository path -> catalog advances L1 -> L2. - Snapshot s2 = catalog.loadTable(s.id).newAppend().appendFile(dummyDataFile()).apply(); + Snapshot s2 = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + List racingSnapshots = new ArrayList<>(); + for (Snapshot existing : staleView) { + racingSnapshots.add(SnapshotParser.toJson(existing)); + } + racingSnapshots.add(SnapshotParser.toJson(s2)); openHouseInternalRepository.save( - s.l1 - .toBuilder() - .tableVersion(s.l1.getTableLocation()) - .jsonSnapshots(Arrays.asList(SnapshotParser.toJson(s.s1), SnapshotParser.toJson(s2))) + l1.toBuilder() + .tableVersion(l1.getTableLocation()) + .jsonSnapshots(racingSnapshots) .snapshotRefs(refs(s2)) .build()); clearPerJvmRetryCache(); // Commit the held transaction: applyUpdates sees base L1 != catalog L2, rebases, and re-stamps - // the now-stale {S1} list + COMMIT_KEY=L1 onto L2. With #612 this aborts; without it, S2 is - // silently dropped. + // the stale view + COMMIT_KEY=L1 onto L2. A clean rejection is acceptable and version-dependent + // (CommitFailedException from PR #612's CAS, or BadRequestException from the pre-#406 guard); + // the invariant below is the same either way: the racing snapshot must survive. try { staleTxn.commitTransaction(); } catch (Exception expected) { - // A clean rejection is acceptable and version-dependent: - // - D (#612): CommitFailedException from the stale-base CAS. - // - A (pre-#406): BadRequestException from SnapshotInspector.validateSnapshotsUpdate - // ("Cannot delete the latest snapshot"). - // The invariant asserted below is the same either way: racing S2 must survive. LOG.info("commitTransaction rejected the stale commit: {}", expected.toString()); } - List remaining = Lists.newArrayList(catalog.loadTable(s.id).snapshots()); + List remaining = Lists.newArrayList(catalog.loadTable(id).snapshots()); LOG.info( "staged-conflict result: remaining snapshots={}", - remaining.stream() - .map(Snapshot::snapshotId) - .collect(java.util.stream.Collectors.toList())); + remaining.stream().map(Snapshot::snapshotId).collect(Collectors.toList())); Assertions.assertEquals( - 2, remaining.size(), "S2 (" + s2.snapshotId() + ") must not be silently dropped from H2"); + priorSnapshotCount + 1, + remaining.size(), + "racing snapshot S2 (" + s2.snapshotId() + ") must not be silently dropped from H2"); Assertions.assertTrue( remaining.stream().anyMatch(x -> x.snapshotId() == s2.snapshotId()), "racing snapshot S2 must still be present after the conflicting stale commit"); - cleanup(s.id); + cleanup(id); } - /** Holds the post-setup state of a freshly created table with a single snapshot S1 at L1. */ - private static final class Setup { - final TableIdentifier id; - final TableDto l1; - final Snapshot s1; - - Setup(TableIdentifier id, TableDto l1, Snapshot s1) { - this.id = id; - this.l1 = l1; - this.s1 = s1; - } - } - - private Setup createTableWithOneSnapshot(String tableId) throws Exception { + /** + * Creates a table and commits {@code count} data-bearing snapshots through the repository path, + * returning the latest committed {@link TableDto} (the base the stale writer will load as L1). + */ + private TableDto createTableWithCommittedDataSnapshots(String tableId, int count) throws Exception { Schema schema = new Schema( required(1, "id", Types.StringType.get()), optional(2, "data", Types.StringType.get())); - TableDto l0 = + TableDto dto = openHouseInternalRepository.save( TABLE_DTO .toBuilder() @@ -145,16 +163,26 @@ private Setup createTableWithOneSnapshot(String tableId) throws Exception { .clustering(null) .tableVersion(INITIAL_TABLE_VERSION) .build()); - TableIdentifier id = TableIdentifier.of(l0.getDatabaseId(), tableId); - Snapshot s1 = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); - TableDto l1 = - openHouseInternalRepository.save( - l0.toBuilder() - .tableVersion(l0.getTableLocation()) - .jsonSnapshots(Collections.singletonList(SnapshotParser.toJson(s1))) - .snapshotRefs(refs(s1)) - .build()); - return new Setup(id, l1, s1); + TableIdentifier id = idOf(dto); + List committed = new ArrayList<>(); + for (int i = 0; i < count; i++) { + Snapshot snapshot = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + committed.add(snapshot); + List jsonSnapshots = + committed.stream().map(SnapshotParser::toJson).collect(Collectors.toList()); + dto = + openHouseInternalRepository.save( + dto.toBuilder() + .tableVersion(dto.getTableLocation()) + .jsonSnapshots(jsonSnapshots) + .snapshotRefs(refs(snapshot)) + .build()); + } + return dto; + } + + private static TableIdentifier idOf(TableDto dto) { + return TableIdentifier.of(dto.getDatabaseId(), dto.getTableId()); } private DataFile dummyDataFile() throws Exception { @@ -178,7 +206,8 @@ private void cleanup(TableIdentifier id) { /** * Invalidates the per-JVM static retry cache used by {@code failIfRetryUpdate} so the test models * a stale commit reaching a replica that never cached the prior {@code COMMIT_KEY}. Without this, - * the single-JVM cache hit would short-circuit the second committer before #612 is exercised. + * the single-JVM cache hit would short-circuit the second committer before the abort is + * exercised. */ private static void clearPerJvmRetryCache() throws Exception { Field cacheField = OpenHouseInternalTableOperations.class.getDeclaredField("CACHE"); From be1386fb33df03a311126cd094e9930e779bee03 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 10:54:26 -0700 Subject: [PATCH 03/14] test(tables): stale writer performs a real insert in the repro Both contending writers now append data: the stale writer commits its own data snapshot at base L1 (payload omitting the racing snapshot), modeling a genuine concurrent-insert race rather than a property-only update. Also assert the stale writer's conflicting insert is rejected wholesale, not merged onto the racing commit. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../e2e/h2/StaleBaseLostUpdateTest.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 8732d12d4..4d84abd2f 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -88,20 +88,22 @@ void testLostUpdateViaStagedTransactionConflictWithPriorDataSnapshot() throws Ex private void assertRacingSnapshotSurvivesStaleStagedCommit(TableDto l1) throws Exception { TableIdentifier id = idOf(l1); - // Stale writer opens a transaction at L1 and stages its real L1 view + COMMIT_KEY=L1 (exactly - // what OpenHouseInternalRepositoryImpl.save stamps), held uncommitted so a racing commit lands - // first. + // Stale writer opens a transaction at L1 and performs its own insert: a new data snapshot + // appended at base L1. Its staged payload contains the existing snapshots + its own append but + // NOT the racing snapshot (which it cannot see), with COMMIT_KEY=L1 — exactly what + // OpenHouseInternalRepositoryImpl.save stamps. Held uncommitted so a racing commit lands first. Table staleHandle = catalog.loadTable(id); List staleView = Lists.newArrayList(staleHandle.snapshots()); int priorSnapshotCount = staleView.size(); - Snapshot staleLatest = staleHandle.currentSnapshot(); + Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); + List stalePayload = new ArrayList<>(staleView); + stalePayload.add(staleInsert); Transaction staleTxn = staleHandle.newTransaction(); staleTxn .updateProperties() - .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(staleView)) - .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(staleLatest))) + .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(stalePayload)) + .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(staleInsert))) .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) - .set("foo", "bar") .commit(); // Racing writer appends S2 via the repository path -> catalog advances L1 -> L2. @@ -134,13 +136,18 @@ private void assertRacingSnapshotSurvivesStaleStagedCommit(TableDto l1) throws E LOG.info( "staged-conflict result: remaining snapshots={}", remaining.stream().map(Snapshot::snapshotId).collect(Collectors.toList())); + Assertions.assertTrue( + remaining.stream().anyMatch(x -> x.snapshotId() == s2.snapshotId()), + "racing snapshot S2 (" + s2.snapshotId() + ") must not be silently dropped from H2"); + Assertions.assertTrue( + remaining.stream().noneMatch(x -> x.snapshotId() == staleInsert.snapshotId()), + "stale writer's conflicting insert (" + + staleInsert.snapshotId() + + ") must be rejected wholesale, not merged on top of the racing commit"); Assertions.assertEquals( priorSnapshotCount + 1, remaining.size(), - "racing snapshot S2 (" + s2.snapshotId() + ") must not be silently dropped from H2"); - Assertions.assertTrue( - remaining.stream().anyMatch(x -> x.snapshotId() == s2.snapshotId()), - "racing snapshot S2 must still be present after the conflicting stale commit"); + "table must hold only the prior snapshots + the racing snapshot S2"); cleanup(id); } From 2aead4b47e0093d1df4b6dcd421837f6c7984866 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 10:56:07 -0700 Subject: [PATCH 04/14] test(tables): add zero-prior-data case (create-table-only base) Covers the race on a freshly created table with no committed data snapshots: the stale writer's first insert races a concurrent first insert. Confirms the racing snapshot survives even when the base has no prior history. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 4d84abd2f..aab9efb3a 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -63,6 +63,13 @@ public class StaleBaseLostUpdateTest { @Autowired Catalog catalog; + /** Base table is freshly created with zero committed data snapshots when the race begins. */ + @Test + void testLostUpdateViaStagedTransactionConflictWithNoPriorData() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("conflict_staged_no_data", 0); + assertRacingSnapshotSurvivesStaleStagedCommit(l1); + } + /** Base table holds a single committed data snapshot when the race begins. */ @Test void testLostUpdateViaStagedTransactionConflict() throws Exception { From 823d18c3b97e06a0bc9952efe940514f78bcdc64 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 11:02:24 -0700 Subject: [PATCH 05/14] test(tables): add expire-snapshots vs concurrent-insert race Models the production maintenance shape: an expire-snapshots commit (keep-subset payload computed at base L1) racing a concurrent data insert. The stale expire's subset omits the racing snapshot, so the subtractive merge would expire the fresh data commit. Asserts the data commit survives and the stale expire is rejected wholesale. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../e2e/h2/StaleBaseLostUpdateTest.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index aab9efb3a..a2622a8d6 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -87,6 +87,70 @@ void testLostUpdateViaStagedTransactionConflictWithPriorDataSnapshot() throws Ex assertRacingSnapshotSurvivesStaleStagedCommit(l1); } + /** + * A snapshot-expiration commit racing a concurrent data insert — the production maintenance + * shape. The expire job stages a "keep" subset computed against base L1 (here: keep only the + * current head, expiring older snapshots); that subset cannot reference the concurrently-added + * data snapshot. When the stale expire rebases onto L2, the subtractive merge would expire the + * racing data commit along with the intended old snapshots. Asserts the data commit survives and + * the stale expire is rejected wholesale. + */ + @Test + void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("expire_race", 2); + TableIdentifier id = idOf(l1); + + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); + Snapshot head = staleHandle.currentSnapshot(); + List keep = Lists.newArrayList(head); // expire everything older than the head + Transaction expireTxn = staleHandle.newTransaction(); + expireTxn + .updateProperties() + .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(keep)) + .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(head))) + .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) + .commit(); + + // Racing data insert -> catalog advances L1 -> L2. + Snapshot racingData = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + List l2Snapshots = new ArrayList<>(); + for (Snapshot s : base) { + l2Snapshots.add(SnapshotParser.toJson(s)); + } + l2Snapshots.add(SnapshotParser.toJson(racingData)); + openHouseInternalRepository.save( + l1.toBuilder() + .tableVersion(l1.getTableLocation()) + .jsonSnapshots(l2Snapshots) + .snapshotRefs(refs(racingData)) + .build()); + + clearPerJvmRetryCache(); + + try { + expireTxn.commitTransaction(); + } catch (Exception expected) { + LOG.info("commitTransaction rejected the stale expire: {}", expected.toString()); + } + + List remaining = Lists.newArrayList(catalog.loadTable(id).snapshots()); + LOG.info( + "expire-race result: remaining snapshots={}", + remaining.stream().map(Snapshot::snapshotId).collect(Collectors.toList())); + Assertions.assertTrue( + remaining.stream().anyMatch(x -> x.snapshotId() == racingData.snapshotId()), + "racing data commit (" + + racingData.snapshotId() + + ") must not be expired away by a stale concurrent expire"); + Assertions.assertEquals( + base.size() + 1, + remaining.size(), + "stale expire must be rejected wholesale; nothing expired and the racing data commit kept"); + + cleanup(id); + } + /** * Core reproduction. Given a table whose latest committed version is {@code l1}, stages a stale * commit at L1, lands a racing append (L1 -> L2), then commits the stale transaction and From f566341d64fa288b5243550813816a26fa105e55 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 11:07:26 -0700 Subject: [PATCH 06/14] test(tables): drop redundant single-snapshot insert variant count=0 (plain create) already reproduces the insert race; the single-snapshot case added no coverage. Keep: zero-prior-data insert, two-snapshot prior-data insert, and the expire-vs-insert race (which requires base snapshots). --- .../tables/e2e/h2/StaleBaseLostUpdateTest.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index a2622a8d6..26ba42d06 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -70,16 +70,9 @@ void testLostUpdateViaStagedTransactionConflictWithNoPriorData() throws Exceptio assertRacingSnapshotSurvivesStaleStagedCommit(l1); } - /** Base table holds a single committed data snapshot when the race begins. */ - @Test - void testLostUpdateViaStagedTransactionConflict() throws Exception { - TableDto l1 = createTableWithCommittedDataSnapshots("conflict_staged", 1); - assertRacingSnapshotSurvivesStaleStagedCommit(l1); - } - /** - * Same scenario, but the table already has prior committed data history (two snapshots) before - * the race begins — the stale writer's base is a non-trivial, data-bearing table. + * Table already has prior committed data history (two snapshots) before the race begins — the + * stale writer's base is a non-trivial, data-bearing table. */ @Test void testLostUpdateViaStagedTransactionConflictWithPriorDataSnapshot() throws Exception { From 27e37500d2d21a1054e3fe3f37a08c1c0b76866d Mon Sep 17 00:00:00 2001 From: Mike Kuchenbecker Date: Fri, 29 May 2026 11:10:25 -0700 Subject: [PATCH 07/14] Apply suggestions from code review Co-authored-by: Mike Kuchenbecker --- .../openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 26ba42d06..076d45fb5 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -41,8 +41,6 @@ import org.springframework.test.context.ContextConfiguration; /** - * Deterministic H2 reproduction of the incident-12185 stale-base silent snapshot drop. Single-JVM, - * no threads, no docker. * *

A stale writer opens an Iceberg transaction at base L1 and stages its real L1 snapshot view * (the writer's {@code SNAPSHOTS_JSON} payload) plus {@code COMMIT_KEY=L1}, but holds it From cf14eb2ca5da95f4b07112d270f1c67bc126fecf Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 11:36:55 -0700 Subject: [PATCH 08/14] test(tables): make the repro subtractive so it actually fails pre-fix Validated by running pre-fix (d4fc9fe7): both tests FAIL (racing snapshot silently dropped); post-fix (#612): both PASS. Three layered guards must all be bypassed for the lost update: 1. HTS optimistic-version CAS -> bypassed by applyUpdates' silent rebase (held txn) 2. failIfRetryUpdate per-JVM cache -> bypassed cross-replica (cleared in the test) 3. Iceberg snapshot sequence-number validation -> only bypassed by a SUBTRACTIVE stale commit (adds no new snapshot). A stale writer adding its own snapshot on a multi-snapshot base is rejected by guard #3, so it cannot lose data -- which is why incident-12185 was an expire/optimizer drop, not an insert race. Tests (both subtractive): expire racing a data insert (prior history); two concurrent first-inserts on a fresh table. --- .../e2e/h2/StaleBaseLostUpdateTest.java | 213 ++++++++---------- 1 file changed, 89 insertions(+), 124 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 076d45fb5..3fc9a9df9 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -41,14 +41,19 @@ import org.springframework.test.context.ContextConfiguration; /** + * A stale writer opens an Iceberg transaction at base L1 and stages its L1 snapshot view + + * {@code COMMIT_KEY=L1}, held uncommitted. A racing writer commits a new snapshot through the + * repository path, advancing the catalog L1 -> L2. Committing the held transaction drives {@code + * BaseTransaction.applyUpdates} to silently refresh the in-flight base to L2 (passing the HTS + * optimistic-version CAS) while keeping the stale payload + {@code COMMIT_KEY=L1}; {@code doCommit} + * then subtracts the racing snapshot. On the buggy catalog the racing commit is silently dropped; + * PR #612's stale-base CAS aborts instead. Invariant: the racing data commit always survives. * - *

A stale writer opens an Iceberg transaction at base L1 and stages its real L1 snapshot view - * (the writer's {@code SNAPSHOTS_JSON} payload) plus {@code COMMIT_KEY=L1}, but holds it - * uncommitted. A racing writer appends a new snapshot via the repository path, advancing the - * catalog L1 -> L2. When the held transaction is committed, {@code BaseTransaction.applyUpdates} - * rebases the stale payload + {@code COMMIT_KEY=L1} onto L2, so {@code doCommit} would subtract the - * racing snapshot. The invariant asserted is that the racing snapshot must survive (PR #612's - * stale-base CAS aborts the commit; without it the racing snapshot is silently dropped). + *

Both cases here are subtractive stale commits — the stale payload omits the racing + * snapshot and does not add a new snapshot whose sequence number would collide with it. That is the + * shape that actually reproduced incident-12185 (an expire/optimizer commit dropping a fresh data + * commit). A stale writer that adds its own snapshot on a multi-snapshot base is instead + * rejected by Iceberg's snapshot sequence-number validation, so that shape cannot lose data. */ @SpringBootTest @ContextConfiguration(initializers = PropertyOverrideContextInitializer.class) @@ -61,30 +66,12 @@ public class StaleBaseLostUpdateTest { @Autowired Catalog catalog; - /** Base table is freshly created with zero committed data snapshots when the race begins. */ - @Test - void testLostUpdateViaStagedTransactionConflictWithNoPriorData() throws Exception { - TableDto l1 = createTableWithCommittedDataSnapshots("conflict_staged_no_data", 0); - assertRacingSnapshotSurvivesStaleStagedCommit(l1); - } - - /** - * Table already has prior committed data history (two snapshots) before the race begins — the - * stale writer's base is a non-trivial, data-bearing table. - */ - @Test - void testLostUpdateViaStagedTransactionConflictWithPriorDataSnapshot() throws Exception { - TableDto l1 = createTableWithCommittedDataSnapshots("conflict_staged_prior_data", 2); - assertRacingSnapshotSurvivesStaleStagedCommit(l1); - } - /** - * A snapshot-expiration commit racing a concurrent data insert — the production maintenance - * shape. The expire job stages a "keep" subset computed against base L1 (here: keep only the - * current head, expiring older snapshots); that subset cannot reference the concurrently-added - * data snapshot. When the stale expire rebases onto L2, the subtractive merge would expire the - * racing data commit along with the intended old snapshots. Asserts the data commit survives and - * the stale expire is rejected wholesale. + * Snapshot-expiration commit racing a concurrent data insert — the production maintenance shape, + * on a table with prior committed data history. The stale expire keeps only the current head + * (expiring older snapshots); its keep-subset cannot reference the concurrently-added data + * snapshot, so the subtractive merge would expire the racing data commit. Asserts the data commit + * survives and the stale expire is rejected wholesale. */ @Test void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { @@ -94,131 +81,98 @@ void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { Table staleHandle = catalog.loadTable(id); List base = Lists.newArrayList(staleHandle.snapshots()); Snapshot head = staleHandle.currentSnapshot(); - List keep = Lists.newArrayList(head); // expire everything older than the head - Transaction expireTxn = staleHandle.newTransaction(); - expireTxn - .updateProperties() - .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(keep)) - .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(head))) - .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) - .commit(); + List keepOnlyHead = Lists.newArrayList(head); - // Racing data insert -> catalog advances L1 -> L2. - Snapshot racingData = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); - List l2Snapshots = new ArrayList<>(); - for (Snapshot s : base) { - l2Snapshots.add(SnapshotParser.toJson(s)); - } - l2Snapshots.add(SnapshotParser.toJson(racingData)); - openHouseInternalRepository.save( - l1.toBuilder() - .tableVersion(l1.getTableLocation()) - .jsonSnapshots(l2Snapshots) - .snapshotRefs(refs(racingData)) - .build()); - - clearPerJvmRetryCache(); + assertRacingDataCommitSurvivesStaleCommit(l1, base, keepOnlyHead, head); + } - try { - expireTxn.commitTransaction(); - } catch (Exception expected) { - LOG.info("commitTransaction rejected the stale expire: {}", expected.toString()); - } + /** + * Two writers concurrently perform the first insert into a freshly created table. The racing + * writer commits first; the stale writer then commits its own first snapshot still declaring the + * create version as its base. The subtractive merge would drop the racing first commit. + */ + @Test + void testStaleInsertDropsConcurrentDataCommitOnFreshTable() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("insert_race_fresh", 0); + TableIdentifier id = idOf(l1); - List remaining = Lists.newArrayList(catalog.loadTable(id).snapshots()); - LOG.info( - "expire-race result: remaining snapshots={}", - remaining.stream().map(Snapshot::snapshotId).collect(Collectors.toList())); - Assertions.assertTrue( - remaining.stream().anyMatch(x -> x.snapshotId() == racingData.snapshotId()), - "racing data commit (" - + racingData.snapshotId() - + ") must not be expired away by a stale concurrent expire"); - Assertions.assertEquals( - base.size() + 1, - remaining.size(), - "stale expire must be rejected wholesale; nothing expired and the racing data commit kept"); + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); // empty + Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); - cleanup(id); + assertRacingDataCommitSurvivesStaleCommit(l1, base, appended(base, staleInsert), staleInsert); } /** - * Core reproduction. Given a table whose latest committed version is {@code l1}, stages a stale - * commit at L1, lands a racing append (L1 -> L2), then commits the stale transaction and - * asserts the racing snapshot is not dropped. + * Core reproduction. A stale writer holds a transaction at base {@code l1} staging {@code + * stalePayload} (main -> {@code staleHead}); a racing writer commits a fresh data snapshot + * first (L1 -> L2); the held transaction then commits, rebasing the stale payload (which omits + * the racing snapshot) onto L2. Asserts the racing commit survives and the stale commit is + * rejected wholesale (nothing from the stale payload is applied). + * + * @param base the snapshots present at L1 (the stale writer's view before its own staging) + * @param stalePayload the full snapshot set the stale writer commits (omits the racing snapshot) + * @param staleHead the snapshot the stale writer points main at */ - private void assertRacingSnapshotSurvivesStaleStagedCommit(TableDto l1) throws Exception { + private void assertRacingDataCommitSurvivesStaleCommit( + TableDto l1, List base, List stalePayload, Snapshot staleHead) + throws Exception { TableIdentifier id = idOf(l1); + int priorSnapshotCount = base.size(); - // Stale writer opens a transaction at L1 and performs its own insert: a new data snapshot - // appended at base L1. Its staged payload contains the existing snapshots + its own append but - // NOT the racing snapshot (which it cannot see), with COMMIT_KEY=L1 — exactly what - // OpenHouseInternalRepositoryImpl.save stamps. Held uncommitted so a racing commit lands first. - Table staleHandle = catalog.loadTable(id); - List staleView = Lists.newArrayList(staleHandle.snapshots()); - int priorSnapshotCount = staleView.size(); - Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); - List stalePayload = new ArrayList<>(staleView); - stalePayload.add(staleInsert); - Transaction staleTxn = staleHandle.newTransaction(); + // Stale writer: held transaction at L1 staging its payload + COMMIT_KEY=L1, uncommitted. + Transaction staleTxn = catalog.loadTable(id).newTransaction(); staleTxn .updateProperties() .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(stalePayload)) - .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(staleInsert))) + .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(staleHead))) .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) .commit(); - // Racing writer appends S2 via the repository path -> catalog advances L1 -> L2. - Snapshot s2 = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); - List racingSnapshots = new ArrayList<>(); - for (Snapshot existing : staleView) { - racingSnapshots.add(SnapshotParser.toJson(existing)); - } - racingSnapshots.add(SnapshotParser.toJson(s2)); - openHouseInternalRepository.save( - l1.toBuilder() - .tableVersion(l1.getTableLocation()) - .jsonSnapshots(racingSnapshots) - .snapshotRefs(refs(s2)) - .build()); + // Racing writer (also based on L1) commits a fresh data snapshot first -> L1 -> L2. + Snapshot racing = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + commitThroughRepository(l1, snapshotJson(base, racing), racing); clearPerJvmRetryCache(); - // Commit the held transaction: applyUpdates sees base L1 != catalog L2, rebases, and re-stamps - // the stale view + COMMIT_KEY=L1 onto L2. A clean rejection is acceptable and version-dependent - // (CommitFailedException from PR #612's CAS, or BadRequestException from the pre-#406 guard); - // the invariant below is the same either way: the racing snapshot must survive. try { staleTxn.commitTransaction(); } catch (Exception expected) { - LOG.info("commitTransaction rejected the stale commit: {}", expected.toString()); + LOG.info("stale commit rejected: {}", expected.toString()); } List remaining = Lists.newArrayList(catalog.loadTable(id).snapshots()); LOG.info( - "staged-conflict result: remaining snapshots={}", + "stale-conflict result: remaining={}", remaining.stream().map(Snapshot::snapshotId).collect(Collectors.toList())); Assertions.assertTrue( - remaining.stream().anyMatch(x -> x.snapshotId() == s2.snapshotId()), - "racing snapshot S2 (" + s2.snapshotId() + ") must not be silently dropped from H2"); - Assertions.assertTrue( - remaining.stream().noneMatch(x -> x.snapshotId() == staleInsert.snapshotId()), - "stale writer's conflicting insert (" - + staleInsert.snapshotId() - + ") must be rejected wholesale, not merged on top of the racing commit"); + remaining.stream().anyMatch(s -> s.snapshotId() == racing.snapshotId()), + "racing data commit (" + racing.snapshotId() + ") must not be silently dropped"); Assertions.assertEquals( priorSnapshotCount + 1, remaining.size(), - "table must hold only the prior snapshots + the racing snapshot S2"); + "stale commit must be rejected wholesale; table holds only the prior snapshots + the racing " + + "data commit"); cleanup(id); } + /** Commits {@code jsonSnapshots} (main -> {@code head}) declaring base {@code l1}'s version. */ + private void commitThroughRepository(TableDto l1, List jsonSnapshots, Snapshot head) { + openHouseInternalRepository.save( + l1.toBuilder() + .tableVersion(l1.getTableLocation()) + .jsonSnapshots(jsonSnapshots) + .snapshotRefs(refs(head)) + .build()); + } + /** * Creates a table and commits {@code count} data-bearing snapshots through the repository path, - * returning the latest committed {@link TableDto} (the base the stale writer will load as L1). + * returning the latest committed {@link TableDto} (the base L1 the racers will load). */ - private TableDto createTableWithCommittedDataSnapshots(String tableId, int count) throws Exception { + private TableDto createTableWithCommittedDataSnapshots(String tableId, int count) + throws Exception { Schema schema = new Schema( required(1, "id", Types.StringType.get()), optional(2, "data", Types.StringType.get())); @@ -237,19 +191,30 @@ private TableDto createTableWithCommittedDataSnapshots(String tableId, int count for (int i = 0; i < count; i++) { Snapshot snapshot = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); committed.add(snapshot); - List jsonSnapshots = - committed.stream().map(SnapshotParser::toJson).collect(Collectors.toList()); dto = openHouseInternalRepository.save( dto.toBuilder() .tableVersion(dto.getTableLocation()) - .jsonSnapshots(jsonSnapshots) + .jsonSnapshots( + committed.stream().map(SnapshotParser::toJson).collect(Collectors.toList())) .snapshotRefs(refs(snapshot)) .build()); } return dto; } + private static List appended(List existing, Snapshot extra) { + List all = new ArrayList<>(existing); + all.add(extra); + return all; + } + + private static List snapshotJson(List existing, Snapshot extra) { + return appended(existing, extra).stream() + .map(SnapshotParser::toJson) + .collect(Collectors.toList()); + } + private static TableIdentifier idOf(TableDto dto) { return TableIdentifier.of(dto.getDatabaseId(), dto.getTableId()); } @@ -273,10 +238,10 @@ private void cleanup(TableIdentifier id) { } /** - * Invalidates the per-JVM static retry cache used by {@code failIfRetryUpdate} so the test models - * a stale commit reaching a replica that never cached the prior {@code COMMIT_KEY}. Without this, - * the single-JVM cache hit would short-circuit the second committer before the abort is - * exercised. + * Invalidates the per-JVM static retry cache used by {@code failIfRetryUpdate} so the stale write + * models a request reaching a replica that never cached the racing commit's {@code COMMIT_KEY}. + * Without this, the single-JVM cache hit would reject the stale write before the subtractive + * merge (or the PR #612 abort) is exercised. */ private static void clearPerJvmRetryCache() throws Exception { Field cacheField = OpenHouseInternalTableOperations.class.getDeclaredField("CACHE"); From 582b42ac6bab2a24c400a6ed4e6861658c354f59 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 12:45:35 -0700 Subject: [PATCH 09/14] test(tables): add populated-table concurrent-insert case; comments describe behavior only Third case: two writers insert into a table that already holds data; the catalog rejects the stale commit and the concurrent insert remains. Comments rewritten to describe observable behavior without external references. --- .../e2e/h2/StaleBaseLostUpdateTest.java | 93 ++++++++++--------- 1 file changed, 51 insertions(+), 42 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 3fc9a9df9..702016783 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -41,19 +41,13 @@ import org.springframework.test.context.ContextConfiguration; /** - * A stale writer opens an Iceberg transaction at base L1 and stages its L1 snapshot view + - * {@code COMMIT_KEY=L1}, held uncommitted. A racing writer commits a new snapshot through the - * repository path, advancing the catalog L1 -> L2. Committing the held transaction drives {@code - * BaseTransaction.applyUpdates} to silently refresh the in-flight base to L2 (passing the HTS - * optimistic-version CAS) while keeping the stale payload + {@code COMMIT_KEY=L1}; {@code doCommit} - * then subtracts the racing snapshot. On the buggy catalog the racing commit is silently dropped; - * PR #612's stale-base CAS aborts instead. Invariant: the racing data commit always survives. + * Concurrency tests for committing a snapshot set against a stale base version. * - *

Both cases here are subtractive stale commits — the stale payload omits the racing - * snapshot and does not add a new snapshot whose sequence number would collide with it. That is the - * shape that actually reproduced incident-12185 (an expire/optimizer commit dropping a fresh data - * commit). A stale writer that adds its own snapshot on a multi-snapshot base is instead - * rejected by Iceberg's snapshot sequence-number validation, so that shape cannot lose data. + *

In each test the table is at version L1, a second writer commits a new snapshot (advancing the + * catalog to L2), and then a writer that still declares L1 as its base commits a snapshot set + * computed at L1 — which therefore omits the snapshot the second writer just added. The catalog + * must not drop that concurrently added snapshot: the stale commit is rejected and the concurrently + * added snapshot remains in the table. */ @SpringBootTest @ContextConfiguration(initializers = PropertyOverrideContextInitializer.class) @@ -67,11 +61,10 @@ public class StaleBaseLostUpdateTest { @Autowired Catalog catalog; /** - * Snapshot-expiration commit racing a concurrent data insert — the production maintenance shape, - * on a table with prior committed data history. The stale expire keeps only the current head - * (expiring older snapshots); its keep-subset cannot reference the concurrently-added data - * snapshot, so the subtractive merge would expire the racing data commit. Asserts the data commit - * survives and the stale expire is rejected wholesale. + * An expiration commit declaring a stale base, racing a concurrent insert. The expiring writer + * keeps only the current head and drops older snapshots; its kept set, computed at the stale + * base, does not include the concurrently inserted snapshot. The concurrent insert must remain + * and the expiration must be rejected, leaving the prior snapshots plus the concurrent insert. */ @Test void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { @@ -87,9 +80,9 @@ void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { } /** - * Two writers concurrently perform the first insert into a freshly created table. The racing - * writer commits first; the stale writer then commits its own first snapshot still declaring the - * create version as its base. The subtractive merge would drop the racing first commit. + * Two writers each perform the first insert into a freshly created table. One commits first; the + * other then commits its own first snapshot still declaring the create version as its base. The + * first writer's snapshot must remain. */ @Test void testStaleInsertDropsConcurrentDataCommitOnFreshTable() throws Exception { @@ -104,14 +97,31 @@ void testStaleInsertDropsConcurrentDataCommitOnFreshTable() throws Exception { } /** - * Core reproduction. A stale writer holds a transaction at base {@code l1} staging {@code - * stalePayload} (main -> {@code staleHead}); a racing writer commits a fresh data snapshot - * first (L1 -> L2); the held transaction then commits, rebasing the stale payload (which omits - * the racing snapshot) onto L2. Asserts the racing commit survives and the stale commit is - * rejected wholesale (nothing from the stale payload is applied). + * Two writers insert into a table that already holds data, both based on the same version. The + * stale writer's appended snapshot shares a sequence number with the concurrent insert, so the + * catalog rejects the stale commit; the concurrent insert remains. + */ + @Test + void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("insert_race_populated", 2); + TableIdentifier id = idOf(l1); + + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); + Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); + + assertRacingDataCommitSurvivesStaleCommit(l1, base, appended(base, staleInsert), staleInsert); + } + + /** + * Loads the table at version {@code l1}, has a second writer commit a fresh data snapshot + * (advancing the catalog), then commits a writer that still declares {@code l1} as its base with + * {@code stalePayload} (pointing main at {@code staleHead}) — a snapshot set that omits the second + * writer's snapshot. Asserts the concurrently committed snapshot remains and the stale commit is + * rejected without applying any of its payload. * - * @param base the snapshots present at L1 (the stale writer's view before its own staging) - * @param stalePayload the full snapshot set the stale writer commits (omits the racing snapshot) + * @param base the snapshots present at {@code l1} + * @param stalePayload the snapshot set the stale writer commits (omits the concurrent snapshot) * @param staleHead the snapshot the stale writer points main at */ private void assertRacingDataCommitSurvivesStaleCommit( @@ -120,7 +130,7 @@ private void assertRacingDataCommitSurvivesStaleCommit( TableIdentifier id = idOf(l1); int priorSnapshotCount = base.size(); - // Stale writer: held transaction at L1 staging its payload + COMMIT_KEY=L1, uncommitted. + // Stale writer holds a transaction at L1 staging its payload, left uncommitted. Transaction staleTxn = catalog.loadTable(id).newTransaction(); staleTxn .updateProperties() @@ -129,11 +139,12 @@ private void assertRacingDataCommitSurvivesStaleCommit( .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) .commit(); - // Racing writer (also based on L1) commits a fresh data snapshot first -> L1 -> L2. + // Second writer, also based on L1, commits a fresh data snapshot, advancing the catalog to L2. Snapshot racing = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); commitThroughRepository(l1, snapshotJson(base, racing), racing); - clearPerJvmRetryCache(); + // Evaluate the stale commit on its base version rather than short-circuit it as a duplicate. + clearRetryCache(); try { staleTxn.commitTransaction(); @@ -143,16 +154,15 @@ private void assertRacingDataCommitSurvivesStaleCommit( List remaining = Lists.newArrayList(catalog.loadTable(id).snapshots()); LOG.info( - "stale-conflict result: remaining={}", + "remaining snapshots after stale commit={}", remaining.stream().map(Snapshot::snapshotId).collect(Collectors.toList())); Assertions.assertTrue( remaining.stream().anyMatch(s -> s.snapshotId() == racing.snapshotId()), - "racing data commit (" + racing.snapshotId() + ") must not be silently dropped"); + "concurrently committed snapshot (" + racing.snapshotId() + ") must not be dropped"); Assertions.assertEquals( priorSnapshotCount + 1, remaining.size(), - "stale commit must be rejected wholesale; table holds only the prior snapshots + the racing " - + "data commit"); + "table must hold only the prior snapshots plus the concurrently committed snapshot"); cleanup(id); } @@ -168,8 +178,8 @@ private void commitThroughRepository(TableDto l1, List jsonSnapshots, Sn } /** - * Creates a table and commits {@code count} data-bearing snapshots through the repository path, - * returning the latest committed {@link TableDto} (the base L1 the racers will load). + * Creates a table and commits {@code count} data-bearing snapshots, returning the latest + * committed {@link TableDto} (the version both writers load as their base). */ private TableDto createTableWithCommittedDataSnapshots(String tableId, int count) throws Exception { @@ -221,7 +231,8 @@ private static TableIdentifier idOf(TableDto dto) { private DataFile dummyDataFile() throws Exception { return IcebergSnapshotsModelTestUtilities.createDummyDataFile( - Files.createTempFile("incident-12185-", ".orc").toString(), PartitionSpec.unpartitioned()); + Files.createTempFile("stale-base-conflict-", ".orc").toString(), + PartitionSpec.unpartitioned()); } private static Map refs(Snapshot snapshot) { @@ -238,12 +249,10 @@ private void cleanup(TableIdentifier id) { } /** - * Invalidates the per-JVM static retry cache used by {@code failIfRetryUpdate} so the stale write - * models a request reaching a replica that never cached the racing commit's {@code COMMIT_KEY}. - * Without this, the single-JVM cache hit would reject the stale write before the subtractive - * merge (or the PR #612 abort) is exercised. + * Invalidates the per-JVM retry cache so the stale commit is evaluated against its base version + * rather than short-circuited as a duplicate retry of an already-seen commit. */ - private static void clearPerJvmRetryCache() throws Exception { + private static void clearRetryCache() throws Exception { Field cacheField = OpenHouseInternalTableOperations.class.getDeclaredField("CACHE"); cacheField.setAccessible(true); ((com.google.common.cache.Cache) cacheField.get(null)).invalidateAll(); From 3e0a25038a6dddfabef84b8e08f02323d672372c Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 13:03:48 -0700 Subject: [PATCH 10/14] test(tables): inline trivial helpers; assert final table state as a set Inline commitThroughRepository, snapshotJson, idOf, and refs at their call sites. Assert the table holds exactly the prior snapshots plus the concurrently committed snapshot, regardless of whether the stale commit throws. --- .../e2e/h2/StaleBaseLostUpdateTest.java | 100 ++++++++---------- 1 file changed, 45 insertions(+), 55 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 702016783..3cb812ec3 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -17,7 +17,7 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; @@ -33,8 +33,6 @@ import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.DirtiesContext; @@ -46,16 +44,14 @@ *

In each test the table is at version L1, a second writer commits a new snapshot (advancing the * catalog to L2), and then a writer that still declares L1 as its base commits a snapshot set * computed at L1 — which therefore omits the snapshot the second writer just added. The catalog - * must not drop that concurrently added snapshot: the stale commit is rejected and the concurrently - * added snapshot remains in the table. + * must not drop that concurrently added snapshot: the stale commit is rejected and the + * concurrently added snapshot remains in the table. */ @SpringBootTest @ContextConfiguration(initializers = PropertyOverrideContextInitializer.class) @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) public class StaleBaseLostUpdateTest { - private static final Logger LOG = LoggerFactory.getLogger(StaleBaseLostUpdateTest.class); - @Autowired OpenHouseInternalRepository openHouseInternalRepository; @Autowired Catalog catalog; @@ -69,7 +65,7 @@ public class StaleBaseLostUpdateTest { @Test void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { TableDto l1 = createTableWithCommittedDataSnapshots("expire_race", 2); - TableIdentifier id = idOf(l1); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); Table staleHandle = catalog.loadTable(id); List base = Lists.newArrayList(staleHandle.snapshots()); @@ -87,7 +83,7 @@ void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { @Test void testStaleInsertDropsConcurrentDataCommitOnFreshTable() throws Exception { TableDto l1 = createTableWithCommittedDataSnapshots("insert_race_fresh", 0); - TableIdentifier id = idOf(l1); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); Table staleHandle = catalog.loadTable(id); List base = Lists.newArrayList(staleHandle.snapshots()); // empty @@ -104,7 +100,7 @@ void testStaleInsertDropsConcurrentDataCommitOnFreshTable() throws Exception { @Test void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { TableDto l1 = createTableWithCommittedDataSnapshots("insert_race_populated", 2); - TableIdentifier id = idOf(l1); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); Table staleHandle = catalog.loadTable(id); List base = Lists.newArrayList(staleHandle.snapshots()); @@ -117,8 +113,8 @@ void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { * Loads the table at version {@code l1}, has a second writer commit a fresh data snapshot * (advancing the catalog), then commits a writer that still declares {@code l1} as its base with * {@code stalePayload} (pointing main at {@code staleHead}) — a snapshot set that omits the second - * writer's snapshot. Asserts the concurrently committed snapshot remains and the stale commit is - * rejected without applying any of its payload. + * writer's snapshot. After the stale commit, the table must hold exactly the prior snapshots plus + * the second writer's snapshot: the stale commit applies none of its payload. * * @param base the snapshots present at {@code l1} * @param stalePayload the snapshot set the stale writer commits (omits the concurrent snapshot) @@ -127,56 +123,63 @@ void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { private void assertRacingDataCommitSurvivesStaleCommit( TableDto l1, List base, List stalePayload, Snapshot staleHead) throws Exception { - TableIdentifier id = idOf(l1); - int priorSnapshotCount = base.size(); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); // Stale writer holds a transaction at L1 staging its payload, left uncommitted. Transaction staleTxn = catalog.loadTable(id).newTransaction(); staleTxn .updateProperties() .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(stalePayload)) - .set(CatalogConstants.SNAPSHOTS_REFS_KEY, SnapshotsUtil.serializeMap(refs(staleHead))) + .set( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(staleHead)))) .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) .commit(); // Second writer, also based on L1, commits a fresh data snapshot, advancing the catalog to L2. Snapshot racing = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); - commitThroughRepository(l1, snapshotJson(base, racing), racing); + openHouseInternalRepository.save( + l1.toBuilder() + .tableVersion(l1.getTableLocation()) + .jsonSnapshots( + appended(base, racing).stream() + .map(SnapshotParser::toJson) + .collect(Collectors.toList())) + .snapshotRefs( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(racing))) + .build()); // Evaluate the stale commit on its base version rather than short-circuit it as a duplicate. clearRetryCache(); + // The stale commit may be rejected outright; it must not succeed by dropping the concurrent + // snapshot. The table-state assertion below holds whether or not the commit throws. try { staleTxn.commitTransaction(); - } catch (Exception expected) { - LOG.info("stale commit rejected: {}", expected.toString()); + } catch (Exception rejected) { + // Rejection is an acceptable outcome; the table state is asserted below. } - List remaining = Lists.newArrayList(catalog.loadTable(id).snapshots()); - LOG.info( - "remaining snapshots after stale commit={}", - remaining.stream().map(Snapshot::snapshotId).collect(Collectors.toList())); - Assertions.assertTrue( - remaining.stream().anyMatch(s -> s.snapshotId() == racing.snapshotId()), - "concurrently committed snapshot (" + racing.snapshotId() + ") must not be dropped"); + Set expected = + base.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + expected.add(racing.snapshotId()); + Set actual = + Lists.newArrayList(catalog.loadTable(id).snapshots()).stream() + .map(Snapshot::snapshotId) + .collect(Collectors.toSet()); Assertions.assertEquals( - priorSnapshotCount + 1, - remaining.size(), - "table must hold only the prior snapshots plus the concurrently committed snapshot"); + expected, + actual, + "table must hold exactly the prior snapshots plus the concurrently committed snapshot " + + racing.snapshotId() + + "; the stale commit must apply none of its own payload"); cleanup(id); } - /** Commits {@code jsonSnapshots} (main -> {@code head}) declaring base {@code l1}'s version. */ - private void commitThroughRepository(TableDto l1, List jsonSnapshots, Snapshot head) { - openHouseInternalRepository.save( - l1.toBuilder() - .tableVersion(l1.getTableLocation()) - .jsonSnapshots(jsonSnapshots) - .snapshotRefs(refs(head)) - .build()); - } - /** * Creates a table and commits {@code count} data-bearing snapshots, returning the latest * committed {@link TableDto} (the version both writers load as their base). @@ -196,7 +199,7 @@ private TableDto createTableWithCommittedDataSnapshots(String tableId, int count .clustering(null) .tableVersion(INITIAL_TABLE_VERSION) .build()); - TableIdentifier id = idOf(dto); + TableIdentifier id = TableIdentifier.of(dto.getDatabaseId(), dto.getTableId()); List committed = new ArrayList<>(); for (int i = 0; i < count; i++) { Snapshot snapshot = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); @@ -207,7 +210,9 @@ private TableDto createTableWithCommittedDataSnapshots(String tableId, int count .tableVersion(dto.getTableLocation()) .jsonSnapshots( committed.stream().map(SnapshotParser::toJson).collect(Collectors.toList())) - .snapshotRefs(refs(snapshot)) + .snapshotRefs( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(snapshot))) .build()); } return dto; @@ -219,27 +224,12 @@ private static List appended(List existing, Snapshot extra) return all; } - private static List snapshotJson(List existing, Snapshot extra) { - return appended(existing, extra).stream() - .map(SnapshotParser::toJson) - .collect(Collectors.toList()); - } - - private static TableIdentifier idOf(TableDto dto) { - return TableIdentifier.of(dto.getDatabaseId(), dto.getTableId()); - } - private DataFile dummyDataFile() throws Exception { return IcebergSnapshotsModelTestUtilities.createDummyDataFile( Files.createTempFile("stale-base-conflict-", ".orc").toString(), PartitionSpec.unpartitioned()); } - private static Map refs(Snapshot snapshot) { - return IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( - SnapshotParser.toJson(snapshot)); - } - private void cleanup(TableIdentifier id) { openHouseInternalRepository.deleteById( TableDtoPrimaryKey.builder() From 6e06360bd2704167cec31cb1a6db793d9a3eabca Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 13:09:08 -0700 Subject: [PATCH 11/14] test(tables): build snapshot lists inline instead of via a helper Prepare each commit's snapshot list explicitly at the call site so the base + new snapshot is visible in the flow. --- .../tables/e2e/h2/StaleBaseLostUpdateTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 3cb812ec3..1c48c2879 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -88,8 +88,10 @@ void testStaleInsertDropsConcurrentDataCommitOnFreshTable() throws Exception { Table staleHandle = catalog.loadTable(id); List base = Lists.newArrayList(staleHandle.snapshots()); // empty Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); + List stalePayload = new ArrayList<>(base); + stalePayload.add(staleInsert); - assertRacingDataCommitSurvivesStaleCommit(l1, base, appended(base, staleInsert), staleInsert); + assertRacingDataCommitSurvivesStaleCommit(l1, base, stalePayload, staleInsert); } /** @@ -105,8 +107,10 @@ void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { Table staleHandle = catalog.loadTable(id); List base = Lists.newArrayList(staleHandle.snapshots()); Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); + List stalePayload = new ArrayList<>(base); + stalePayload.add(staleInsert); - assertRacingDataCommitSurvivesStaleCommit(l1, base, appended(base, staleInsert), staleInsert); + assertRacingDataCommitSurvivesStaleCommit(l1, base, stalePayload, staleInsert); } /** @@ -140,11 +144,13 @@ private void assertRacingDataCommitSurvivesStaleCommit( // Second writer, also based on L1, commits a fresh data snapshot, advancing the catalog to L2. Snapshot racing = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + List snapshotsAfterRace = new ArrayList<>(base); + snapshotsAfterRace.add(racing); openHouseInternalRepository.save( l1.toBuilder() .tableVersion(l1.getTableLocation()) .jsonSnapshots( - appended(base, racing).stream() + snapshotsAfterRace.stream() .map(SnapshotParser::toJson) .collect(Collectors.toList())) .snapshotRefs( @@ -218,12 +224,6 @@ private TableDto createTableWithCommittedDataSnapshots(String tableId, int count return dto; } - private static List appended(List existing, Snapshot extra) { - List all = new ArrayList<>(existing); - all.add(extra); - return all; - } - private DataFile dummyDataFile() throws Exception { return IcebergSnapshotsModelTestUtilities.createDummyDataFile( Files.createTempFile("stale-base-conflict-", ".orc").toString(), From bc4b7ab19b0b65c212ff10be6e15000bcdd7d10a Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 13:13:51 -0700 Subject: [PATCH 12/14] test(tables): assert the stale commit throws Use assertThrows(CommitFailedException) for the rejection, then assert the table holds exactly the prior snapshots plus the concurrent commit. --- .../e2e/h2/StaleBaseLostUpdateTest.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index 1c48c2879..cb8fc362b 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -29,6 +29,7 @@ import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Assertions; @@ -161,16 +162,14 @@ private void assertRacingDataCommitSurvivesStaleCommit( // Evaluate the stale commit on its base version rather than short-circuit it as a duplicate. clearRetryCache(); - // The stale commit may be rejected outright; it must not succeed by dropping the concurrent - // snapshot. The table-state assertion below holds whether or not the commit throws. - try { - staleTxn.commitTransaction(); - } catch (Exception rejected) { - // Rejection is an acceptable outcome; the table state is asserted below. - } + // The stale commit declares a base that no longer matches the catalog, so it must be rejected. + Assertions.assertThrows( + CommitFailedException.class, + staleTxn::commitTransaction, + "the stale commit must be rejected, not applied against the advanced catalog"); - Set expected = - base.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + // The table must then hold exactly the prior snapshots plus the concurrently committed snapshot. + Set expected = base.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); expected.add(racing.snapshotId()); Set actual = Lists.newArrayList(catalog.loadTable(id).snapshots()).stream() @@ -180,8 +179,7 @@ private void assertRacingDataCommitSurvivesStaleCommit( expected, actual, "table must hold exactly the prior snapshots plus the concurrently committed snapshot " - + racing.snapshotId() - + "; the stale commit must apply none of its own payload"); + + racing.snapshotId()); cleanup(id); } From a10d49daad1a88b0c0ff426e8603df506358f5d2 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 13:54:24 -0700 Subject: [PATCH 13/14] test(tables): assert rejection by outcome, not type; add metadata-update case Assert the stale commit is rejected (any exception) rather than a specific type, so the contract is rejection + final table state. Add a case where the stale commit changes metadata without adding a snapshot. --- .../e2e/h2/StaleBaseLostUpdateTest.java | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index cb8fc362b..f2e82afa0 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -114,6 +114,25 @@ void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { assertRacingDataCommitSurvivesStaleCommit(l1, base, stalePayload, staleInsert); } + /** + * A stale commit that updates metadata without adding a snapshot, racing a concurrent insert. Its + * declared snapshot set is its base view, which omits the concurrent insert, so the diff is a + * pure deletion of the concurrent snapshot. The concurrent insert must remain and the stale + * commit must be rejected. + */ + @Test + void testStaleMetadataUpdateDropsConcurrentDataCommit() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("metadata_update_race", 2); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); + + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); + Snapshot head = staleHandle.currentSnapshot(); + + // The stale writer adds no snapshot; its declared set is its base view and main stays at head. + assertRacingDataCommitSurvivesStaleCommit(l1, base, base, head); + } + /** * Loads the table at version {@code l1}, has a second writer commit a fresh data snapshot * (advancing the catalog), then commits a writer that still declares {@code l1} as its base with @@ -162,9 +181,10 @@ private void assertRacingDataCommitSurvivesStaleCommit( // Evaluate the stale commit on its base version rather than short-circuit it as a duplicate. clearRetryCache(); - // The stale commit declares a base that no longer matches the catalog, so it must be rejected. + // The stale commit declares a base that no longer matches the catalog, so it must be rejected + // (the rejection's exception type is not part of the contract). Assertions.assertThrows( - CommitFailedException.class, + Exception.class, staleTxn::commitTransaction, "the stale commit must be rejected, not applied against the advanced catalog"); From 2fc233155676f53058b1b226b061122d5b1f240e Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 14:25:36 -0700 Subject: [PATCH 14/14] test(tables): metadata-update case changes a real table property Set an actual table property (plus the base snapshot view, no new snapshot) so the case is a genuine property-changing commit rather than internal bookkeeping only. --- .../e2e/h2/StaleBaseLostUpdateTest.java | 55 +++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java index f2e82afa0..14fd88684 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -115,8 +115,8 @@ void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { } /** - * A stale commit that updates metadata without adding a snapshot, racing a concurrent insert. Its - * declared snapshot set is its base view, which omits the concurrent insert, so the diff is a + * A stale commit that changes a table property and adds no snapshot, racing a concurrent insert. + * Its declared snapshot set is its base view, which omits the concurrent insert, so the diff is a * pure deletion of the concurrent snapshot. The concurrent insert must remain and the stale * commit must be rejected. */ @@ -129,8 +129,55 @@ void testStaleMetadataUpdateDropsConcurrentDataCommit() throws Exception { List base = Lists.newArrayList(staleHandle.snapshots()); Snapshot head = staleHandle.currentSnapshot(); - // The stale writer adds no snapshot; its declared set is its base view and main stays at head. - assertRacingDataCommitSurvivesStaleCommit(l1, base, base, head); + // Stale writer: set a table property and carry the base snapshot view (no new snapshot), held. + Transaction staleTxn = staleHandle.newTransaction(); + staleTxn + .updateProperties() + .set("test.stale.property", "changed") + .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(base)) + .set( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(head)))) + .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) + .commit(); + + // Second writer commits a fresh data snapshot, advancing the catalog. + Snapshot racing = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + List snapshotsAfterRace = new ArrayList<>(base); + snapshotsAfterRace.add(racing); + openHouseInternalRepository.save( + l1.toBuilder() + .tableVersion(l1.getTableLocation()) + .jsonSnapshots( + snapshotsAfterRace.stream() + .map(SnapshotParser::toJson) + .collect(Collectors.toList())) + .snapshotRefs( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(racing))) + .build()); + + clearRetryCache(); + + Set expected = base.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + expected.add(racing.snapshotId()); + Assertions.assertThrows( + Exception.class, + staleTxn::commitTransaction, + "the stale property update must be rejected, not applied against the advanced catalog"); + Set actual = + Lists.newArrayList(catalog.loadTable(id).snapshots()).stream() + .map(Snapshot::snapshotId) + .collect(Collectors.toSet()); + Assertions.assertEquals( + expected, + actual, + "table must hold exactly the prior snapshots plus the concurrently committed snapshot " + + racing.snapshotId()); + + cleanup(id); } /**