Skip to content

[Feature] Support TTL on ShortTermMemory#657

Open
da-daken wants to merge 4 commits into
apache:mainfrom
da-daken:new_short_memory_ttl
Open

[Feature] Support TTL on ShortTermMemory#657
da-daken wants to merge 4 commits into
apache:mainfrom
da-daken:new_short_memory_ttl

Conversation

@da-daken
Copy link
Copy Markdown
Contributor

@da-daken da-daken commented May 11, 2026

Linked issue: #580

Purpose of change

Support controlling TTL on Short Term memory.

Tests

ut

API

no api

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

@github-actions github-actions Bot added doc-needed Your PR changes impact docs. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue. labels May 11, 2026
@wenjin272
Copy link
Copy Markdown
Collaborator

Hi, @da-daken, thanks for your contribution. Due to the imminent code freeze for version 0.3, I will prioritize handling some must-have items for this release. This may delay my review of your two PRs, but I estimate that I will complete the reviews within a week at the latest.

Copy link
Copy Markdown
Collaborator

@weiqingy weiqingy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for picking this up — a few cross-cutting questions before this lands:

  1. Python mirror is missing. python/flink_agents/api/core_options.py mirrors every entry in the Java AgentExecutionOptions today (MAX_RETRIES, RETRY_WAIT_INTERVAL, the async flags, etc.). The three new short-term-memory TTL options are not added on the Python side, so a Python user can't reach them via the typed API — they'd have to fall back to raw string keys. Could we add the mirrors (and a matching test) in this PR? CLAUDE.md is explicit about Java↔Python parity for shared options.

  2. Are we comfortable exposing StateTtlConfig.UpdateType and StateTtlConfig.StateVisibility directly in the public API? Two of the new options take Flink internal enums as their value type, in a class under org.apache.flink.agents.api.agents. That couples the public surface to Flink's state API forever, and makes the Python mirror painful (we'd need to redefine the enums on the Python side or fall back to strings). One alternative is to wrap them in our own enums (ShortTermMemoryTtlUpdate / ShortTermMemoryTtlVisibility) with one-to-one mappings — costs a few lines and a translation step, buys API independence. What do you think?

}

@Test
void testTTLConfigurationApplied() throws Exception {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two gaps in coverage worth closing while we're here:

  • TTL disabled path. No test exercises the early-return in maybeEnableShortTermMemoryTTL (TTL_MS = 0 or unset). That branch is the default for every existing pipeline, so a regression that accidentally enables TTL with 0ms would be silent. A short scenario with ttlMs = 0 confirming the third event1 returns EXISTING regardless of sleep would cover it.
  • Default update-type / visibility. Both tests explicitly override the update-type to OnCreateAndWrite. The option defaults (OnReadAndWrite / NeverReturnExpired) are never actually exercised, so a regression in how the defaults are wired would slip through. One scenario that sets only TTL_MS and asserts the same expiry behavior would catch that.

/**
* When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is positive, attaches Flink
* {@link StateTtlConfig} to the short-term memory {@link MapStateDescriptor}. Unset, null, or
* non-positive values disable TTL (Flink does not allow zero/negative TTL).
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice — recording "Flink does not allow zero/negative TTL" at the workaround site spares future maintainers a trip through Flink source.

daken added 2 commits May 27, 2026 23:29
@da-daken
Copy link
Copy Markdown
Contributor Author

Thanks for picking this up — a few cross-cutting questions before this lands:

  1. Python mirror is missing. python/flink_agents/api/core_options.py mirrors every entry in the Java AgentExecutionOptions today (MAX_RETRIES, RETRY_WAIT_INTERVAL, the async flags, etc.). The three new short-term-memory TTL options are not added on the Python side, so a Python user can't reach them via the typed API — they'd have to fall back to raw string keys. Could we add the mirrors (and a matching test) in this PR? CLAUDE.md is explicit about Java↔Python parity for shared options.
  2. Are we comfortable exposing StateTtlConfig.UpdateType and StateTtlConfig.StateVisibility directly in the public API? Two of the new options take Flink internal enums as their value type, in a class under org.apache.flink.agents.api.agents. That couples the public surface to Flink's state API forever, and makes the Python mirror painful (we'd need to redefine the enums on the Python side or fall back to strings). One alternative is to wrap them in our own enums (ShortTermMemoryTtlUpdate / ShortTermMemoryTtlVisibility) with one-to-one mappings — costs a few lines and a translation step, buys API independence. What do you think?

@weiqingy Thanks for your review. You're right, I submitted the new code. PTAL

new ConfigOption<>("rag.async", Boolean.class, true);

public static final ConfigOption<Long> SHORT_TERM_MEMORY_STATE_TTL_MS =
new ConfigOption<>("short-term-memory.state-ttl.ms", Long.class, 0L);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: 0L doubles as the "TTL disabled" sentinel, but that contract only lives in OperatorStateManager.maybeEnableShortTermMemoryTTL. A one-line javadoc here — e.g. "Set to a positive value in milliseconds to enable TTL; 0 (the default) disables it" — would spare future readers a trip into the runtime. Same applies to the two enum options below: worth noting they're only consulted when TTL_MS > 0.

}

@Test
void testTTLConfigurationNotApplied() throws Exception {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: testTTLConfigurationNotApplied vs testTTLConfigurationApplied (line 109) is a bit misleading — TTL is configured in both; what differs is whether enough time elapsed for entries to expire (sleep_ms=0 vs 2000). Names like testValueStillVisibleBeforeTTLExpiry / testValueExpiresAfterTTL would express the actual assertion. Same suggestion for the Python mirror in short_term_memory_ttl_test.py:126,159.

@weiqingy
Copy link
Copy Markdown
Collaborator

LGTM — both prior concerns are addressed cleanly. Only two small readability nits inline. Thanks for the quick turnaround.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-needed Your PR changes impact docs. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants