Add bounded async runner for long running ops #876
43b7b11 to 6a139f7
📝 Walkthrough
Adds an async runner with bounded concurrency and rewires image and snapshot reconciliation to submit child flattening and snapshot population asynchronously. New API states, runner wiring, app configuration, and volume-server mappings are added, along with dependency updates.
Changes
Async runner and controller integration
Dependency updates
Sequence Diagram(s)
sequenceDiagram
participant App as volumeprovider app
participant Runner as async.Runner
participant Reconciler as ImageReconciler / SnapshotReconciler
participant Store as store
participant Ceph as Ceph RBD
App->>Runner: New(maxWorkers)
App->>Runner: Start(ctx)
Reconciler->>Runner: Submit(ctx, key, op)
Runner->>Store: accept work state
Runner->>Ceph: run flatten / populate operation
Ceph-->>Runner: completion
Runner->>Reconciler: HandleDone(evt)
Reconciler->>Store: requeue / persist state changes
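The reconciler side of the diagram can be sketched as a small policy function. This is only an illustration: the `Submitter` interface, the `Outcome` values, and the mapping from `Submit` results to outcomes are assumptions, and only the error names `ErrInProgress` and `ErrAtCapacity` come from the review diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Error names follow the review diff; the messages are assumptions.
var (
	ErrInProgress = errors.New("operation already in progress")
	ErrAtCapacity = errors.New("runner at capacity")
)

// Op is the long-running work, such as a flatten or populate call.
type Op func(ctx context.Context) error

// Submitter is a hypothetical, narrowed view of the runner as a reconciler sees it.
type Submitter interface {
	Submit(ctx context.Context, key string, op Op) error
}

// Outcome is what this sketch's reconciler does after trying to submit.
type Outcome string

const (
	WaitForDone Outcome = "wait for the done event"
	RetryLater  Outcome = "requeue and retry later"
	Failed      Outcome = "surface the error"
)

// submitAsync maps Submit results to reconcile outcomes (an assumed policy).
func submitAsync(ctx context.Context, s Submitter, key string, op Op) (Outcome, error) {
	err := s.Submit(ctx, key, op)
	switch {
	case err == nil, errors.Is(err, ErrInProgress):
		// The work is running either way; the done handler requeues the object.
		return WaitForDone, nil
	case errors.Is(err, ErrAtCapacity):
		return RetryLater, nil
	default:
		return Failed, err
	}
}

// fixedRunner is a test double that always answers Submit with err.
type fixedRunner struct{ err error }

func (f fixedRunner) Submit(context.Context, string, Op) error { return f.err }

// outcomeFor exercises the policy without a real runner.
func outcomeFor(submitErr error) Outcome {
	out, _ := submitAsync(context.Background(), fixedRunner{submitErr}, "image-1", nil)
	return out
}

func main() {
	for _, err := range []error{nil, ErrInProgress, ErrAtCapacity, errors.New("boom")} {
		fmt.Printf("%v -> %s\n", err, outcomeFor(err))
	}
}
```

Treating `ErrInProgress` like a successful submit keeps the reconciler idempotent: a repeated reconcile of the same object does not start a second flatten or populate.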
Estimated code review effort
🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 2
🧹 Nitpick comments (2)
internal/async/runner.go (2)
137-139: 🧹 Nitpick | 🔵 Trivial | 💤 Low value
Consider logging a warning if active underflows.
The defensive check `if active > 0` before decrementing prevents underflow, but silently ignores the case where `active` is already zero when a done event arrives. Since each spawned operation should send exactly one done event, `active` reaching zero prematurely would indicate a bug in the runner logic.
Adding a warning log when `active == 0` would aid debugging without changing behavior:
📊 Suggested observability improvement
     case evt := <-r.doneCh:
         delete(inFlight, evt.Key)
    -    if active > 0 {
    +    if active > 0 {
             active--
    +    } else {
    +        r.log.V(1).Info("Received done event with no active operations", "key", evt.Key)
         }
         r.notify(evt)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/async/runner.go` around lines 137 - 139, In the runner's event handling logic where the active counter is decremented, the current code silently skips the decrement when active is already zero. Add a warning log statement in an else clause or separate condition that triggers when active is zero to help detect premature done events, which could indicate a bug in the runner logic. This preserves the existing underflow prevention behavior while adding observability for debugging.
142-154: 🧹 Nitpick | 🔵 Trivial | 💤 Low value
Consider simplifying the switch statement to if-else for clarity.
The `switch` statement at line 144 uses an inline function `func() bool { _, ok := inFlight[req.key]; return ok }()` to check if a key exists, which is unnecessarily complex compared to a straightforward if-else chain.
♻️ Suggested refactor for readability
     case req := <-r.submitCh:
    -    switch {
    -    case func() bool { _, ok := inFlight[req.key]; return ok }():
    +    if _, ok := inFlight[req.key]; ok {
             req.result <- ErrInProgress
    -    case active >= r.maxWorkers:
    +    } else if active >= r.maxWorkers {
             req.result <- ErrAtCapacity
    -    default:
    +    } else {
             inFlight[req.key] = struct{}{}
             active++
             go r.run(ctx, req.key, req.op)
             req.result <- nil
         }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/async/runner.go` around lines 142 - 154, Replace the switch statement in the case handling for req from r.submitCh with a series of if-else statements to improve readability. Instead of using an inline function func() bool { _, ok := inFlight[req.key]; return ok }() to check if req.key exists in the inFlight map, use a simple if condition directly. Check for the inFlight[req.key] existence first, then check if active >= r.maxWorkers, and finally handle the default case. This will eliminate the unnecessary function wrapper and make the control flow clearer and more maintainable.
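The two nitpicks above both concern the counter bookkeeping inside the runner loop. A minimal sketch of that bookkeeping, pulled out of the loop so it can be exercised on its own, might look like the following. The `tracker` type and its method names are hypothetical and do not appear in the PR. Only `inFlight`, `active`, `maxWorkers`, and the two error names are taken from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// Error names follow the review diff; the messages are assumptions.
var (
	ErrInProgress = errors.New("operation already in progress")
	ErrAtCapacity = errors.New("runner at capacity")
)

// tracker (hypothetical) holds the state the runner loop keeps in local variables.
// It is meant to be owned by a single goroutine, so it needs no locking.
type tracker struct {
	inFlight   map[string]struct{}
	active     int
	maxWorkers int
}

func newTracker(maxWorkers int) *tracker {
	return &tracker{inFlight: make(map[string]struct{}), maxWorkers: maxWorkers}
}

// admit applies the checks in the suggested order: duplicate key first, then capacity.
func (t *tracker) admit(key string) error {
	if _, ok := t.inFlight[key]; ok {
		return ErrInProgress
	}
	if t.active >= t.maxWorkers {
		return ErrAtCapacity
	}
	t.inFlight[key] = struct{}{}
	t.active++
	return nil
}

// release handles a done event. It reports false when active was already zero,
// which is the case the first nitpick suggests logging.
func (t *tracker) release(key string) bool {
	delete(t.inFlight, key)
	if t.active > 0 {
		t.active--
		return true
	}
	return false
}

func main() {
	tr := newTracker(1)
	fmt.Println(tr.admit("img-a"))   // accepted
	fmt.Println(tr.admit("img-a"))   // duplicate key
	fmt.Println(tr.admit("img-b"))   // no free slot
	fmt.Println(tr.release("img-a")) // normal done event
	fmt.Println(tr.release("img-a")) // unexpected second done event
}
```

Returning a boolean from `release` leaves the logging decision to the caller, so the loop could emit the suggested message without the bookkeeping depending on a logger.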
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/async/runner.go`:
- Around line 196-216: Add a godoc comment to the Submit method that documents
its context lifetime requirements. The comment should clarify that callers must
use a context that will be cancelled when the runner stops (or use a timeout),
and explain that passing a context with a different lifetime could cause
blocking due to the time-of-check-time-of-use race between the running flag
check and the submitCh send. This documentation will prevent accidental misuse
of the API by future developers.
In `@internal/controllers/snapshot_controller.go`:
- Around line 217-240: The defer statement for closeImage(log, img) on line 223
will execute when the function returns, but the image is explicitly closed on
line 235 when snapshot.Source.IronCoreImage is not empty, causing a
double-close. Replace the defer closeImage call with a closure that checks if
img is not nil before closing: defer func() { if img != nil { closeImage(log,
img) } }(), and after the explicit closeImage call on line 235, set img to nil
so the deferred function will skip the close when it eventually executes.
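The guard described for `snapshot_controller.go` can be tried in isolation. The sketch below is an assumption-heavy stand-in: `image` is a hypothetical type that only counts `Close` calls, `closeImage` drops the logger argument, and the `closeEarly` flag stands in for the non-empty `snapshot.Source.IronCoreImage` branch.

```go
package main

import "fmt"

// image is a hypothetical stand-in for an open image handle; it only counts Close calls.
type image struct {
	name   string
	closes int
}

func (i *image) Close() error {
	i.closes++
	return nil
}

// closeImage mimics the helper named in the review comment, minus the logger argument.
func closeImage(img *image) {
	if err := img.Close(); err != nil {
		fmt.Printf("closing %s: %v\n", img.name, err)
	}
}

// process closes the image exactly once on both paths.
func process(img *image, closeEarly bool) {
	defer func() {
		// The closure reads img when the function returns, so it sees the nil below.
		if img != nil {
			closeImage(img)
		}
	}()

	if closeEarly {
		closeImage(img)
		img = nil // marks the handle as already closed for the deferred closure
	}
}

func main() {
	early := &image{name: "early"}
	process(early, true)

	late := &image{name: "late"}
	process(late, false)

	fmt.Println(early.closes, late.closes) // prints "1 1"
}
```

The closure matters here: a plain `defer closeImage(img)` evaluates its argument when the defer statement runs, so setting `img = nil` afterwards would not prevent the second close.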
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e30b12fc-02fd-4815-9be2-69924e5e3717
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (13)
- api/image.go
- api/snapshot.go
- cmd/volumeprovider/app/app.go
- go.mod
- internal/async/runner.go
- internal/controllers/common.go
- internal/controllers/image_async.go
- internal/controllers/image_controller.go
- internal/controllers/snapshot_async.go
- internal/controllers/snapshot_controller.go
- internal/volumeserver/snapshot.go
- internal/volumeserver/volume.go
- tests/integration/integration_suite_test.go
34d6aec to 35317f8
Signed-off-by: sujeet01 <phadtare.sujeet@gmail.com>
35317f8 to 5b0e78b
Proposed Changes
This PR moves long-running snapshot and image work (IronCore populate, flatten-on-delete, child-flatten) to a bounded async runner so that reconcile workers are not blocked. A shared runner executes these operations with capped concurrency (--async-max-workers, default 10), deduplicates them by key, and requeues the reconcilers when async work finishes.
Fixes #822
Ref #837 (comment)
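For readers unfamiliar with the pattern, here is a rough sketch of a runner with the three properties described above: capped concurrency, deduplication by key, and a callback when work finishes. It is a simplified stand-in, not the code in `internal/async/runner.go`. The `onDone` hook, the `Event` shape, and the `demo` helper are assumptions. The channel and error names are borrowed from the review diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Error names follow the review diff; the messages are assumptions.
var (
	ErrInProgress = errors.New("operation already in progress")
	ErrAtCapacity = errors.New("runner at capacity")
)

type Event struct {
	Key string
	Err error
}

type request struct {
	key    string
	op     func(context.Context) error
	result chan error
}

// Runner is a simplified stand-in, not the PR's internal/async.Runner.
type Runner struct {
	maxWorkers int
	submitCh   chan request
	doneCh     chan Event
	onDone     func(Event) // hypothetical hook; must not call Submit
}

func New(maxWorkers int, onDone func(Event)) *Runner {
	return &Runner{maxWorkers, make(chan request), make(chan Event), onDone}
}

func (r *Runner) Start(ctx context.Context) {
	inFlight := make(map[string]struct{}) // owned by this loop, so no lock is needed
	for {
		select {
		case <-ctx.Done():
			return
		case evt := <-r.doneCh:
			delete(inFlight, evt.Key)
			r.onDone(evt)
		case req := <-r.submitCh:
			if _, ok := inFlight[req.key]; ok {
				req.result <- ErrInProgress
			} else if len(inFlight) >= r.maxWorkers {
				req.result <- ErrAtCapacity
			} else {
				inFlight[req.key] = struct{}{}
				go r.run(ctx, req.key, req.op)
				req.result <- nil
			}
		}
	}
}

func (r *Runner) run(ctx context.Context, key string, op func(context.Context) error) {
	evt := Event{Key: key, Err: op(ctx)}
	select {
	case r.doneCh <- evt:
	case <-ctx.Done(): // the loop has exited, so the event is dropped
	}
}

// Submit expects a ctx that is cancelled when the runner stops.
func (r *Runner) Submit(ctx context.Context, key string, op func(context.Context) error) error {
	req := request{key: key, op: op, result: make(chan error, 1)}
	select {
	case r.submitCh <- req:
		return <-req.result
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo drives a one-worker runner and returns what each Submit reported.
func demo() []error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	finished := make(chan Event, 1)
	r := New(1, func(evt Event) { finished <- evt })
	go r.Start(ctx)
	release := make(chan struct{})
	slow := func(context.Context) error { <-release; return nil }
	results := []error{
		r.Submit(ctx, "snap-1", slow), // accepted
		r.Submit(ctx, "snap-1", slow), // same key is still running
		r.Submit(ctx, "snap-2", slow), // the only worker slot is taken
	}
	close(release)
	<-finished // the done hook fired, so the slot is free again
	return append(results, r.Submit(ctx, "snap-2", slow))
}

func main() { fmt.Println(demo()) }
```

In this sketch a reconciler would pass an `onDone` hook that requeues `evt.Key`. Because the hook runs on the loop goroutine, it should hand off quickly rather than do real work.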
Summary by CodeRabbit
Release Notes
New Features
--async-max-workers (default: 10) to control async concurrency.
Chores
Tests