feat(mirror): replay captured traffic live to a local process via --to#5793
feat(mirror): replay captured traffic live to a local process via --to#5793devantler wants to merge 1 commit into
Conversation
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR adds a Sequence Diagram(s)sequenceDiagram
participant User
participant MirrorCmd
participant CaptureSession
participant ReplayCapture
participant LocalProcess
User->>MirrorCmd: ksail workload mirror <deployment> --to localhost:8080
MirrorCmd->>MirrorCmd: validate --to (net.SplitHostPort)
MirrorCmd->>CaptureSession: start capture (writes to MultiWriter)
CaptureSession->>ReplayCapture: stream pcap bytes via io.Pipe
ReplayCapture->>ReplayCapture: reassemble TCP payloads per flow
ReplayCapture->>LocalProcess: replay ordered payload bytes
LocalProcess-->>ReplayCapture: response bytes (discarded)
CaptureSession-->>MirrorCmd: capture result / error
ReplayCapture-->>MirrorCmd: replay result / error
MirrorCmd-->>User: combined outcome (capture error takes priority)
Possibly related issues
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
Comment |
✅MegaLinter analysis: Success✅ Linters with no issuesactionlint, bash-exec, git_diff, hadolint, jscpd, jsonlint, lychee, markdown-table-formatter, markdownlint, prettier, prettier, shellcheck, shfmt, stylelint, syft, trivy-sbom, trufflehog, v8r, v8r, yamllint Notices📣 MegaLinter 9.5.0 is out! Discover the new features and security recommendations in the release announcement. (Skip this info by defining See detailed reports in MegaLinter artifacts
|
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/cli/cmd/workload/mirror.go (1)
163-176: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick winOptional: preflight
--toreachability before injecting the (irremovable) tap.Validation here only checks
host:portsyntax; actual reachability is only checked oncerunCaptureWithReplaystarts, which happens afterresolveTapPoint/ensureTaphave already injected the ephemeral tap container. Per this file's own docs, ephemeral containers can't be removed — a typo'd--toaddress still leaves a permanent tap on the pod even though the session aborts immediately. Preflighting reachability before tap injection would let a bad--tofail without that side effect.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/cli/cmd/workload/mirror.go` around lines 163 - 176, Add a preflight reachability check for opts.replayTo in cmd.RunE before calling runMirrorCommand, so a bad --to fails before resolveTapPoint/ensureTap injects the ephemeral tap. Keep the existing host:port syntax validation, but also attempt an early connection/probe using the replay target and return ErrInvalidMirrorTo if it is unreachable. Reference cmd.RunE, runMirrorCommand, and the opts.replayTo validation block when updating the flow.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/cli/cmd/workload/mirror.go`:
- Around line 307-380: The replay tee in runCaptureWithReplay is coupled to the
primary capture write path through io.MultiWriter and io.Pipe, so a slow or
blocked --to target can stall pcap capture. Refactor runCaptureWithReplay so the
write to out is never blocked by the replay leg; keep capture output direct and
feed replay from a separate best-effort/buffered path with bounded buffering or
drop-on-full behavior. Use the existing runCaptureSession, runReplaySession, and
pipeWriter/pipeReader flow as the main location to decouple.
In `@pkg/svc/mirror/replay.go`:
- Around line 122-140: The tcpReplayer.flows map is never cleaned up, so
replayFlow entries and their pending buffers accumulate indefinitely during long
captures. Update tcpReplayer.flowFor and the close/error paths in accept to
remove a flow from flows when it ends via FIN, RST, or a write failure, so
terminated 4-tuples do not remain cached. Make sure pruning also clears the
stale failed state so a reused 4-tuple can be treated as a new connection
instead of being silently dropped.
---
Outside diff comments:
In `@pkg/cli/cmd/workload/mirror.go`:
- Around line 163-176: Add a preflight reachability check for opts.replayTo in
cmd.RunE before calling runMirrorCommand, so a bad --to fails before
resolveTapPoint/ensureTap injects the ephemeral tap. Keep the existing host:port
syntax validation, but also attempt an early connection/probe using the replay
target and return ErrInvalidMirrorTo if it is unreachable. Reference cmd.RunE,
runMirrorCommand, and the opts.replayTo validation block when updating the flow.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: dea4d255-a7c6-4d82-a20e-72c774de9c6b
⛔ Files ignored due to path filters (1)
pkg/cli/cmd/workload/__snapshots__/workload_test.snapis excluded by!**/*.snap
📒 Files selected for processing (5)
docs/src/content/docs/cli-flags/workload/workload-mirror.mdxpkg/cli/cmd/workload/mirror.gopkg/svc/chat/docs_generated.gopkg/svc/mirror/replay.gopkg/svc/mirror/replay_test.go
📜 Review details
⏰ Context from checks skipped due to timeout. (12)
- GitHub Check: 🏠 Home Isolation Guard
- GitHub Check: 🏗️ Build KSail Binary
- GitHub Check: 🧪 Test
- GitHub Check: 📊 Code Coverage
- GitHub Check: 🧹 Lint - golangci-lint
- GitHub Check: 🔍 Dead Code Analysis
- GitHub Check: 🧹 Lint - mega-linter
- GitHub Check: 📦 Tidy
- GitHub Check: 🛡️ Vulnerability Scan
- GitHub Check: 🏗️ Build
- GitHub Check: Analyze (go)
- GitHub Check: Analyze (go)
🧰 Additional context used
📓 Path-based instructions (1)
docs/src/content/docs/cli-flags/**/*.mdx
📄 CodeRabbit inference engine (AGENTS.md)
Keep the generated CLI reference under
docs/src/content/docs/cli-flags/in sync with the command tree; do not edit generated CLI flag docs by hand.
Files:
docs/src/content/docs/cli-flags/workload/workload-mirror.mdx
🪛 ast-grep (0.44.0)
pkg/svc/mirror/replay.go
[warning] 134-134: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: uint16(port)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
[warning] 176-176: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: uint16(tcpLayer.DstPort)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
[warning] 326-326: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: uint32(len(payload))
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
🔇 Additional comments (5)
pkg/svc/mirror/replay_test.go (1)
70-86: 📐 Maintainability & Code QualityNo change needed here.
go.modtargets Go 1.26.4, sosync.WaitGroup.Gois supported.pkg/svc/mirror/replay.go (1)
143-164: 🩺 Stability & AvailabilityContext cancellation still needs an unblock path.
runCaptureWithReplayclosespipeWriter, but only afterrunCaptureSessionreturns. If the live pcap reader can sit idle,ReadPacketDatamay still block until the capture loop closes the pipe on cancel.docs/src/content/docs/cli-flags/workload/workload-mirror.mdx (1)
20-24: LGTM!Also applies to: 41-43, 52-52
pkg/cli/cmd/workload/mirror.go (2)
5-9: LGTM!Also applies to: 28-30, 59-63, 74-77, 107-123, 152-153, 268-305
336-379: 🎯 Functional CorrectnessDrop this concern: capture cancellation already returns nil. RunCaptureSession treats
context.Canceledandcontext.DeadlineExceededas a clean stop, so a replay-triggeredcancelSession()won’t hide a real replay failure.> Likely an incorrect or invalid review comment.
| // runMirrorSession runs the blocking capture session, plain when --to is not | ||
| // given, else with the live replay sink teed off the pcap stream. | ||
| func runMirrorSession( | ||
| cmd *cobra.Command, | ||
| client kubernetes.Interface, | ||
| restConfig *rest.Config, | ||
| point *mirror.TapPoint, | ||
| opts mirrorOptions, | ||
| out io.Writer, | ||
| ) error { | ||
| if opts.replayTo == "" { | ||
| return runCaptureSession(cmd.Context(), client, restConfig, point, opts.port, out) | ||
| } | ||
|
|
||
| return runCaptureWithReplay(cmd, client, restConfig, point, opts, out) | ||
| } | ||
|
|
||
| // runCaptureWithReplay tees the live pcap stream into a replay sink that | ||
| // delivers the mirrored inbound TCP payloads to --to while the capture runs. | ||
| // A replay failure cancels the capture (reported as the session error); a | ||
| // capture failure wins over a secondary replay error. | ||
| func runCaptureWithReplay( | ||
| cmd *cobra.Command, | ||
| client kubernetes.Interface, | ||
| restConfig *rest.Config, | ||
| point *mirror.TapPoint, | ||
| opts mirrorOptions, | ||
| out io.Writer, | ||
| ) error { | ||
| sessionCtx, cancelSession := context.WithCancel(cmd.Context()) | ||
| defer cancelSession() | ||
|
|
||
| notify.WriteMessage(notify.Message{ | ||
| Type: notify.ActivityType, | ||
| Content: "replaying mirrored inbound TCP payloads to %s", | ||
| Args: []any{opts.replayTo}, | ||
| Writer: cmd.ErrOrStderr(), | ||
| }) | ||
|
|
||
| pipeReader, pipeWriter := io.Pipe() | ||
| replayErrCh := make(chan error, 1) | ||
|
|
||
| go func() { | ||
| replayErr := runReplaySession(sessionCtx, pipeReader, opts.port, opts.replayTo) | ||
| if replayErr != nil { | ||
| cancelSession() | ||
| } | ||
|
|
||
| // Keep draining so capture writes into the tee never block after the | ||
| // replay side stopped early. | ||
| _, _ = io.Copy(io.Discard, pipeReader) | ||
|
|
||
| replayErrCh <- replayErr | ||
| }() | ||
|
|
||
| captureErr := runCaptureSession( | ||
| sessionCtx, client, restConfig, point, opts.port, io.MultiWriter(out, pipeWriter), | ||
| ) | ||
|
|
||
| _ = pipeWriter.Close() | ||
|
|
||
| replayErr := <-replayErrCh | ||
|
|
||
| if captureErr != nil { | ||
| return captureErr | ||
| } | ||
|
|
||
| if replayErr != nil { | ||
| return fmt.Errorf("replay session: %w", replayErr) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
There was a problem hiding this comment.
🩺 Stability & Availability | 🔴 Critical | 🏗️ Heavy lift
Slow/unresponsive --to target can stall the primary pcap capture too.
io.Pipe() is synchronous and unbuffered — each pipeWriter.Write() blocks until a matching Read() fully consumes the data, with no internal buffering. In runCaptureWithReplay, capture writes go to io.MultiWriter(out, pipeWriter) (Line 363), so if the replay side (runReplaySession → tcpReplayer.run) stalls mid-session — e.g. blocked in net.Dialer.DialContext for a new flow, or blocked in an unbounded conn.Write() to a local process that isn't reading (a well-known Go footgun since net.Conn.Write has no default deadline) — the shared pipe write blocks, and so does the write to out, the pcap file.
The comment at Line 355-357 ("drain so capture writes into the tee never block") only covers the case where replay has already returned; it does nothing while replay is actively but slowly consuming. This directly contradicts the PR's requirement that --to be additive and that pcap file output/summaries remain unaffected — a stuck local replay target can silently stall or corrupt the timing of the primary capture.
Consider decoupling the two paths so a slow replay consumer can never block the primary write path, e.g. an internal bounded buffer/channel with drop-on-full semantics for the replay leg, or writing to out directly while feeding the pipe from a separate best-effort goroutine.
💡 Sketch: decouple replay tee from the primary capture write path
- pipeReader, pipeWriter := io.Pipe()
- replayErrCh := make(chan error, 1)
-
- go func() {
- replayErr := runReplaySession(sessionCtx, pipeReader, opts.port, opts.replayTo)
- if replayErr != nil {
- cancelSession()
- }
-
- // Keep draining so capture writes into the tee never block after the
- // replay side stopped early.
- _, _ = io.Copy(io.Discard, pipeReader)
-
- replayErrCh <- replayErr
- }()
-
- captureErr := runCaptureSession(
- sessionCtx, client, restConfig, point, opts.port, io.MultiWriter(out, pipeWriter),
- )
+ // bufferedTee never blocks the caller: it drops replay bytes instead of
+ // stalling the primary capture write path when the replay side is slow.
+ tee := newNonBlockingTee(sessionCtx)
+ replayErrCh := make(chan error, 1)
+
+ go func() {
+ replayErr := runReplaySession(sessionCtx, tee.reader(), opts.port, opts.replayTo)
+ if replayErr != nil {
+ cancelSession()
+ }
+ tee.drain()
+ replayErrCh <- replayErr
+ }()
+
+ captureErr := runCaptureSession(
+ sessionCtx, client, restConfig, point, opts.port, io.MultiWriter(out, tee),
+ )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/cli/cmd/workload/mirror.go` around lines 307 - 380, The replay tee in
runCaptureWithReplay is coupled to the primary capture write path through
io.MultiWriter and io.Pipe, so a slow or blocked --to target can stall pcap
capture. Refactor runCaptureWithReplay so the write to out is never blocked by
the replay leg; keep capture output direct and feed replay from a separate
best-effort/buffered path with bounded buffering or drop-on-full behavior. Use
the existing runCaptureSession, runReplaySession, and pipeWriter/pipeReader flow
as the main location to decouple.
| // tcpReplayer reassembles the capture's inbound TCP flows and replays their | ||
| // payload bytes, in sequence order, to the configured local address. | ||
| type tcpReplayer struct { | ||
| port uint16 | ||
| localAddr string | ||
| dialer net.Dialer | ||
| flows map[string]*replayFlow | ||
| } | ||
|
|
||
| // newTCPReplayer builds a replayer for the given (pre-validated) service port | ||
| // and local destination. | ||
| func newTCPReplayer(port int, localAddr string) *tcpReplayer { | ||
| return &tcpReplayer{ | ||
| port: uint16(port), //nolint:gosec // G115: bounds-checked by validateReplayInputs. | ||
| localAddr: localAddr, | ||
| dialer: net.Dialer{}, | ||
| flows: map[string]*replayFlow{}, | ||
| } | ||
| } |
There was a problem hiding this comment.
🚀 Performance & Scalability | 🟠 Major | ⚡ Quick win
Flows map is never pruned — unbounded memory growth on a long/busy session.
flows retains an entry (plus its pending buffer, up to maxPendingSegments) for every client 4-tuple ever seen, and entries are kept even after RST/FIN closes the flow. For a live replay that runs for the duration of a busy capture, this grows without bound. Consider deleting the flow from the map when it terminates (FIN/RST or write failure) so long sessions stay bounded.
Note the side effect: a reused 4-tuple currently resolves to the stale failed flow in flowFor (Line 196) and is silently dropped by accept (Line 267); pruning on close would also let a genuinely new connection on the same tuple be replayed.
Also applies to: 195-228
🧰 Tools
🪛 ast-grep (0.44.0)
[warning] 134-134: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: uint16(port)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/svc/mirror/replay.go` around lines 122 - 140, The tcpReplayer.flows map
is never cleaned up, so replayFlow entries and their pending buffers accumulate
indefinitely during long captures. Update tcpReplayer.flowFor and the
close/error paths in accept to remove a flow from flows when it ends via FIN,
RST, or a write failure, so terminated 4-tuples do not remain cached. Make sure
pruning also clears the stale failed state so a reused 4-tuple can be treated as
a new connection instead of being silently dropped.
Closing as superseded by #5794 — two concurrent scheduled sessions implemented #5791 near-simultaneously (this one first, 19:40Z vs 20:06Z). Keeping #5794 because it is the more complete variant: same LiveReplay/sequencer design, plus the command-layer tests (--to happy path, invalid-target, dial-failure surfacing via an ExportSetNewLiveReplay seam) and the regenerated pkg/toolgen tool-surface snapshot this branch is red on (its 2 failing checks). One green PR beats two partial ones on the maintainer's dashboard. |

Why
ksail workload mirrorcaptures a Deployment's inbound traffic to a pcap file, but mirror mode's headline promise — your locally-running service actually receives that traffic — was still unfulfilled: the capture was forensics only.What
Adds a
--to host:portflag that live-replays the mirrored inbound TCP payloads to a local process while the capture runs, one local connection per mirrored client connection, delivered in order. Replay is strictly one-way (responses are discarded; nothing flows back into the cluster), and file output + summary keep working unchanged.Fixes #5791. Part of #4521.
Notes
gopacket/reassemblyas the smallest correct option (the issue left this to the PR).