Skip to content

[AgentGateway]feat: Prefix Trie Storage for Multi Trajectories #65

Draft
gxlvera wants to merge 9 commits into
verl-project:mainfrom
gxlvera:feat/gateway-prefix-trie
Draft

[AgentGateway]feat: Prefix Trie Storage for Multi Trajectories #65
gxlvera wants to merge 9 commits into
verl-project:mainfrom
gxlvera:feat/gateway-prefix-trie

Conversation

@gxlvera

@gxlvera gxlvera commented Jun 18, 2026

Copy link
Copy Markdown

Summary

Implements the per-session prefix trie that replaces the single linear active_trajectory, per RFC #51. Today a session keeps one message_history + one active trajectory, so only the latest branch can be re-attached; this generalizes it into a trie where an incoming request longest-prefix matches any path and continues from the nearest checkpoint — enabling sub-agent / best-of-N / context-condensation / warm-start.

Fully gated behind GatewayActorConfig.gateway_trie_enabled (default off); flag-off reproduces the legacy single-active behavior exactly (covered by a parity test).

What's in this PR (full M1)

Data model — gateway/trie.py (new)

  • MessageKey (+ canonicalize_content / make_message_key): hashable per-message identity; excludes reasoning_content (M1; TODO thinking-model replay) and keys multimodal content via a stable image/video digest (pixels live on the node, not the key).
  • BranchCheckpoint, TrieNode, PrefixTrie with the prepare → commit lifecycle and a finalize traversal (iter_export_nodes → terminal checkpoints only by default).
  • prepare materializes incoming prompt-side messages as pending/structural nodes (no checkpoint, never exported) and clones the nearest checkpoint into a request-local buffer.
  • TrajectoryBuffer re-homed to gateway/types.py so the trie stays free of the verl-importing codec.

Session integration — gateway/session.py

  • _prepare_generation_inputs_trie: routes through trie.prepare (longest-prefix match + nearest-checkpoint clone) and reuses the existing encode_full / encode_incremental paths — only the state model changes.
  • commit attaches the assistant child via trie.commit; finalize traverses the trie → one branch-aware Trajectory per terminal checkpoint (reward stamped per the current one-session-one-reward contract).
  • Per-branch multimodal stored on the committed node (new-this-turn images) and reconstructed along the path. Failed generations abandon the pending node. snapshot_state reports num_branches / num_inflight_generations in trie mode.
  • Tools-change gate preserved (full re-encode), mirroring the legacy active_tool_schemas gate.

Wiring: config.py / gateway.py thread gateway_trie_enabled into sessions.

Tests (CPU-only)

  • tests/uni_agent/gateway/test_trie_on_cpu.py — 16 trie-structural tests covering the RFC appendix fork types: A sequential extension, B system-keyed split (parallel sub-agents), C context condensation, D best-of-N + idempotent retry, E warm-start; plus pending-node/no-export, clone isolation, terminal-vs-all export, multimodal digest + per-branch reconstruction.
  • tests/uni_agent/gateway/test_session_trie_on_cpu.py — 3 session-integration tests: flag parity (trie-on yields identical backend prompts + identical finalized trajectory as trie-off on a linear multi-turn conversation — the compatibility gate), best-of-N fan-out (N siblings → N trajectories), multi-turn reattach (single continuous branch).

Scope / deferrals

  • Tokenization boundary: reuse current remove_system_prompt (text via tokenizer, multimodal via processor). Per-model seam handling (Qwen trailing \n, GLM 4.7 ambiguous bos/eos) + a token-sequence verifier → M2 (TITO-style merge).
  • Concurrency: generation_lock kept (serial) → M3.
  • GC/eviction: none — the trie dies with the session at finalize/abort.
  • Reward: one-session-one-reward; reward contract unchanged.
  • No new caller API — branching is purely message-driven.

Note that this PR is a more complete version of the closed #64 and this description is produced by @ChangyiYang.

ChangyiYang and others added 7 commits June 18, 2026 07:50
Add a per-session prefix trie to replace the single linear active trajectory
(issue verl-project#51): every committed assistant turn becomes a node that may carry a
checkpoint, and an incoming request longest-prefix matches any path and
continues from the nearest checkpoint.

- session/trie.py: MessageKey (+canonicalize_content/make_message_key),
  BranchCheckpoint, TrieNode, PrefixTrie (prepare/commit/finalize-traversal).
  MessageKey excludes reasoning_content (M1) and hashes multimodal content via
  a stable image/video digest (data:/long strings are hashed for compactness).
- session/types.py: home TrajectoryBuffer here so trie stays free of the
  verl-importing codec; re-exported via session/__init__.
- CPU tests cover the RFC appendix fork types (sequential / system-split /
  condensation / best-of-N + idempotent retry / warm-start) plus pending-node,
  clone-isolation, export, multimodal-digest, and regression guards that fail
  on the pre-review behavior.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Wire the trie into GatewaySession end-to-end, gated by
GatewayActorConfig.gateway_trie_enabled (default False; flag-off reproduces the
legacy single-active behavior exactly, covered by a parity test).

- session.py: trie-mode prepare routes through trie.prepare (longest-prefix
  match + nearest-checkpoint clone) and reuses the existing
  encode_full/encode_incremental paths; commit attaches the assistant child;
  finalize traverses the trie -> one branch-aware trajectory per terminal
  checkpoint. Per-branch multimodal stored on the node; failed generations
  abandon the pending node; snapshot_state reports num_branches/num_inflight.
  tools-change gate mirrors the legacy active_tool_schemas gate.
- config.py/gateway.py: thread gateway_trie_enabled into sessions.
- session-integration tests: flag parity, best-of-N fan-out, multi-turn reattach.

Tokenization stays on remove_system_prompt (TITO -> M2); generation_lock kept
(concurrency -> M3); no GC (trie dies with the session).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- _prepare_generation_inputs_trie: abandon the pending node if encoding fails
  or is cancelled after trie.prepare (try/except BaseException around the
  encode step), preventing in-flight bookkeeping leaks. Added a session test
  that fails without the guard.
- _media_digest: hash repr(_freeze(value)) in the scalar fallback so nested
  dict/list ordering is canonical across processes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- session.py: on a full re-encode mid-branch (e.g. tools change), store only
  this turn's delta media on the committed node instead of the whole prompt's
  media, so collect_multi_modal does not double-count media carried on ancestor
  checkpoints. Added a regression test (multimodal + tools-change mid-branch)
  that fails without the fix.
- trie.py: defensively shallow-copy message dicts when storing them in nodes
  (materialize_prompt_suffix, upsert_assistant) and in the checkpoint's
  covered_messages, isolating trie state from external payload mutation.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- session.py: catch BaseException around backend.generate so a cancellation
  (asyncio.CancelledError, which is not an Exception) still abandons the trie
  pending node before propagating. Added a regression test that fails without
  the guard.
- trie.py: move 'import json' to module top (PEP 8) instead of inline.

Co-Authored-By: Changyi Yang <changyiyang2023@gmail.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a prefix trie (PrefixTrie) to support multi-trajectory session storage in the gateway, enabling features like multi-branch reattachment, best-of-N, sub-agents, and warm-starts. The changes integrate the trie into GatewaySession under a new configuration flag and add comprehensive CPU-only unit and integration tests. The review feedback focuses on a key memory efficiency improvement: removing the redundant messages list from BranchCheckpoint to avoid O(N^2) space complexity, as the full conversation history can already be reconstructed on demand using rebuild_messages.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +197 to +203
trajectory_buffer: TrajectoryBuffer
request_tools: list[dict[str, Any]] | None = None
chat_template_kwargs_key: tuple | None = None
messages: list[dict[str, Any]] | None = None
image_data: list[Any] | None = None
video_data: list[Any] | None = None
extra_fields: dict[str, Any] | None = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Memory Efficiency Improvement: Redundant messages in BranchCheckpoint

Storing messages in BranchCheckpoint introduces O(N^2) space complexity for message storage over a session of N turns, because each checkpoint stores a copy of the entire conversation history up to that point.

Since PrefixTrie already implements rebuild_messages(node) (which reconstructs the exact same message list by walking up the parent pointers in O(N) time), storing messages on the checkpoint is redundant.

By removing messages from BranchCheckpoint, we can reduce the space complexity to O(N) and avoid significant memory bloat in long conversations (especially those with large system prompts or tool outputs).

Suggested change
trajectory_buffer: TrajectoryBuffer
request_tools: list[dict[str, Any]] | None = None
chat_template_kwargs_key: tuple | None = None
messages: list[dict[str, Any]] | None = None
image_data: list[Any] | None = None
video_data: list[Any] | None = None
extra_fields: dict[str, Any] | None = None
trajectory_buffer: TrajectoryBuffer
request_tools: list[dict[str, Any]] | None = None
chat_template_kwargs_key: tuple | None = None
image_data: list[Any] | None = None
video_data: list[Any] | None = None
extra_fields: dict[str, Any] | None = None

video_data=None,
)

checkpoint_messages = ckpt.messages or self.rebuild_messages(ckpt_node)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Simplify checkpoint_messages retrieval by directly calling self.rebuild_messages(ckpt_node) now that messages is removed from BranchCheckpoint.

Suggested change
checkpoint_messages = ckpt.messages or self.rebuild_messages(ckpt_node)
checkpoint_messages = self.rebuild_messages(ckpt_node)

Comment on lines +433 to +446
# The checkpoint lives on the assistant node and its buffer covers the
# request prompt PLUS the generated assistant turn, so the stored prefix
# must include ``assistant_msg`` (this is what the next turn's
# ``checkpoint_messages`` slices against).
covered_messages = [dict(m) for m in messages] + [dict(assistant_msg)] if messages is not None else None
checkpoint = BranchCheckpoint(
trajectory_buffer=trajectory_buffer,
request_tools=request_tools,
chat_template_kwargs_key=chat_template_kwargs_key,
messages=covered_messages,
image_data=image_data,
video_data=video_data,
extra_fields=extra_fields,
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Redundant messages in BranchCheckpoint (Commit Step)

As noted in the BranchCheckpoint definition, storing the full conversation history at each checkpoint is redundant because rebuild_messages can reconstruct it on demand. Removing covered_messages here avoids O(N^2) memory overhead.

        checkpoint = BranchCheckpoint(
            trajectory_buffer=trajectory_buffer,
            request_tools=request_tools,
            chat_template_kwargs_key=chat_template_kwargs_key,
            image_data=image_data,
            video_data=video_data,
            extra_fields=extra_fields,
        )

Comment thread uni_agent/gateway/session/session.py Outdated
trajectories: list[Trajectory] = []
for node in self._trie.iter_export_nodes():
checkpoint = node.checkpoint
messages = checkpoint.messages or self._trie.rebuild_messages(node)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Simplify messages retrieval by directly calling self._trie.rebuild_messages(node) now that messages is removed from BranchCheckpoint.

Suggested change
messages = checkpoint.messages or self._trie.rebuild_messages(node)
messages = self._trie.rebuild_messages(node)

@gxlvera

gxlvera commented Jun 18, 2026

Copy link
Copy Markdown
Author

With prefix trie, I think maybe we can get rid of generation lock but only use request lock? the reason is stated in details in: #51 as

One shared state requires a generation lock and serial LLM calls. With a trie, each call owns a cloned branch state; tokenize and commit can interleave—supporting sub-agents, best-of-n, etc.

@zhanghuiyao zhanghuiyao left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two trajectory parity comments from local review.

# Reuse the cloned checkpoint only when its tools match this request;
# a tools change forces a full re-encode (mirrors the legacy
# ``active_tool_schemas != tools`` gate).
use_incremental = prepared.trajectory_buffer is not None and prepared.request_tools == tools

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: when prepared.request_tools != tools, this forces a full re-encode but does not materialize/export the previous checkpoint. The new checkpoint is still committed as a descendant, while finalize exports only terminal checkpoints, so the parent trajectory's trainable response tokens are dropped. This should either create an explicit export boundary, or be documented/tested as an intentional flag-on/off parity exception.

differing output creates a new sibling (best-of-N).
"""
key = make_message_key(assistant_msg)
child = parent.children.get(key)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1/P2: using only MessageKey here conflates idempotent retry with independent best-of-N samples. If two sampled completions are token-identical, the second commit reuses this child and overwrites the checkpoint, so trie mode exports 1 trajectory where legacy keeps 2 samples. Training semantics should distinguish retry from duplicate samples, or document/test this as an intentional contract.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants