Flink: Fix flaky TestMonitorSource.testStateRestore #16548
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Is this generated with the help of an AI agent? The stopWithSavepoint().get() seems like a good solution, but I think it should be changed in all Flink versions.
@pvary thanks for the review! On the AI question: yes, this PR was prepared with AI assistance (drafting the fix, the refactor, and the test changes). Per the AI-assisted contribution guidelines I reviewed and verified it end-to-end (I understand the savepoint-completion race and ran the affected tests locally) and kept the wording and style aligned with the codebase. Happy to walk through any part during review. On applying it to all Flink versions: done in 806876f. I'd originally scoped it to v2.0/v2.1 because the observed CI failure was the
- assertThat(resultWithSavepoint.poll(Duration.ofSeconds(5L))).isEqualTo(EMPTY_EVENT);
+ // Restoring from a savepoint on a busy cluster may take longer than the default 5s poll
+ assertThat(resultWithSavepoint.poll(Duration.ofSeconds(30L))).isEqualTo(EMPTY_EVENT);
Are we sure we need this? Wasn't this only a concurrency issue?
The deterministic fix in closeJobClient removes the savepoint-completion race, which was the actual cause of the failure here. I kept the timeout bump as a separate, defensive measure: restoring a job on a busy CI runner (resubmit → schedule → deploy tasks) can occasionally take longer than 5s even when the savepoint is fully written, so the larger timeout guards against that kind of CI lag.
It doesn't slow the normal run down, though — poll(...) returns as soon as the expected event arrives, so on an unloaded runner / in the happy path the extra time is never actually spent; the 30s only caps how long we're willing to wait before failing. That's why I'd lean towards keeping it as a low-cost robustness backstop.
That said, I don't feel strongly about it — if you'd rather rely solely on the deterministic fix and keep the change minimal, just let me know and I'll revert the timeout part.
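To illustrate the point about the timeout being only an upper bound, here is a minimal sketch using a plain BlockingQueue. It is not the actual CollectingSink code: PollTimeoutSketch, its poll helper, and the 100 ms producer delay are hypothetical stand-ins, assuming only that the sink's poll blocks on a queue and throws a TimeoutException when nothing arrives in time.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PollTimeoutSketch {

    // Hypothetical stand-in for a sink poll: blocks until an element arrives
    // or the timeout elapses, whichever comes first.
    static String poll(BlockingQueue<String> queue, Duration timeout)
            throws InterruptedException, TimeoutException {
        String event = queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        if (event == null) {
            throw new TimeoutException("No event within " + timeout);
        }
        return event;
    }

    // Starts a producer that emits one event after producerDelayMillis and
    // returns how long the poll actually blocked, in milliseconds.
    static long elapsedMillisForPoll(Duration timeout, long producerDelayMillis)
            throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(producerDelayMillis);
                queue.put("EMPTY_EVENT");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long start = System.nanoTime();
        poll(queue, timeout);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        producer.join();
        return elapsedMillis;
    }

    public static void main(String[] args) throws Exception {
        // The event arrives after roughly 100 ms, so the 30s cap is never reached.
        long elapsed = elapsedMillisForPoll(Duration.ofSeconds(30), 100);
        System.out.println("poll returned after ~" + elapsed + " ms");
    }
}
```

In this sketch the 30s value only matters when the event never shows up; in the happy path the call returns as soon as the producer enqueues the event.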
Closes #16546.
Summary
TestMonitorSource.testStateRestore (the testStateRestore(File, ClusterClient) variant in the Flink v2.0/v2.1 trees) intermittently fails with a TimeoutException from CollectingSink.poll (example CI run). The timeout only means the sink queue stayed empty for 5s; the real cause is a savepoint-completion race in the shared test helper, not slow startup.
OperatorTestBase.closeJobClient(JobClient, File) discarded the CompletableFuture<String> returned by stopWithSavepoint and instead waited for the savepoint directory to appear on disk. That directory is created early in the savepoint process, before the _metadata/state files finish writing. Phase 2 of the test then restores from that path via clusterClient.submitJob; when the restore races savepoint completion, the restored job never comes up and emits nothing, so the poll times out.
What changed
OperatorTestBase.closeJobClient now awaits the stopWithSavepoint(...) future and returns the path it resolves to, so the savepoint is guaranteed to be fully written before any job restores from it. This mirrors the existing idiom in TestIcebergSourceFailover.testBoundedWithSavepoint, which awaits the savepoint future with .get(). The only caller that passes a non-null savepoint directory is TestMonitorSource.testStateRestore, so the change is scoped to this test.
The Phase 2 poll timeout in testStateRestore is raised from 5s to 30s. The assertion stays strict (a genuine re-read emits a non-empty event quickly and still fails fast), so the longer timeout only extends the wait for the (correct) first event, mirroring the "deterministic fix + generous timeout backstop" pattern used when TestIcebergSourceFailover was de-flaked.
Both changes apply to the Flink v1.20, v2.0, and v2.1 trees. The observed CI failure was the testStateRestore(File, ClusterClient) variant that only exists in v2.0/v2.1, but the same savepoint-completion race is latent in v1.20's closeJobClient, so the fix is applied there as well. The v1.20 testStateRestore restores via env.executeAsync from a Configuration carrying SavepointConfigOptions.SAVEPOINT_PATH rather than clusterClient.submitJob, so its closeJobClient now sets that path from the awaited stopWithSavepoint(...) future and the Phase 2 poll backstop is raised to 30s to match v2.x.
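The race and the fix can be sketched without Flink. The following is an illustrative simulation only, not the code in OperatorTestBase: SavepointAwaitSketch, fakeStopWithSavepoint, and the latches are hypothetical, and the fake writer simply assumes the ordering described above (directory first, _metadata later, future completed last).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class SavepointAwaitSketch {

    // Hypothetical stand-in for stopWithSavepoint: creates the savepoint
    // directory first, writes _metadata later, and only then completes the
    // future with the savepoint path. The latches make the ordering observable.
    static CompletableFuture<String> fakeStopWithSavepoint(
            Path targetDir, CountDownLatch dirCreated, CountDownLatch allowFinish) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Path savepoint = Files.createDirectories(targetDir.resolve("savepoint-sketch"));
                dirCreated.countDown();   // the directory is now visible on disk
                allowFinish.await();      // simulate the state files still being written
                Files.write(savepoint.resolve("_metadata"), new byte[] {1});
                return savepoint.toString();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    // Returns {metadata present once the directory exists,
    //          metadata present after awaiting the future}.
    static boolean[] observe() throws Exception {
        Path tmp = Files.createTempDirectory("savepoint-await");
        CountDownLatch dirCreated = new CountDownLatch(1);
        CountDownLatch allowFinish = new CountDownLatch(1);
        CompletableFuture<String> future = fakeStopWithSavepoint(tmp, dirCreated, allowFinish);

        // Old approach: treat the directory showing up as "savepoint done".
        dirCreated.await();
        boolean early = Files.exists(tmp.resolve("savepoint-sketch").resolve("_metadata"));

        // New approach: await the future and use the path it resolves to.
        allowFinish.countDown();
        String savepointPath = future.get();
        boolean late = Files.exists(Paths.get(savepointPath).resolve("_metadata"));

        return new boolean[] {early, late};
    }

    public static void main(String[] args) throws Exception {
        boolean[] seen = observe();
        System.out.println("metadata present when directory appears: " + seen[0]);
        System.out.println("metadata present after awaiting future: " + seen[1]);
    }
}
```

In this simulation, a restore triggered at the directory check would start from an incomplete savepoint, while a restore started after future.get() always sees the _metadata file.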