perf(got): batch aggregation pairs to reduce scheduler round-trips#449

Open
zatchbell1311-wq wants to merge 4 commits into pie-project:main from zatchbell1311-wq:feat/got-batched-aggregation

Conversation

@zatchbell1311-wq zatchbell1311-wq commented Jun 26, 2026

Contributor

Problem

Two issues in the original GoT implementation:

  1. Sequential pairing — proposals were processed one at a time in a while let Some loop, preventing the scheduler from seeing all aggregation requests simultaneously and batching them into a single forward pass.

  2. Question context loss — during aggregation, the forked context carries full conversation history, so the model loses track of what problem it's solving, causing incorrect answers.

Fix

  • Collect all proposals via future::join_all before pairing, then launch all aggregation pairs simultaneously. The scheduler can now coalesce all N/2 flush() calls into a single batched forward pass per level.
  • Inject the original question explicitly in each aggregation prompt so the model always has full context.
  • Extract launch_aggregation_pairs() as a reusable helper used across all aggregation levels.
  • Add guard rejecting inputs with fewer than 4 proposals.
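
The pairing and question-injection steps above can be sketched without the async machinery. This is a minimal synchronous sketch under stated assumptions, not the PR's code: `pair_adjacent`, `aggregation_prompt`, and the text of `AGGREGATE_PROMPT` are hypothetical names and values chosen for illustration. In the actual example each pair would presumably become one concurrent aggregation task.

```rust
/// Hypothetical stand-in for the example's aggregation instruction text.
const AGGREGATE_PROMPT: &str = "Combine your solution with this alternative solution:\n";

/// Splits `items` into adjacent pairs. A trailing unpaired item is returned
/// separately so the caller decides what to do with it.
fn pair_adjacent<T>(items: Vec<T>) -> (Vec<(T, T)>, Option<T>) {
    let mut pairs = Vec::with_capacity(items.len() / 2);
    let mut iter = items.into_iter();
    while let Some(a) = iter.next() {
        match iter.next() {
            Some(b) => pairs.push((a, b)),
            None => return (pairs, Some(a)),
        }
    }
    (pairs, None)
}

/// Builds an aggregation prompt that restates the original question before
/// presenting the other branch's solution.
fn aggregation_prompt(question: &str, other_solution: &str) -> String {
    format!(
        "Original question: {}\n{}{}",
        question, AGGREGATE_PROMPT, other_solution
    )
}
```

Because all pairs are formed before any generation starts, every prompt in a level can be submitted together, which is what gives a scheduler the chance to batch them.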

Benchmark (5 GSM8K samples, Qwen3-0.6B, CUDA, RTX 2050)

| Method | Accuracy | Avg Latency | Avg Tokens |
|---|---|---|---|
| Text-Completion | 3/5 (60%) | 14.5s | 86 |
| Graph-of-Thought | 2/5 (40%) | 26.3s | 253 |

On this sample, GoT uses about 2.9x as many tokens (253 vs. 86) and is about 1.8x slower (26.3s vs. 14.5s). The accuracy gap traces to the aggregation context issue addressed here. Further improvement requires restructuring aggregation to use fresh contexts rather than forked ones.
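
For reference, the overhead figures can be re-derived from the benchmark numbers. The snippet below is only a worked-arithmetic sketch; `ratio` and `benchmark_ratios` are illustrative names, not part of the PR.

```rust
/// Ratio of the Graph-of-Thought value to the Text-Completion baseline.
fn ratio(got: f64, baseline: f64) -> f64 {
    got / baseline
}

/// Returns (token ratio, latency ratio) for the reported averages.
fn benchmark_ratios() -> (f64, f64) {
    let token_ratio = ratio(253.0, 86.0); // average tokens: GoT vs. baseline
    let latency_ratio = ratio(26.3, 14.5); // average latency in seconds
    (token_ratio, latency_ratio)
}
```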

Closes #418 (partially — documents the context loss root cause)

coderabbitai Bot commented Jun 26, 2026

Walkthrough

The GoT example now generates proposals and both aggregation levels in batches with join_all, using a new helper to pair intermediate (String, Context) results. The example description and imports were updated to match the batched flow.

Changes

Graph-of-Thought batched aggregation

| Layer / File(s) | Summary |
|---|---|
| Docs and concurrency imports: inferlets/graph-of-thought/src/lib.rs | The example description and futures imports are updated for the batched aggregation flow. |
| Pair-launch helper: inferlets/graph-of-thought/src/lib.rs | launch_aggregation_pairs forms adjacent (String, Context) pairs, starts concurrent aggregation tasks, and drops any unpaired trailing context. |
| Aggregation setup and input validation: inferlets/graph-of-thought/src/lib.rs | run_hierarchical_aggregation flushes the proposal prompt, builds batched proposal futures, and main rejects proposal token lists shorter than four entries. |
| Hierarchical batching flow: inferlets/graph-of-thought/src/lib.rs | run_hierarchical_aggregation collects proposal results, launches both aggregation levels with join_all, and returns the final aggregated texts. |
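
To see why the four-proposal minimum matters, here is a small counting sketch. It assumes exactly two aggregation levels and that an unpaired trailing item is dropped at each level, as the summary describes; `outputs_after_levels` and `check_proposal_count` are hypothetical helpers, not functions from the PR.

```rust
/// Number of results left after `levels` rounds of adjacent pairing, where
/// an unpaired trailing item is dropped in each round.
fn outputs_after_levels(proposals: usize, levels: u32) -> usize {
    (0..levels).fold(proposals, |n, _| n / 2)
}

/// Rejects inputs that cannot yield a final result after two levels.
fn check_proposal_count(proposals: usize) -> Result<(), String> {
    if proposals < 4 {
        return Err(format!(
            "need at least 4 proposals for two aggregation levels, got {}",
            proposals
        ));
    }
    Ok(())
}
```

With three proposals the first level produces one result and the second level has nothing to pair, so the pipeline would end with zero final answers; validating up front turns that silent empty result into an error.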
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly summarizes the main change: batching GoT aggregation pairs to reduce scheduler round-trips. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
inferlets/graph-of-thought/src/lib.rs (1)

117-120: 🩺 Stability & Availability | 🔵 Trivial

Use try_join_all to short-circuit on the first error.

The futures = "0.3" dependency is available, which supports futures::future::try_join_all. The current pattern of join_all(...).await.into_iter().collect() waits for all futures to complete even if one fails, potentially wasting resources and delaying error reporting.

Replace the manual collection with try_join_all to:

  1. Preserve concurrent execution of all futures.
  2. Return immediately upon the first error.
  3. Cancel remaining pending futures (if their Future implementation supports cancellation).
    let proposal_futures = proposal_tokens
        .into_iter()
        .map(|max_tokens| {
            // ...
        });
-   let proposals: Vec<(String, Context)> = future::join_all(proposal_futures)
-       .await
-       .into_iter()
-       .collect::<Result<_>>()?;
+   let proposals: Vec<(String, Context)> = future::try_join_all(proposal_futures).await?;

Also replace the occurrences in:

  • Lines 126–129 (first_aggregations)
  • Lines 133–136 (final_results)
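
The behavioural difference between the two patterns can be illustrated with a std-only toy model. This is a sketch under loud assumptions: it does not use the futures crate, `Countdown`, `noop_waker`, `drive`, and `sample` are hypothetical names, and the round-robin busy-polling driver only models the semantics (wait for everything vs. stop at the first error), not how `join_all` or `try_join_all` are implemented.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Toy future: reports `Pending` for `polls_left` polls, then yields `result`.
struct Countdown {
    polls_left: u32,
    result: Option<Result<u32, String>>,
}

impl Future for Countdown {
    type Output = Result<u32, String>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left > 0 {
            self.polls_left -= 1;
            return Poll::Pending;
        }
        Poll::Ready(self.result.take().expect("polled after completion"))
    }
}

/// Waker that does nothing; enough for a busy-polling toy driver.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable entry ignores the data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Polls all futures round-robin and returns the combined result plus the
/// number of polls issued. With `short_circuit`, the first `Err` ends the run
/// at once and the remaining futures are dropped (modelling `try_join_all`);
/// otherwise every future is driven to completion before the error surfaces
/// (modelling `join_all` followed by `collect::<Result<_>>()`).
fn drive(mut futs: Vec<Countdown>, short_circuit: bool) -> (Result<Vec<u32>, String>, u32) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut slots: Vec<Option<Result<u32, String>>> = futs.iter().map(|_| None).collect();
    let mut polls = 0;
    while slots.iter().any(Option::is_none) {
        for (fut, slot) in futs.iter_mut().zip(slots.iter_mut()) {
            if slot.is_some() {
                continue; // this future already finished
            }
            polls += 1;
            match Pin::new(fut).poll(&mut cx) {
                Poll::Ready(Err(e)) if short_circuit => return (Err(e), polls),
                Poll::Ready(out) => *slot = Some(out),
                Poll::Pending => {}
            }
        }
    }
    // Keeps the outputs in order; the first `Err` wins.
    (slots.into_iter().flatten().collect(), polls)
}

/// One branch fails immediately; the other needs three more polls.
fn sample() -> Vec<Countdown> {
    vec![
        Countdown { polls_left: 0, result: Some(Err("branch failed".to_string())) },
        Countdown { polls_left: 3, result: Some(Ok(7)) },
    ]
}
```

Calling `drive(sample(), true)` and `drive(sample(), false)` returns the same error either way; the difference is how much work is done on the surviving branch before that error is reported.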
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@inferlets/graph-of-thought/src/lib.rs` around lines 117 - 120, Replace the
manual `future::join_all(...).await.into_iter().collect::<Result<_>>()?`
patterns in `lib.rs` with `future::try_join_all` so the async work
short-circuits on the first error instead of waiting for every future to finish.
Update the `proposals`, `first_aggregations`, and `final_results` joins in the
relevant helper flow to use `try_join_all` while keeping the same concurrent
execution and result types.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@inferlets/graph-of-thought/src/lib.rs`:
- Around line 94-99: The proposal pipeline in graph-of-thought can succeed with
an empty result when there are fewer than four proposal branches, because the
fixed aggregation stages in the flow leading from proposal_futures can never
form a second-level pair. Add an upfront validation in the main execution path
before starting Stage 1 or the later aggregation steps to reject insufficient
proposal counts, and return an error instead of Ok(vec![]) so main cannot report
success without a final solution.
- Around line 65-82: The aggregation loop in lib.rs assumes contexts are freed
immediately when dropped, but Context relies on destroy(self) for deterministic
KV-page release. Update the logic around the while let Some((text_a, _ctx_a))
pattern so discarded contexts are explicitly destroyed via Context::destroy(),
including the unused ctx_a and any odd trailing context, while keeping ctx_b
alive only for the async task in the aggregation flow.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f4808318-76eb-4f0b-b6fb-ee2aff171833

📥 Commits

Reviewing files that changed from the base of the PR and between 17e5125 and 109f9ad.

📒 Files selected for processing (1)
  • inferlets/graph-of-thought/src/lib.rs

Comment on lines +65 to +82
    while let Some((text_a, _ctx_a)) = iter.next() {
        if let Some((_text_b, mut ctx_b)) = iter.next() {
            // ctx_a is dropped here — its unique KV pages are freed immediately.
            // ctx_b carries the shared prefix forward into the aggregation.
            let prompt = format!("{}{}", AGGREGATE_PROMPT, text_a);
            ctx_b.user(&prompt);
            ctx_b.cue();
            tasks.push(async move {
                let text = ctx_b
                    .generate(Sampler::TopP { temperature: 0.6, p: 0.95 })
                    .max_tokens(aggregation_tokens)
                    .collect_text()
                    .await?;
                Ok((text, ctx_b))
            });
        }
        // Odd item out: context dropped, KV pages freed.
    }

🩺 Stability & Availability | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Inspect Context lifecycle/drop behavior without executing repository code.
rg -n -C4 'impl\s+Drop\s+for\s+Context|fn\s+drop\(|pub fn destroy|pub fn suspend' sdk/rust/inferlet/src/context.rs sdk/rust/inferlet/src

Repository: pie-project/pie

Length of output: 3738


🏁 Script executed:

#!/bin/bash
# Verify Context definition and Drop implementation
# 1. Find the struct definition of Context
echo "=== Searching Context struct definition ==="
rg -n -B5 -A20 '^pub struct Context' sdk/rust/inferlet/src/context.rs

echo -e "\n=== Searching for 'impl.*Context' blocks ==="
rg -n -A3 'pub impl.*Context|impl.*Context' sdk/rust/inferlet/src/context.rs | head -50

echo -e "\n=== Checking if Context is an alias or has derived Drop ==="
rg -n -B2 -A10 'pub type Context|struct Context' sdk/rust/inferlet/src/context.rs

Repository: pie-project/pie

Length of output: 2383


🏁 Script executed:

#!/bin/bash
# Check RawContext Drop implementation
echo "=== Searching for RawContext struct and Drop ==="
rg -n -B2 -A10 'pub struct RawContext|impl\s+Drop\s+for\s+RawContext' sdk/rust/inferlet/src/context.rs

echo -e "\n=== Searching for 'destroy' implementation in RawContext or inner ==="
rg -n -B2 -A5 'fn\s+destroy\s*\(' sdk/rust/inferlet/src/context.rs | head -30

Repository: pie-project/pie

Length of output: 504


Use explicit Context::destroy() for contexts intentionally discarded.

The Context struct does not have a custom Drop implementation that frees KV pages when it goes out of scope, so the code comment's claim that "KV pages are freed immediately" is not guaranteed by the default drop behavior. The API provides destroy(self) specifically for deterministic, immediate release of resources.

Explicitly call destroy() for ctx_a and any odd trailing context to guarantee the intended lifecycle behavior.

♻️ Proposed lifecycle cleanup
-    while let Some((text_a, _ctx_a)) = iter.next() {
+    while let Some((text_a, ctx_a)) = iter.next() {
         if let Some((_text_b, mut ctx_b)) = iter.next() {
-            // ctx_a is dropped here — its unique KV pages are freed immediately.
+            ctx_a.destroy();
             // ctx_b carries the shared prefix forward into the aggregation.
             let prompt = format!("{}{}", AGGREGATE_PROMPT, text_a);
             ctx_b.user(&prompt);
             ctx_b.cue();
@@
-        }
-        // Odd item out: context dropped, KV pages freed.
+        } else {
+            ctx_a.destroy();
+        }
     }
📝 Committable suggestion

Suggested change
    while let Some((text_a, ctx_a)) = iter.next() {
        if let Some((_text_b, mut ctx_b)) = iter.next() {
            ctx_a.destroy();
            // ctx_b carries the shared prefix forward into the aggregation.
            let prompt = format!("{}{}", AGGREGATE_PROMPT, text_a);
            ctx_b.user(&prompt);
            ctx_b.cue();
            tasks.push(async move {
                let text = ctx_b
                    .generate(Sampler::TopP { temperature: 0.6, p: 0.95 })
                    .max_tokens(aggregation_tokens)
                    .collect_text()
                    .await?;
                Ok((text, ctx_b))
            });
        } else {
            ctx_a.destroy();
        }
    }
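
The ownership pattern behind this suggestion can be shown with a toy type. This is a sketch only: `ToyContext` and `pair_and_release` are hypothetical, the shared counter stands in for whatever the real `destroy(self)` releases, and nothing here reflects the SDK's actual `Context` internals.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy stand-in for the SDK's `Context`; it only models explicit release.
struct ToyContext {
    released: Rc<Cell<usize>>,
}

impl ToyContext {
    /// Consumes the context and records the release, mirroring the shape of
    /// a `destroy(self)` method.
    fn destroy(self) {
        self.released.set(self.released.get() + 1);
    }
}

/// Pairs adjacent branches, keeps the second context of each pair, and
/// explicitly destroys every context that is not carried forward.
fn pair_and_release(branches: Vec<(String, ToyContext)>) -> Vec<(String, ToyContext)> {
    let mut kept = Vec::new();
    let mut iter = branches.into_iter();
    while let Some((text_a, ctx_a)) = iter.next() {
        // Released in both cases: paired (ctx_b carries on) or odd one out.
        ctx_a.destroy();
        if let Some((_text_b, ctx_b)) = iter.next() {
            kept.push((text_a, ctx_b));
        }
    }
    kept
}
```

Because `destroy` takes `self` by value, the compiler rejects any later use of a destroyed context, so the release point is visible in the code rather than implied by scope exit.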