feat(executor): stream command stdout via chunk callback #320
Jisung Chae (jisung-02) wants to merge 19 commits into
Wire a chunk callback through the executor pipeline so system shell
commands stream stdout/stderr to alpacon-server in real time.
- Add ChunkCallback on CommandArgs and CommandOptions
- chunkWriter emits chunks on newline boundaries or when the buffer
exceeds 4 KB (hybrid line + size strategy)
- ExecWithStreamingHook added to CommandExecutor and the mock
- ShellHandler forwards ChunkCallback through executeCommand and
executeWithOperators
- CommandRunner posts each chunk via scheduler.Rqueue to the new
/api/events/commands/{id}/chunk/ endpoint
When ChunkCallback is nil the existing CombinedOutput / Start+Wait
paths are unchanged.
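The hybrid line + size strategy described in this commit can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `lineChunker`, `collect`, and the `emit` field are hypothetical names, and only the 4 KB figure comes from the description. It models the initial behaviour only (one chunk per complete line, plus an early emit when a partial line exceeds the threshold).

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// chunkSizeThreshold mirrors the 4 KB figure from the commit message.
const chunkSizeThreshold = 4 * 1024

// lineChunker is a hypothetical stand-in for chunkWriter: an io.Writer that
// emits each complete line as a chunk and emits a partial line early once it
// grows past the threshold.
type lineChunker struct {
	mu   sync.Mutex
	buf  []byte
	emit func(content string) // assumed callback shape
}

func (w *lineChunker) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.buf = append(w.buf, p...)
	// Emit every complete line currently in the buffer.
	for {
		i := bytes.IndexByte(w.buf, '\n')
		if i < 0 {
			break
		}
		w.emit(string(w.buf[:i+1]))
		w.buf = w.buf[i+1:]
	}
	// Emit an oversized partial line instead of letting it grow.
	if len(w.buf) > chunkSizeThreshold {
		w.emit(string(w.buf))
		w.buf = w.buf[:0]
	}
	return len(p), nil
}

// Flush emits whatever partial line is left when the command finishes.
func (w *lineChunker) Flush() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.buf) > 0 {
		w.emit(string(w.buf))
		w.buf = w.buf[:0]
	}
}

// collect feeds the writes through a lineChunker and returns the chunks.
func collect(writes ...string) []string {
	var chunks []string
	w := &lineChunker{emit: func(c string) { chunks = append(chunks, c) }}
	for _, s := range writes {
		w.Write([]byte(s))
	}
	w.Flush()
	return chunks
}

func main() {
	for _, c := range collect("a\nb", "c\n", "tail") {
		fmt.Printf("%q\n", c)
	}
}
```

In a real executor a writer like this would be assigned to both the Stdout and Stderr fields of an exec.Cmd, which is why the sketch guards its buffer with a mutex.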
chunkWriter no longer manages sequence numbers. Multiple chunkWriter instances spawned across shell operators in executeWithOperators previously each restarted seq from 0, which violated the unique (command, seq) constraint on the server. The runner callback now owns a closure-scoped counter so each command_id receives one monotonic seq series across all operator branches.
- ChunkCallback signature simplified to func(content string)
- chunkWriter: drop seq field
- ExecWithStreamingHook and mock signatures updated to match
- CommandRunner: closure captures `var seq int` and increments per chunk
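A minimal sketch of the runner-owned counter, assuming a callback of the shape func(content string) as stated above. The `chunk` struct, `newChunkCallback`, and the `post` parameter are hypothetical, and the mutex is an addition of this sketch (to tolerate concurrent stdout/stderr writers), not something the commit message mentions.

```go
package main

import (
	"fmt"
	"sync"
)

// ChunkCallback matches the simplified signature from the commit message.
type ChunkCallback func(content string)

// chunk is a hypothetical payload shape used only for this illustration.
type chunk struct {
	Seq     int
	Content string
}

// newChunkCallback returns one callback per command. The closure owns seq,
// so every writer that shares the callback draws from the same series.
func newChunkCallback(post func(chunk)) ChunkCallback {
	var (
		mu  sync.Mutex // assumption: callers may invoke the callback concurrently
		seq int
	)
	return func(content string) {
		mu.Lock()
		defer mu.Unlock()
		post(chunk{Seq: seq, Content: content})
		seq++
	}
}

func main() {
	cb := newChunkCallback(func(c chunk) {
		fmt.Printf("seq=%d content=%q\n", c.Seq, c.Content)
	})
	// Two operator branches (for example "a && b") reuse the same callback,
	// so the second branch continues the series instead of restarting at 0.
	cb("output of a\n")
	cb("output of b\n")
}
```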
- chunkWriter: newline emission, multi-line single write, buffer threshold, Flush, concurrent Writes
- Executor.ExecWithStreamingHook: real /bin/sh -c invocation verifies chunks arrive in order and concatenate to the returned output
- ShellHandler streaming: ChunkCallback forwarded through executeCommand and executeWithOperators for both single and operator-separated runs
Multiple Reporter goroutines share one priority queue; when fin and chunk both used priority 10 their pop order was undefined, letting fin race ahead of trailing chunk POSTs and produce empty CLI output on short commands. Raise the fin priority value so the queue drains chunks first; the server-side fix accepts late chunks as well, but enforcing pop order keeps the common path monotonic.
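The effect of the priority change can be illustrated with a small min-heap in which a lower value pops first. This is only a sketch built on the standard container/heap package; `entry`, `queue`, and `drain` are hypothetical names and do not reflect how scheduler.Rqueue is implemented.

```go
package main

import (
	"container/heap"
	"fmt"
)

// entry is a hypothetical queue item: lower priority values pop first.
type entry struct {
	priority int
	name     string
}

// queue implements heap.Interface as a min-heap on priority.
type queue []entry

func (q queue) Len() int           { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].priority < q[j].priority }
func (q queue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *queue) Push(x any)        { *q = append(*q, x.(entry)) }
func (q *queue) Pop() any {
	old := *q
	n := len(old)
	e := old[n-1]
	*q = old[:n-1]
	return e
}

// drain pushes all entries and returns their names in pop order.
func drain(entries []entry) []string {
	q := &queue{}
	for _, e := range entries {
		heap.Push(q, e)
	}
	var order []string
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(entry).name)
	}
	return order
}

func main() {
	// fin is enqueued first but carries priority 11, so both chunks
	// (priority 10) are popped ahead of it.
	fmt.Println(drain([]entry{
		{priority: 11, name: "fin"},
		{priority: 10, name: "chunk-0"},
		{priority: 10, name: "chunk-1"},
	}))
}
```

This only fixes pop order. With several reporters sending concurrently, a popped chunk's POST can still complete after fin's, which is why the commit message also relies on the server accepting late chunks.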
Pull request overview
This PR adds real-time streaming of system command stdout/stderr from Alpamon to Alpacon by introducing a per-command chunk callback that posts output increments to a new /api/events/commands/{id}/chunk/ endpoint, while still returning/sending the full combined output on fin.
Changes:
- Added streaming execution support in the executor via a chunkWriter and a new ExecWithStreamingHook API that wires stdout/stderr to a chunk callback.
- Plumbed ChunkCallback through runner → handler args → shell handler → executor, ensuring a runner-owned monotonic seq across operator-split subcommands.
- Added unit/integration tests for chunk emission behavior and shell/operator forwarding behavior.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pkg/runner/command.go | Builds per-command chunk callback (seq ownership) and adjusts fin queue priority for chunk-first draining. |
| pkg/runner/client.go | Adds chunk endpoint URL constant. |
| pkg/executor/handlers/shell/shell.go | Forwards chunk callback and selects streaming vs legacy execution paths. |
| pkg/executor/handlers/shell/shell_streaming_test.go | Adds regression tests to ensure callback forwarding and operator-split execution behavior. |
| pkg/executor/handlers/common/args.go | Adds ChunkCallback to handler command args. |
| pkg/executor/handlers/common/interfaces.go | Extends CommandExecutor with ExecWithStreamingHook. |
| pkg/executor/handlers/common/testing.go | Updates mock executor to implement streaming hook and emit chunks. |
| pkg/executor/executor.go | Implements chunkWriter, adds ChunkCallback to options, and adds streaming execution path + ExecWithStreamingHook. |
| pkg/executor/executor_test.go | Adds integration-style test validating streamed chunks concatenate to final output. |
| pkg/executor/chunk_writer_test.go | Adds focused unit tests for newline/threshold/flush behavior and full-output collection. |
- Wrap PIDHook invocations with deferred recover so a bad hook can't crash the agent, matching the CommandOptions.PIDHook contract.
- Reword fin priority comment: priority 11 is lower than chunks' 10, so trailing chunks drain before fin.
- Tighten runCommand and PIDHook docstrings.
Split partial lines into chunkSizeThreshold-sized pieces in chunkWriter so a single very long no-newline line cannot produce an arbitrarily large payload. Sub-threshold tail stays buffered. Wrap every ChunkCallback invocation (Write + Flush) in a nil-guarded helper that recovers panics, matching the PIDHook contract so a faulty callback cannot crash the agent mid-stream or during teardown.
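Both ideas in this commit (threshold-sized splitting with a buffered tail, and a nil-guarded callback wrapper that recovers panics) can be sketched as below. `splitFull` and `safeEmit` are hypothetical helper names chosen for illustration; only chunkSizeThreshold and the 4 KB value are taken from the PR text.

```go
package main

import "fmt"

// chunkSizeThreshold mirrors the 4 KB cap named in the commit message.
const chunkSizeThreshold = 4 * 1024

// splitFull cuts buf into pieces of exactly size bytes and returns the
// remaining sub-threshold tail, which the caller keeps buffered.
func splitFull(buf []byte, size int) (pieces []string, tail []byte) {
	for len(buf) >= size {
		pieces = append(pieces, string(buf[:size]))
		buf = buf[size:]
	}
	return pieces, buf
}

// safeEmit calls cb with content. It is a no-op for a nil callback and
// recovers a panicking callback; ok reports whether cb ran to completion.
func safeEmit(cb func(string), content string) (ok bool) {
	if cb == nil {
		return false
	}
	defer func() {
		if r := recover(); r != nil {
			ok = false // a faulty callback must not take the agent down
		}
	}()
	cb(content)
	return true
}

func main() {
	// A 10000-byte line without a newline: two full pieces, the rest buffered.
	pieces, tail := splitFull(make([]byte, 10000), chunkSizeThreshold)
	fmt.Println(len(pieces), len(tail))

	for _, p := range pieces {
		safeEmit(func(s string) { fmt.Println("emitted", len(s), "bytes") }, p)
	}
	fmt.Println(safeEmit(func(string) { panic("boom") }, "x"))
}
```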
Move to a chunks-only streaming contract: chunkWriter no longer retains the full body, so fin no longer double-ships output and multi-GB streams cannot OOM the agent. The newline path now also splits payloads >4 KB (buffered tail + next Write could exceed the cap). Surface cmd.Start and demote failures so the fin/stream carry diagnostics instead of an empty result.
…cate helpers
Funnel the shell handler through a single ExecWithStreamingHook call, merge runCommand's streaming and pidHook branches, and reuse chunkWriter for the demote-failure path so the third nil-guard helper goes away. Move the chunk payload to a typed protocol.CommandChunk to cut per-chunk allocations and give the wire shape one definition.
…output
# Conflicts:
#	pkg/executor/executor.go
#	pkg/executor/executor_test.go
The streaming privilege-demotion failure path returned an empty result, so the fin payload omitted the error text and it was only delivered as a chunk. Return the diagnostic in result (keeping the in-band emit) so fin still carries it when chunk delivery fails, consistent with the streaming timeout path. Addresses Copilot review on PR #320.
Trim multi-line rationale comments while preserving intent; no logic changes.
Eunyoung Jeong (eunyoung14)
left a comment
Review — output delivery reliability & resource usage
The chunkWriter mechanics are solid: memory stays bounded (the strings.Clone avoids pinning a large backing array, and TestChunkWriter_LargeStreamDoesNotRetainBody locks that in), the caller-owned seq counter keeps a single monotonic series across shell operators (&&/||/;), and the callback panic recovery is in place. A few things to consider before merge, all scoped to alpamon itself.
🟠 One POST per line — unbatched, on a shared bounded queue
chunkWriter emits a chunk on every newline; the 4 KB threshold only caps a chunk, it does not batch small lines. So a high-volume command (yes, seq 1 10000000, cat large.log) produces roughly one HTTP POST per output line.
Those POSTs go through the shared scheduler.Rqueue (MaxQueueSize = 36000, drained by 4 reporters). When the queue is full, request() logs-and-drops the entry with no retry. Consequences:
- A single chatty command can saturate the queue that is shared with all other agent telemetry (acks, fins, command results, events) → unrelated reports get dropped agent-wide.
- chunkWriter.Write always returns immediately and Post never blocks, so the child process gets no backpressure: alpamon converts the firehose into unbounded queue churn.
Suggestion: add a batching window to chunkWriter (coalesce up to N lines / 4 KB / ~50–100 ms into one chunk), and optionally apply real backpressure when the queue is near capacity so the child is throttled rather than silently dropped.
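One possible shape for the suggested batching window is sketched below, under stated assumptions: `coalescer`, `maxSize`, and `Tick` are hypothetical names, the backpressure half of the suggestion is not modelled, and the periodic flush is exposed as a plain method that a time.Ticker goroutine would call every 50 to 100 ms in a real agent.

```go
package main

import (
	"fmt"
	"sync"
)

// coalescer is a hypothetical batching writer: it ignores newlines and emits
// only when maxSize bytes have accumulated or when Tick is called.
type coalescer struct {
	mu      sync.Mutex
	buf     []byte
	maxSize int
	emit    func(content string)
}

func (c *coalescer) Write(p []byte) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.buf = append(c.buf, p...)
	// Size trigger: ship full-size chunks, keep the remainder buffered.
	for len(c.buf) >= c.maxSize {
		c.emit(string(c.buf[:c.maxSize]))
		c.buf = c.buf[c.maxSize:]
	}
	return len(p), nil
}

// Tick is the time trigger: it flushes whatever has accumulated so far.
// Calling it once more after the command exits doubles as the final flush.
func (c *coalescer) Tick() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.buf) > 0 {
		c.emit(string(c.buf))
		c.buf = c.buf[:0]
	}
}

func main() {
	c := &coalescer{maxSize: 4 * 1024, emit: func(s string) {
		fmt.Printf("chunk of %d bytes\n", len(s))
	}}
	// Many short lines become one chunk per tick instead of one per line.
	for i := 0; i < 100; i++ {
		fmt.Fprintf(c, "line %d\n", i)
	}
	c.Tick()
}
```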
🟠 No retained copy when chunk delivery fails
Under streaming, a successful command's fin payload carries an empty result (runCommand returns nil bytes on the cw != nil path) and the output is delivered solely via best-effort chunk POSTs. Nothing retains the output, so a dropped or retry-exhausted chunk POST (see the queue concern above) means that output is permanently unrecoverable — there is no fallback.
Suggestion: tee a bounded buffer (cap, e.g. 1 MiB, truncate-middle) into the fin payload so a complete-up-to-cap copy is delivered reliably in a single request, independent of per-chunk delivery. This keeps the memory bound this PR introduced while removing the "all-or-nothing per chunk" fragility.
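A truncate-middle buffer of the kind suggested here could look roughly like this. It is a sketch, not the PR's implementation: `boundedTee`, its fields, and the truncation marker text are all hypothetical, and the even head/tail split is an arbitrary choice.

```go
package main

import "fmt"

// boundedTee is a hypothetical truncate-middle buffer: it keeps the first
// headCap bytes and the last tailCap bytes written and counts what it drops.
type boundedTee struct {
	headCap, tailCap int
	head, tail       []byte
	dropped          int
}

// newBoundedTee splits the capacity evenly between head and tail.
func newBoundedTee(capacity int) *boundedTee {
	return &boundedTee{headCap: capacity / 2, tailCap: capacity - capacity/2}
}

func (b *boundedTee) Write(p []byte) (int, error) {
	n := len(p)
	// Fill the head first; it never changes once full.
	if room := b.headCap - len(b.head); room > 0 {
		take := room
		if take > len(p) {
			take = len(p)
		}
		b.head = append(b.head, p[:take]...)
		p = p[take:]
	}
	// Everything else goes to the tail, trimmed to its newest tailCap bytes.
	b.tail = append(b.tail, p...)
	if over := len(b.tail) - b.tailCap; over > 0 {
		b.dropped += over
		b.tail = append(b.tail[:0], b.tail[over:]...)
	}
	return n, nil
}

// String returns the retained output with a marker where bytes were dropped.
func (b *boundedTee) String() string {
	if b.dropped == 0 {
		return string(b.head) + string(b.tail)
	}
	return fmt.Sprintf("%s\n... [%d bytes truncated] ...\n%s", b.head, b.dropped, b.tail)
}

func main() {
	tee := newBoundedTee(8) // the review suggests a cap such as 1 MiB; 8 keeps the demo readable
	tee.Write([]byte("0123456789abcdef"))
	fmt.Println(tee.String())
}
```

Such a writer could sit next to the chunk writer behind io.MultiWriter, so the fin payload carries the retained copy regardless of how many chunk POSTs were delivered.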
Notes (non-blocking)
- command.go:56-57 honestly documents that the priority bump (10→11) is a best-effort hint, not an ordering guarantee: with 4 concurrent reporters a chunk can still land after fin. That's fine as long as the receiver tolerates post-fin, out-of-order arrival keyed by (command_id, seq).
- stdout and stderr are merged into one stream (parity with the old CombinedOutput), so the receiver can't distinguish them; worth noting as a known characteristic.
- Test coverage for the streaming path is good; a test exercising queue-pressure / drop-on-full behavior would be a nice addition given the concern above.
Coalesce stdout/stderr into chunks emitted on a 4KB threshold or a periodic flush tick instead of one POST per output line, and throttle the producing command (dropping past ctx/maxWait) when the shared queue nears capacity so a chatty command can't starve other telemetry.
Under streaming the fin payload carried an empty result, so output lived only in best-effort chunk POSTs and was permanently lost if a chunk dropped. Tee a bounded (1 MiB, truncate-middle) copy into the returned output so fin reliably carries the command's start and end for audit, while memory stays bounded.
…ance streaming result accumulation
Addressed review feedback
Thanks for the review. Summary of the changes made since then (all on this branch):
🟠 One POST per line / no backpressure: fixed (
Summary
- /api/events/commands/{id}/chunk/ endpoint, replacing the previous fin-only post-completion model.
- chunkWriter emits chunks on newline boundaries or when the in-memory buffer crosses 4 KB, whichever comes first; partial lines are carried over between writes and flushed at completion.
- CommandRunner owns the monotonic seq counter so multiple chunkWriter instances spawned across shell operators (&&, ||, ;) in executeWithOperators share one series per command_id, preventing the unique (command, seq) collision that the initial implementation had.
Changes
- pkg/executor/executor.go: new chunkWriter (sync io.Writer), ChunkCallback on CommandOptions, ExecWithStreamingHook entry point. runCommand routes through chunkWriter when streaming is requested and falls back to the existing CombinedOutput / Start+Wait paths otherwise.
- pkg/executor/handlers/common: CommandExecutor.ExecWithStreamingHook interface, ChunkCallback field on CommandArgs, mock implementation.
- pkg/executor/handlers/shell/shell.go: forwards ChunkCallback through executeCommand and executeWithOperators, choosing the streaming or non-streaming executor entry point per call.
- pkg/runner/client.go: new eventCommandChunkURL constant.
- pkg/runner/command.go: builds the per-command chunk callback closure (owns seq), posts each chunk to the chunk URL, and lowers fin priority to 11.
Test plan
- go build ./pkg/executor/... ./pkg/runner/...
- go vet ./pkg/executor/... ./pkg/runner/...
- go test ./pkg/executor/... -count=1 -p 1 (all packages pass)
- chunkWriter newline emission, multi-line single write, 4 KB threshold trigger, partial line carry-over, Flush remainder, Bytes() full output, Write length return
- /bin/sh -c invocation through ExecWithStreamingHook verifies chunk ordering and concatenation equals the returned combined output
- &&/||/; with a caller-owned monotonic counter (locks in the seq-collision fix), nil callback falls back to legacy path
- go run ./cmd/alpamon boots cleanly against the existing config (connection refused only because no local alpacon-server is running, which is unrelated to this change)
Notes
- /api/events/commands/{id}/chunk/ endpoint being deployed. Under streaming, command stdout/stderr is delivered solely via chunk POSTs; the fin payload no longer carries the full combined output, only short diagnostics (e.g. privilege-demotion errors and the timeout banner). This keeps the executor from retaining unbounded command output in memory.