refactor: move local subworkflows behind runner #2247
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 Walkthrough
This PR introduces a router-based dispatch pattern for child workflow execution, consolidating distributed and local process handling through the
Changes
Subworkflow Dispatch and Execution
🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly Related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/runtime/executor/dag_runner.go (1)
319-379: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Close `killed` before snapshotting `activeRuns`.
`Stop` snapshots `activeRuns` and only closes `e.killed` afterward. A concurrent `Execute`/`Retry` can register a new run after that snapshot, pass `cancellationErr`, and reach `Run` without ever being canceled by this stop request. Gate new dispatches first, then collect/cancel the active runs.
Suggested fix
 func (e *SubDAGExecutor) Stop(intent cmdutil.TerminationIntent) error {
+	e.cancelOnce.Do(func() {
+		close(e.killed)
+	})
+
 	type activeRun struct {
 		runID  string
 		cancel context.CancelFunc
 	}
 	e.mu.Lock()
 	activeRuns := make([]activeRun, 0, len(e.activeRuns))
 	for runID, cancel := range e.activeRuns {
 		activeRuns = append(activeRuns, activeRun{
 			runID:  runID,
 			cancel: cancel,
 		})
 	}
 	e.mu.Unlock()
@@
-	e.cancelOnce.Do(func() {
-		close(e.killed)
-	})
-
 	return errors.Join(errs...)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/executor/dag_runner.go` around lines 319 - 379, Modify the stop flow to gate new dispatches by closing e.killed (using e.cancelOnce.Do to close the channel) before snapshotting e.activeRuns so no new runs can register after the stop begins; specifically, call e.cancelOnce.Do(func(){ close(e.killed) }) at the start of the method, then lock e.mu and copy e.activeRuns into the local activeRuns slice, unlock, and proceed to iterate and cancel each run (calling subWorkflowRunner.Cancel, dagCtx.DB.RequestChildCancel, and run.cancel as currently implemented), collecting errs and returning errors.Join(errs...).
🧹 Nitpick comments (1)
internal/runtime/executor/subworkflow.go (1)
46-49: 💤 Low value
Add doc comments for `SubWorkflowCancelModeGraceful`/`SubWorkflowCancelModeForce` (consistency).
These constants are exported but lack doc comments; revive’s `exported` rule isn’t enabled in the current `.golangci.yml` configuration, so this likely won’t fail CI.
♻️ Proposed doc comments
 const (
+	// SubWorkflowCancelModeGraceful requests a graceful stop of the child workflow.
 	SubWorkflowCancelModeGraceful SubWorkflowCancelMode = "graceful"
+	// SubWorkflowCancelModeForce requests a forced stop of the child workflow.
 	SubWorkflowCancelModeForce SubWorkflowCancelMode = "force"
 )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/executor/subworkflow.go` around lines 46 - 49, Add proper doc comments for the exported constants SubWorkflowCancelModeGraceful and SubWorkflowCancelModeForce: place a sentence starting with the constant name that describes what the mode means/does (e.g., graceful cancels allowing cleanup, force cancels immediately), matching the style of other exported docs in the package and ensuring both constants have GoDoc comments for consistency.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/subflow/local_cli.go`:
- Around line 132-142: validateLocalRequest currently doesn't check that
req.DAG.Location is set, causing downstream malformed CLI target errors; update
validateLocalRequest (which takes executor.SubWorkflowRequest) to return a clear
error when req.DAG is non-nil but req.DAG.Location is empty (e.g., treat it like
a required field similar to RunID/RootDAGRun checks), adding a new error
constant or reusing an appropriate existing one and returning it from
validateLocalRequest so callers fail fast with a descriptive message.
- Around line 285-290: materializeLocalWorkspace dereferences
req.Workspace.Descriptor without checking for nil which can panic; before
calling workspacebundle.Extract and before building target with
filepath.FromSlash(req.Workspace.Descriptor.DAGPath) add a nil-check for
req.Workspace and req.Workspace.Descriptor and return a descriptive error
(including req.RunID) if nil (e.g., "missing workspace descriptor for run %q"),
then proceed to call workspacebundle.Extract and construct target only when
Descriptor is non-nil.
- Around line 289-290: The target path creation using filepath.Join(dest,
filepath.FromSlash(req.Workspace.Descriptor.DAGPath)) can be escaped by absolute
paths or `..` segments in req.Workspace.Descriptor.DAGPath; sanitize and
validate the DAGPath before joining: normalize with filepath.FromSlash and
filepath.Clean, reject or strip any leading path separators or absolute paths
and reject any path that is "." or starts with "..", then join with dest and
verify the result is within dest (use filepath.Rel to ensure the relative path
does not begin with ".."); if validation fails return an error instead of
returning a target outside dest.
---
Outside diff comments:
In `@internal/runtime/executor/dag_runner.go`:
- Around line 319-379: Modify the stop flow to gate new dispatches by closing
e.killed (using e.cancelOnce.Do to close the channel) before snapshotting
e.activeRuns so no new runs can register after the stop begins; specifically,
call e.cancelOnce.Do(func(){ close(e.killed) }) at the start of the method, then
lock e.mu and copy e.activeRuns into the local activeRuns slice, unlock, and
proceed to iterate and cancel each run (calling subWorkflowRunner.Cancel,
dagCtx.DB.RequestChildCancel, and run.cancel as currently implemented),
collecting errs and returning errors.Join(errs...).
---
Nitpick comments:
In `@internal/runtime/executor/subworkflow.go`:
- Around line 46-49: Add proper doc comments for the exported constants
SubWorkflowCancelModeGraceful and SubWorkflowCancelModeForce: place a sentence
starting with the constant name that describes what the mode means/does (e.g.,
graceful cancels allowing cleanup, force cancels immediately), matching the
style of other exported docs in the package and ensuring both constants have
GoDoc comments for consistency.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4841d24c-f3c8-4a5a-a3a0-c684265f9115
📒 Files selected for processing (12)
internal/cmd/context.go
internal/engine/engine.go
internal/runtime/agent/agent.go
internal/runtime/executor/dag_runner.go
internal/runtime/executor/dag_runner_test.go
internal/runtime/executor/subworkflow.go
internal/service/worker/remote_handler.go
internal/subflow/local_cli.go
internal/subflow/router.go
internal/subflow/router_test.go
internal/subflow/runner.go
internal/test/helper.go
	if err := workspacebundle.Extract(req.Workspace.Archive, dest, req.Workspace.Descriptor, workspacebundle.DefaultLimits()); err != nil {
		cleanup()
		return "", "", nil, fmt.Errorf("materialize action workspace for run %q: %w", req.RunID, err)
	}
	target := filepath.Join(dest, filepath.FromSlash(req.Workspace.Descriptor.DAGPath))
	return dest, target, cleanup, nil
Guard against nil Workspace.Descriptor before dereference.
materializeLocalWorkspace dereferences req.Workspace.Descriptor without a nil check, which can panic on malformed requests.
Suggested fix
 func validateLocalRequest(req executor.SubWorkflowRequest) error {
 	if req.DAG == nil {
 		return errMissingChildDAG
 	}
 	if req.RunID == "" {
 		return errRunIDNotSet
 	}
 	if req.RootDAGRun.Zero() {
 		return errRootRunNotSet
 	}
+	if req.Workspace != nil && req.Workspace.Descriptor == nil {
+		return fmt.Errorf("workspace descriptor is required")
+	}
 	return nil
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-	if err := workspacebundle.Extract(req.Workspace.Archive, dest, req.Workspace.Descriptor, workspacebundle.DefaultLimits()); err != nil {
-		cleanup()
-		return "", "", nil, fmt.Errorf("materialize action workspace for run %q: %w", req.RunID, err)
-	}
-	target := filepath.Join(dest, filepath.FromSlash(req.Workspace.Descriptor.DAGPath))
-	return dest, target, cleanup, nil
+func validateLocalRequest(req executor.SubWorkflowRequest) error {
+	if req.DAG == nil {
+		return errMissingChildDAG
+	}
+	if req.RunID == "" {
+		return errRunIDNotSet
+	}
+	if req.RootDAGRun.Zero() {
+		return errRootRunNotSet
+	}
+	if req.Workspace != nil && req.Workspace.Descriptor == nil {
+		return fmt.Errorf("workspace descriptor is required")
+	}
+	return nil
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/subflow/local_cli.go` around lines 285 - 290,
materializeLocalWorkspace dereferences req.Workspace.Descriptor without checking
for nil which can panic; before calling workspacebundle.Extract and before
building target with filepath.FromSlash(req.Workspace.Descriptor.DAGPath) add a
nil-check for req.Workspace and req.Workspace.Descriptor and return a
descriptive error (including req.RunID) if nil (e.g., "missing workspace
descriptor for run %q"), then proceed to call workspacebundle.Extract and
construct target only when Descriptor is non-nil.
1 issue found across 12 files
@coderabbitai review
✅ Actions performed
Review triggered.
Summary
Testing
Summary by cubic
Moved local sub-workflow execution behind a router and `LocalCLI` runner, removing process management from the executor. Added strict request and workspace validation with cleaner cancellation mapping for safer child runs.
Refactors
- `internal/subflow` router that selects the first runner that accepts a request.
- `subflow.NewLocalCLI()`; `internal/runtime/executor` no longer spawns/tracks OS processes.
- `LocalCLI`.
- `subflow.NewRouter(subflow.New(...), subflow.NewLocalCLI())` across context, engine, worker, and tests.
Bug Fixes
- `LocalCLI` with validation for DAG location and workspace DAG path (normalized and bounds-checked to prevent traversal).
Written for commit e1a2b56. Summary will update on new commits.
Summary by CodeRabbit