
[BUG] Flink StreamWriteOperatorCoordinator fails with HoodieWriteConflictException on restart during recommitInstant #19024

@suryaprasanna

Describe the bug

When a Flink streaming job restarts with pending inflight instants and OCC (Optimistic Concurrency Control) is enabled, StreamWriteOperatorCoordinator.restoreEvents() fails with HoodieWriteConflictException during recommitInstant(). The conflict is a false positive: conflict resolution finds overlapping file IDs between the inflight instant being recommitted and commits that the same job completed earlier, and treats that overlap as a concurrent write.

To Reproduce

  1. Start a Flink streaming upsert job with hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL and metadata table enabled
  2. Let the job run for several checkpoints (multiple deltacommits on the timeline)
  3. Kill the job while an instant is inflight (not yet committed)
  4. Restart the job — restoreEvents() attempts to recommit the inflight instant
  5. Job fails with HoodieWriteConflictException
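
As a rough sketch of the writer options from step 1: hoodie.write.concurrency.mode and its value come from this report, while the metadata.enabled key is an assumption about how the Flink writer switches on the metadata table and should be checked against the Hudi version in use. The class and method names (ReproOptions, reproOptions, toWithClause) are hypothetical helpers for illustration only, and a lock provider, which OCC normally also needs, is left out because the report does not name one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ReproOptions {

    /** Builds the option map described in step 1 (hypothetical helper). */
    static Map<String, String> reproOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        // Taken verbatim from the report.
        options.put("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL");
        // ASSUMPTION: key used by the Flink writer to enable the metadata table.
        options.put("metadata.enabled", "true");
        return options;
    }

    /** Renders the options as 'key' = 'value' pairs, as used in a Flink SQL WITH clause. */
    static String toWithClause(Map<String, String> options) {
        return options.entrySet().stream()
            .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
            .collect(Collectors.joining(",\n"));
    }

    public static void main(String[] args) {
        System.out.println(toWithClause(reproOptions()));
    }
}
```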

Expected behavior

The recommit should succeed when the only completed instants on the timeline are from the same writer's prior checkpoints. Conflict resolution should only flag genuinely concurrent commits from other writers.
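
The expected behavior can be sketched as a filter applied before conflict checking. This is a standalone illustration, not Hudi code and not necessarily the approach taken in the linked PR: TimelineInstant, its writerId field, and RecommitConflictFilter are all hypothetical, and the sketch assumes each instant can be attributed to a writer, which the real timeline may not record in this form.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class RecommitConflictFilter {

    /** Hypothetical, simplified view of a timeline instant; not a Hudi class. */
    record TimelineInstant(String instantTime, String writerId,
                           boolean completed, Set<String> fileIds) {}

    /**
     * Returns the completed instants that genuinely conflict with the inflight one:
     * written by a different writer AND touching at least one of the same file IDs.
     * Completed instants from the same writer are skipped, so a recommit after
     * restart does not conflict with the job's own earlier checkpoints.
     */
    static List<TimelineInstant> conflictingInstants(TimelineInstant inflight,
                                                     List<TimelineInstant> timeline) {
        return timeline.stream()
            .filter(TimelineInstant::completed)
            .filter(other -> !other.writerId().equals(inflight.writerId()))
            .filter(other -> other.fileIds().stream().anyMatch(inflight.fileIds()::contains))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TimelineInstant inflight =
            new TimelineInstant("20260615144320397", "job-a", false, Set.of("file-1"));
        // Same writer, overlapping file ID: should NOT be reported as a conflict.
        TimelineInstant ownPrior =
            new TimelineInstant("20260612230825400", "job-a", true, Set.of("file-1"));
        // Different writer, overlapping file ID: a genuine conflict.
        TimelineInstant otherWriter =
            new TimelineInstant("20260614000000000", "job-b", true, Set.of("file-1"));

        System.out.println(conflictingInstants(inflight, List.of(ownPrior)).size());
        System.out.println(conflictingInstants(inflight, List.of(ownPrior, otherWriter)).size());
    }
}
```

With only the job's own prior commit on the timeline the filter returns no conflicts. Adding a commit from another writer that touches the same file ID yields exactly one.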

Environment

  • Hudi version: 1.0.0+ (also affects 1.0.1, 1.1.0, 1.2)
  • Flink version: 1.18+
  • Storage: HDFS / Cloud (GCS/CFS)

Stack trace

org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes between first operation = ConcurrentOperation(actionState=INFLIGHT, actionType=deltacommit, instantTime=20260615144320397), second operation = ConcurrentOperation(actionState=COMPLETED, actionType=deltacommit, instantTime=20260612230825400)
    at o.a.h.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:205)
    at o.a.h.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:94)
    at o.a.h.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:88)
    at o.a.h.client.BaseHoodieClient.resolveWriteConflict(BaseHoodieClient.java:238)
    at o.a.h.client.BaseHoodieWriteClient.preCommit(BaseHoodieWriteClient.java:431)
    at o.a.h.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:270)
    at o.a.h.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:165)
    at o.a.h.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:224)
    at o.a.h.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:677)
    at o.a.h.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:648)
    at o.a.h.sink.StreamWriteOperatorCoordinator.recommitInstant(StreamWriteOperatorCoordinator.java:565)
    at o.a.h.sink.StreamWriteOperatorCoordinator.restoreEvents(StreamWriteOperatorCoordinator.java:455)
    at o.a.h.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:265)
    at o.a.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:185)

PR: #19023
