[FLINK-35661][table] Fix MiniBatchGroupAggFunction silently dropping records (backport #27505) #28262

Open
nateab wants to merge 1 commit into apache:release-1.20 from nateab:FLINK-35661-release-1.20

nateab (Contributor) commented May 26, 2026

Backport of #27505 to release-1.20.

What is the purpose of the change

This pull request fixes a bug in MiniBatchGroupAggFunction.finishBundle() where records were being silently dropped when a mini-batch bundle contained a key with only retraction messages and no existing accumulator state.

The root cause was using return instead of continue when inputRows became empty after filtering out leading retraction messages for a key with no state. This caused the method to exit entirely, abandoning processing of all remaining keys in the bundle.
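The difference between the two control-flow statements can be sketched with a simplified, self-contained model. This is not the Flink source: the class FinishBundleSketch, the string row kinds "+I" and "-D", the key "aaa", and the useContinue flag are all hypothetical stand-ins chosen for illustration, and the real method works on accumulators and RowData rather than strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FinishBundleSketch {

    /**
     * Simplified model of a per-key bundle loop (not the Flink implementation).
     * Each key maps to its buffered row kinds: "+I" for insert, "-D" for delete.
     * Returns the keys for which a result would have been emitted.
     */
    static List<String> finishBundle(
            Map<String, List<String>> bundle,
            Set<String> keysWithState,
            boolean useContinue) {
        List<String> emittedKeys = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : bundle.entrySet()) {
            List<String> inputRows = new ArrayList<>(entry.getValue());
            if (!keysWithState.contains(entry.getKey())) {
                // No accumulator yet: leading retractions have nothing to retract.
                while (!inputRows.isEmpty() && inputRows.get(0).equals("-D")) {
                    inputRows.remove(0);
                }
                if (inputRows.isEmpty()) {
                    if (useContinue) {
                        continue; // fixed behavior: skip only this key
                    }
                    return emittedKeys; // buggy behavior: abandons all remaining keys
                }
            }
            emittedKeys.add(entry.getKey());
        }
        return emittedKeys;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so the retraction-only key comes first.
        Map<String, List<String>> bundle = new LinkedHashMap<>();
        bundle.put("aaa", List.of("-D"));
        bundle.put("bbb", List.of("+I"));
        bundle.put("ccc", List.of("+I"));

        System.out.println(finishBundle(bundle, Set.of(), false)); // prints []
        System.out.println(finishBundle(bundle, Set.of(), true));  // prints [bbb, ccc]
    }
}
```

In this model the early return yields no output at all because the retraction-only key happens to be visited first; with continue, only that key is skipped.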

Brief change log

  • Changed return; to continue; in MiniBatchGroupAggFunction.finishBundle() so processing continues to the next key instead of exiting the entire method
  • Added unit test MiniBatchGroupAggFunctionTest that directly verifies the fix by simulating a bundle with multiple keys where the first key has only retractions

Verifying this change

This change added tests and can be verified as follows:

  • Added MiniBatchGroupAggFunctionTest.testFinishBundleContinuesAfterEmptyInputRows() which creates a mock bundle with three keys where the first key has only a DELETE message (no existing state). The test verifies that:
    • Without the fix: 0 outputs are produced (all subsequent keys dropped)
    • With the fix: 2 outputs are produced (keys "bbb" and "ccc" processed correctly)

Backport notes:

  • Cherry-picked cleanly from b81b124 (master). The source file is byte-identical between master pre-fix and release-1.20.
  • Test passes locally on release-1.20: Tests run: 1, Failures: 0, Errors: 0.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (mini-batch aggregation path, but change is minimal - only control flow fix)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

…records

When finishBundle() encounters a key with only retraction messages and no
existing state, it now uses 'continue' instead of 'return' to skip that key
and continue processing remaining keys in the bundle.

This change adds a unit test using KeyedOneInputStreamOperatorTestHarness
with KeyedMapBundleOperator to verify the fix.

(cherry picked from commit b81b124)

flinkbot (Collaborator) commented May 26, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build
