From a366e1f8eb5b2b14974592adedd13fe95350f502 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Fri, 22 May 2026 15:04:24 +0300 Subject: [PATCH 1/8] fix: add ProcessRegistry to prevent zombie reaper race Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 96 +++++++++++++++++++++- pkg/executor/executor_test.go | 146 ++++++++++++++++++++++++++++++++++ 2 files changed, 239 insertions(+), 3 deletions(-) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 6e723066..0feb4b8a 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -4,11 +4,13 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "log/slog" "os/exec" "strings" + "sync" "syscall" "time" @@ -24,12 +26,64 @@ const ( serviceName = "executor" ) +// ProcessRegistry tracks PIDs of processes started by the executor so that +// a PID-1 zombie reaper can skip them (their parent already calls wait). +// This prevents the reaper from stealing a child that cmd.Wait expects to reap. +type ProcessRegistry struct { + mu sync.RWMutex + activePIDs map[int32]struct{} +} + +// NewProcessRegistry creates a new ProcessRegistry. +func NewProcessRegistry() *ProcessRegistry { + return &ProcessRegistry{ + activePIDs: make(map[int32]struct{}), + } +} + +// Register adds pid to the set of active PIDs. +func (r *ProcessRegistry) Register(pid int) { + r.mu.Lock() + r.activePIDs[int32(pid)] = struct{}{} + r.mu.Unlock() +} + +// Unregister removes pid from the set of active PIDs. +func (r *ProcessRegistry) Unregister(pid int) { + r.mu.Lock() + delete(r.activePIDs, int32(pid)) + r.mu.Unlock() +} + +// IsActive reports whether pid is currently tracked as an active process. +func (r *ProcessRegistry) IsActive(pid int) bool { + r.mu.RLock() + _, ok := r.activePIDs[int32(pid)] + r.mu.RUnlock() + return ok +} + +// Registry is the global process registry shared between the executor and +// the PID-1 zombie reaper. All executor methods that spawn child processes +// register their PIDs here so the reaper can skip them. +var Registry = NewProcessRegistry() + +// Run starts the command, waits for it to complete, and returns the error. +// The child PID is registered in the global Registry while the process is +// running so that a PID-1 zombie reaper does not steal it. func Run(cmd *exec.Cmd) error { // TODO context: hook name, hook phase, hook binding // TODO observability log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir)) - return cmd.Run() + if err := cmd.Start(); err != nil { + return err + } + + Registry.Register(cmd.Process.Pid) + defer Registry.Unregister(cmd.Process.Pid) + + return cmd.Wait() } // StderrError is returned by RunAndLogLines when a command fails and produces @@ -113,7 +167,36 @@ func (e *Executor) Output() ([]byte, error) { e.logger.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")), slog.String(pkg.LogKeyDir, e.cmd.Dir)) - return e.cmd.Output() + + // Reproduce cmd.Output() but interleave PID registration so that the + // PID-1 zombie reaper skips this process. + if e.cmd.Stdout != nil { + return nil, errors.New("exec: Stdout already set") + } + var stdout bytes.Buffer + e.cmd.Stdout = &stdout + + captureErr := e.cmd.Stderr == nil + var stderrBuf bytes.Buffer + if captureErr { + e.cmd.Stderr = &stderrBuf + } + + if err := e.cmd.Start(); err != nil { + return nil, err + } + + Registry.Register(e.cmd.Process.Pid) + defer Registry.Unregister(e.cmd.Process.Pid) + + err := e.cmd.Wait() + if err != nil && captureErr { + if ee, ok := err.(*exec.ExitError); ok { + ee.Stderr = stderrBuf.Bytes() + } + } + + return stdout.Bytes(), err } type CmdUsage struct { @@ -154,7 +237,14 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri e.cmd.Stdout = plo e.cmd.Stderr = io.MultiWriter(ple, stdErr) - err := e.cmd.Run() + if err := e.cmd.Start(); err != nil { + return nil, fmt.Errorf("cmd start: %w", err) + } + + Registry.Register(e.cmd.Process.Pid) + defer Registry.Unregister(e.cmd.Process.Pid) + + err := e.cmd.Wait() if err != nil { if len(stdErr.Bytes()) > 0 { return nil, &StderrError{Message: stdErr.String()} diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go index ccfc52fb..c2b86f2a 100644 --- a/pkg/executor/executor_test.go +++ b/pkg/executor/executor_test.go @@ -250,3 +250,149 @@ func randStringRunes(n int) string { } return string(b) } + +func TestProcessRegistry_Basic(t *testing.T) { + r := NewProcessRegistry() + + // Initially empty + assert.False(t, r.IsActive(1), " IsActive should return false for unknown PID") + assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID") + + // Register and check + r.Register(42) + assert.True(t, r.IsActive(42), "IsActive should return true for registered PID") + assert.False(t, r.IsActive(43), "IsActive should return false for different PID") + + // Unregister and check + r.Unregister(42) + assert.False(t, r.IsActive(42), "IsActive should return false after unregister") +} + +func TestProcessRegistry_DoubleUnregister(t *testing.T) { + r := NewProcessRegistry() + + r.Register(100) + r.Unregister(100) + r.Unregister(100) // should not panic + + assert.False(t, r.IsActive(100)) +} + +func TestProcessRegistry_Concurrent(t *testing.T) { + r := NewProcessRegistry() + const goroutines = 100 + const pidsPerGoroutine = 100 + + done := make(chan struct{}) + + // Concurrently register PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.Register(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be registered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.True(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } + + // Concurrently unregister PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.Unregister(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be unregistered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.False(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } +} + +func TestGlobalRegistry_Output_RegistersPID(t *testing.T) { + // Use a fresh registry to avoid interference with other tests + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + ex := NewExecutor("", "echo", []string{"hello"}, []string{}) + + // Before execution, no PID is registered + // After execution, PID should be removed from registry + output, err := ex.Output() + assert.NoError(t, err) + assert.Contains(t, string(output), "hello") + + // PID should be unregistered after Output returns + // (We can't easily check that the PID was registered *during* execution + // without a more complex test, but the ProcessRegistry unit tests cover + // the correctness of Register/Unregister.) +} + +func TestGlobalRegistry_Output_FailedStart(t *testing.T) { + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + // Command that doesn't exist — Start() should fail + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}) + _, err := ex.Output() + assert.Error(t, err) + + // Registry should be empty — nothing was registered since Start failed + // (This doesn't panic, which is the important part.) +} + +func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) { + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + logger := log.NewLogger() + logger.SetLevel(log.LevelInfo) + + ex := NewExecutor("", "echo", []string{"test-output"}, []string{}). + WithLogger(logger) + + usage, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + assert.NoError(t, err) + assert.NotNil(t, usage) + + // PID should be unregistered after RunAndLogLines returns +} + +func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) { + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + logger := log.NewLogger() + + // Command that doesn't exist — Start() should fail + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}). + WithLogger(logger) + + _, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + assert.Error(t, err) + + // Registry should be empty +} From 358452bef5d921900778c7fff1922a693a0f18c5 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Sat, 23 May 2026 07:07:21 +0300 Subject: [PATCH 2/8] add sleep before command wait for testing Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 0feb4b8a..8e888276 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -189,6 +189,7 @@ func (e *Executor) Output() ([]byte, error) { Registry.Register(e.cmd.Process.Pid) defer Registry.Unregister(e.cmd.Process.Pid) + time.Sleep(2 * time.Second) err := e.cmd.Wait() if err != nil && captureErr { if ee, ok := err.(*exec.ExitError); ok { From 02dbe69259bf5ee6e8a29dad0771620ca5156366 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Sat, 23 May 2026 08:44:29 +0300 Subject: [PATCH 3/8] remove unnecessary sleep from Output method Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 8e888276..0feb4b8a 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -189,7 +189,6 @@ func (e *Executor) Output() ([]byte, error) { Registry.Register(e.cmd.Process.Pid) defer Registry.Unregister(e.cmd.Process.Pid) - time.Sleep(2 * time.Second) err := e.cmd.Wait() if err != nil && captureErr { if ee, ok := err.(*exec.ExitError); ok { From 03372420c2b93f7cafd932fe25cbe8785cbe49a7 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Sun, 24 May 2026 13:07:43 +0300 Subject: [PATCH 4/8] [shell-operator] chore: use internal pid tracker helpers in executor Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 55 +++------------------- pkg/executor/executor_test.go | 87 +++++++++++++++++++---------------- pkg/executor/registry.go | 70 ++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 89 deletions(-) create mode 100644 pkg/executor/registry.go diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 0feb4b8a..b8f45d8d 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -10,7 +10,6 @@ import ( "log/slog" "os/exec" "strings" - "sync" "syscall" "time" @@ -26,48 +25,6 @@ const ( serviceName = "executor" ) -// ProcessRegistry tracks PIDs of processes started by the executor so that -// a PID-1 zombie reaper can skip them (their parent already calls wait). -// This prevents the reaper from stealing a child that cmd.Wait expects to reap. -type ProcessRegistry struct { - mu sync.RWMutex - activePIDs map[int32]struct{} -} - -// NewProcessRegistry creates a new ProcessRegistry. -func NewProcessRegistry() *ProcessRegistry { - return &ProcessRegistry{ - activePIDs: make(map[int32]struct{}), - } -} - -// Register adds pid to the set of active PIDs. -func (r *ProcessRegistry) Register(pid int) { - r.mu.Lock() - r.activePIDs[int32(pid)] = struct{}{} - r.mu.Unlock() -} - -// Unregister removes pid from the set of active PIDs. -func (r *ProcessRegistry) Unregister(pid int) { - r.mu.Lock() - delete(r.activePIDs, int32(pid)) - r.mu.Unlock() -} - -// IsActive reports whether pid is currently tracked as an active process. -func (r *ProcessRegistry) IsActive(pid int) bool { - r.mu.RLock() - _, ok := r.activePIDs[int32(pid)] - r.mu.RUnlock() - return ok -} - -// Registry is the global process registry shared between the executor and -// the PID-1 zombie reaper. All executor methods that spawn child processes -// register their PIDs here so the reaper can skip them. -var Registry = NewProcessRegistry() - // Run starts the command, waits for it to complete, and returns the error. // The child PID is registered in the global Registry while the process is // running so that a PID-1 zombie reaper does not steal it. @@ -80,8 +37,8 @@ func Run(cmd *exec.Cmd) error { return err } - Registry.Register(cmd.Process.Pid) - defer Registry.Unregister(cmd.Process.Pid) + registerPID(cmd.Process.Pid) + defer unregisterPID(cmd.Process.Pid) return cmd.Wait() } @@ -186,8 +143,8 @@ func (e *Executor) Output() ([]byte, error) { return nil, err } - Registry.Register(e.cmd.Process.Pid) - defer Registry.Unregister(e.cmd.Process.Pid) + registerPID(e.cmd.Process.Pid) + defer unregisterPID(e.cmd.Process.Pid) err := e.cmd.Wait() if err != nil && captureErr { @@ -241,8 +198,8 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri return nil, fmt.Errorf("cmd start: %w", err) } - Registry.Register(e.cmd.Process.Pid) - defer Registry.Unregister(e.cmd.Process.Pid) + registerPID(e.cmd.Process.Pid) + defer unregisterPID(e.cmd.Process.Pid) err := e.cmd.Wait() if err != nil { diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go index c2b86f2a..ba92c134 100644 --- a/pkg/executor/executor_test.go +++ b/pkg/executor/executor_test.go @@ -3,7 +3,6 @@ package executor import ( "bytes" "context" - json "github.com/flant/shell-operator/pkg/utils/json" "fmt" "io" "math/rand/v2" @@ -16,6 +15,8 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + json "github.com/flant/shell-operator/pkg/utils/json" ) func TestRunAndLogLines(t *testing.T) { @@ -251,35 +252,48 @@ func randStringRunes(n int) string { return string(b) } +// newTestRegistry creates a fresh processRegistry for tests and swaps the +// global singleton, returning a cleanup function that restores it. +func newTestRegistry(t *testing.T) *processRegistry { + t.Helper() + + r := &processRegistry{activePIDs: make(map[int32]struct{})} + orig := registry + registry = r + t.Cleanup(func() { registry = orig }) + + return r +} + func TestProcessRegistry_Basic(t *testing.T) { - r := NewProcessRegistry() + r := &processRegistry{activePIDs: make(map[int32]struct{})} // Initially empty - assert.False(t, r.IsActive(1), " IsActive should return false for unknown PID") + assert.False(t, r.IsActive(1), "IsActive should return false for unknown PID") assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID") // Register and check - r.Register(42) + r.register(42) assert.True(t, r.IsActive(42), "IsActive should return true for registered PID") assert.False(t, r.IsActive(43), "IsActive should return false for different PID") // Unregister and check - r.Unregister(42) + r.unregister(42) assert.False(t, r.IsActive(42), "IsActive should return false after unregister") } func TestProcessRegistry_DoubleUnregister(t *testing.T) { - r := NewProcessRegistry() + r := &processRegistry{activePIDs: make(map[int32]struct{})} - r.Register(100) - r.Unregister(100) - r.Unregister(100) // should not panic + r.register(100) + r.unregister(100) + r.unregister(100) // should not panic assert.False(t, r.IsActive(100)) } func TestProcessRegistry_Concurrent(t *testing.T) { - r := NewProcessRegistry() + r := &processRegistry{activePIDs: make(map[int32]struct{})} const goroutines = 100 const pidsPerGoroutine = 100 @@ -290,7 +304,7 @@ func TestProcessRegistry_Concurrent(t *testing.T) { go func() { defer func() { done <- struct{}{} }() for j := 0; j < pidsPerGoroutine; j++ { - r.Register(i*pidsPerGoroutine + j) + r.register(i*pidsPerGoroutine + j) } }() } @@ -311,7 +325,7 @@ func TestProcessRegistry_Concurrent(t *testing.T) { go func() { defer func() { done <- struct{}{} }() for j := 0; j < pidsPerGoroutine; j++ { - r.Unregister(i*pidsPerGoroutine + j) + r.unregister(i*pidsPerGoroutine + j) } }() } @@ -328,44 +342,44 @@ func TestProcessRegistry_Concurrent(t *testing.T) { } } +func TestTracker_IsActive(t *testing.T) { + r := newTestRegistry(t) + tracker := Tracker() + + // PID not registered + assert.False(t, tracker.IsActive(42)) + + // Register via internal helper (same path as executor methods) + r.register(42) + assert.True(t, tracker.IsActive(42)) + + r.unregister(42) + assert.False(t, tracker.IsActive(42)) +} + func TestGlobalRegistry_Output_RegistersPID(t *testing.T) { - // Use a fresh registry to avoid interference with other tests - origRegistry := Registry - Registry = NewProcessRegistry() - defer func() { Registry = origRegistry }() + newTestRegistry(t) ex := NewExecutor("", "echo", []string{"hello"}, []string{}) - // Before execution, no PID is registered - // After execution, PID should be removed from registry output, err := ex.Output() assert.NoError(t, err) assert.Contains(t, string(output), "hello") - // PID should be unregistered after Output returns - // (We can't easily check that the PID was registered *during* execution - // without a more complex test, but the ProcessRegistry unit tests cover - // the correctness of Register/Unregister.) + // PID should be unregistered after Output returns. } func TestGlobalRegistry_Output_FailedStart(t *testing.T) { - origRegistry := Registry - Registry = NewProcessRegistry() - defer func() { Registry = origRegistry }() + newTestRegistry(t) - // Command that doesn't exist — Start() should fail + // Command that doesn't exist — Start() should fail. ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}) _, err := ex.Output() assert.Error(t, err) - - // Registry should be empty — nothing was registered since Start failed - // (This doesn't panic, which is the important part.) } func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) { - origRegistry := Registry - Registry = NewProcessRegistry() - defer func() { Registry = origRegistry }() + newTestRegistry(t) logger := log.NewLogger() logger.SetLevel(log.LevelInfo) @@ -376,23 +390,16 @@ func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) { usage, err := ex.RunAndLogLines(context.Background(), map[string]string{}) assert.NoError(t, err) assert.NotNil(t, usage) - - // PID should be unregistered after RunAndLogLines returns } func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) { - origRegistry := Registry - Registry = NewProcessRegistry() - defer func() { Registry = origRegistry }() + newTestRegistry(t) logger := log.NewLogger() - // Command that doesn't exist — Start() should fail ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}). WithLogger(logger) _, err := ex.RunAndLogLines(context.Background(), map[string]string{}) assert.Error(t, err) - - // Registry should be empty } diff --git a/pkg/executor/registry.go b/pkg/executor/registry.go new file mode 100644 index 00000000..0fe8d426 --- /dev/null +++ b/pkg/executor/registry.go @@ -0,0 +1,70 @@ +package executor + +import "sync" + +// ProcessTracker is a read-only view into the process registry. +// It is intended for consumers (such as a PID-1 zombie reaper) that need +// to check whether a PID is managed by the executor but must not modify +// the registry. +type ProcessTracker interface { + // IsActive reports whether pid is currently tracked as a running process. + IsActive(pid int) bool +} + +// processRegistry tracks PIDs of processes started by the executor so that +// a PID-1 zombie reaper can skip them (their parent already calls Wait). +// This prevents the reaper from stealing a child that cmd.Wait expects to reap. +// +// The struct is intentionally unexported — all external access goes through +// the ProcessTracker interface (read-only) or the package-level helpers +// registerPID / unregisterPID (write, executor-internal). +type processRegistry struct { + mu sync.RWMutex + activePIDs map[int32]struct{} +} + +// register adds pid to the set of active PIDs. +func (r *processRegistry) register(pid int) { + r.mu.Lock() + r.activePIDs[int32(pid)] = struct{}{} + r.mu.Unlock() +} + +// unregister removes pid from the set of active PIDs. +func (r *processRegistry) unregister(pid int) { + r.mu.Lock() + delete(r.activePIDs, int32(pid)) + r.mu.Unlock() +} + +// IsActive reports whether pid is currently tracked as an active process. +func (r *processRegistry) IsActive(pid int) bool { + r.mu.RLock() + _, ok := r.activePIDs[int32(pid)] + r.mu.RUnlock() + + return ok +} + +// registry is the singleton process registry. +// It is not exported — external packages obtain a ProcessTracker via Tracker(). +var registry = &processRegistry{ + activePIDs: make(map[int32]struct{}), +} + +// Tracker returns a read-only view of the global process registry. +// The zombie reaper should call this once and use the returned ProcessTracker +// to check whether a PID is managed by the executor. +func Tracker() ProcessTracker { + return registry +} + +// registerPID and unregisterPID are package-internal helpers used by Run, +// Output, and RunAndLogLines to track child PIDs. +func registerPID(pid int) { + registry.register(pid) +} + +func unregisterPID(pid int) { + registry.unregister(pid) +} From 5e2fd97ea7aabda981267a018be1626b524074ce Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Sun, 24 May 2026 13:25:16 +0300 Subject: [PATCH 5/8] [chore] clarify process registry comment in executor Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index b8f45d8d..beb23f88 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -26,8 +26,8 @@ const ( ) // Run starts the command, waits for it to complete, and returns the error. -// The child PID is registered in the global Registry while the process is -// running so that a PID-1 zombie reaper does not steal it. +// The child PID is registered in the global process registry while the process +// is running so that a PID-1 zombie reaper does not steal it. func Run(cmd *exec.Cmd) error { // TODO context: hook name, hook phase, hook binding // TODO observability From 036b35f2b57b607d20cbbd0f138f390f21fd519e Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Sun, 24 May 2026 14:50:22 +0300 Subject: [PATCH 6/8] chore: use defer for process registry locks Signed-off-by: Ruslan Gorbunov --- pkg/executor/registry.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/executor/registry.go b/pkg/executor/registry.go index 0fe8d426..4fa5b4d7 100644 --- a/pkg/executor/registry.go +++ b/pkg/executor/registry.go @@ -26,22 +26,25 @@ type processRegistry struct { // register adds pid to the set of active PIDs. func (r *processRegistry) register(pid int) { r.mu.Lock() + defer r.mu.Unlock() + r.activePIDs[int32(pid)] = struct{}{} - r.mu.Unlock() } // unregister removes pid from the set of active PIDs. func (r *processRegistry) unregister(pid int) { r.mu.Lock() + defer r.mu.Unlock() + delete(r.activePIDs, int32(pid)) - r.mu.Unlock() } // IsActive reports whether pid is currently tracked as an active process. func (r *processRegistry) IsActive(pid int) bool { r.mu.RLock() + defer r.mu.RUnlock() + _, ok := r.activePIDs[int32(pid)] - r.mu.RUnlock() return ok } From b6dc44336117aadbf654199fb64ecfdcb6c1d869 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Mon, 25 May 2026 11:05:19 +0300 Subject: [PATCH 7/8] fix: improve executor registry tests for active pid tracking Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor_test.go | 63 ++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go index ba92c134..550498c1 100644 --- a/pkg/executor/executor_test.go +++ b/pkg/executor/executor_test.go @@ -252,8 +252,9 @@ func randStringRunes(n int) string { return string(b) } -// newTestRegistry creates a fresh processRegistry for tests and swaps the -// global singleton, returning a cleanup function that restores it. +// newTestRegistry creates a fresh processRegistry for tests, swaps the +// global singleton, and restores the original with t.Cleanup. It returns +// the fresh test registry. func newTestRegistry(t *testing.T) *processRegistry { t.Helper() @@ -343,30 +344,48 @@ func TestProcessRegistry_Concurrent(t *testing.T) { } func TestTracker_IsActive(t *testing.T) { - r := newTestRegistry(t) + newTestRegistry(t) tracker := Tracker() // PID not registered assert.False(t, tracker.IsActive(42)) // Register via internal helper (same path as executor methods) - r.register(42) + registerPID(42) assert.True(t, tracker.IsActive(42)) - r.unregister(42) + unregisterPID(42) assert.False(t, tracker.IsActive(42)) } func TestGlobalRegistry_Output_RegistersPID(t *testing.T) { - newTestRegistry(t) + r := newTestRegistry(t) - ex := NewExecutor("", "echo", []string{"hello"}, []string{}) + ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo hello"}, []string{}) - output, err := ex.Output() + outputCh := make(chan []byte, 1) + errCh := make(chan error, 1) + go func() { + output, err := ex.Output() + outputCh <- output + errCh <- err + }() + + assert.Eventually(t, func() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.activePIDs) > 0 + }, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while Output is running") + + output := <-outputCh + err := <-errCh assert.NoError(t, err) assert.Contains(t, string(output), "hello") - // PID should be unregistered after Output returns. + r.mu.RLock() + count := len(r.activePIDs) + r.mu.RUnlock() + assert.Empty(t, count, "expected registry to be empty after Output returns") } func TestGlobalRegistry_Output_FailedStart(t *testing.T) { @@ -379,17 +398,37 @@ func TestGlobalRegistry_Output_FailedStart(t *testing.T) { } func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) { - newTestRegistry(t) + r := newTestRegistry(t) logger := log.NewLogger() logger.SetLevel(log.LevelInfo) - ex := NewExecutor("", "echo", []string{"test-output"}, []string{}). + ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo test-output"}, []string{}). WithLogger(logger) - usage, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + usageCh := make(chan *CmdUsage, 1) + errCh := make(chan error, 1) + go func() { + usage, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + usageCh <- usage + errCh <- err + }() + + assert.Eventually(t, func() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.activePIDs) > 0 + }, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while RunAndLogLines is running") + + usage := <-usageCh + err := <-errCh assert.NoError(t, err) assert.NotNil(t, usage) + + r.mu.RLock() + count := len(r.activePIDs) + r.mu.RUnlock() + assert.Empty(t, count, "expected registry to be empty after RunAndLogLines returns") } func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) { From ae0e078cf623e15c8690787abae0f22086e6b7b2 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Tue, 26 May 2026 17:58:40 +0300 Subject: [PATCH 8/8] [shell-operator] fix: atomically register pids after starting commands Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 12 +++-------- pkg/executor/executor_test.go | 36 ++++++++++++++++++++++++++++---- pkg/executor/registry.go | 39 +++++++++++++++++++++++++++++------ 3 files changed, 68 insertions(+), 19 deletions(-) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index beb23f88..e192d0d2 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -33,11 +33,9 @@ func Run(cmd *exec.Cmd) error { // TODO observability log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir)) - if err := cmd.Start(); err != nil { + if err := startAndRegister(cmd); err != nil { return err } - - registerPID(cmd.Process.Pid) defer unregisterPID(cmd.Process.Pid) return cmd.Wait() @@ -139,11 +137,9 @@ func (e *Executor) Output() ([]byte, error) { e.cmd.Stderr = &stderrBuf } - if err := e.cmd.Start(); err != nil { + if err := startAndRegister(e.cmd); err != nil { return nil, err } - - registerPID(e.cmd.Process.Pid) defer unregisterPID(e.cmd.Process.Pid) err := e.cmd.Wait() @@ -194,11 +190,9 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri e.cmd.Stdout = plo e.cmd.Stderr = io.MultiWriter(ple, stdErr) - if err := e.cmd.Start(); err != nil { + if err := startAndRegister(e.cmd); err != nil { return nil, fmt.Errorf("cmd start: %w", err) } - - registerPID(e.cmd.Process.Pid) defer unregisterPID(e.cmd.Process.Pid) err := e.cmd.Wait() diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go index 550498c1..9b663080 100644 --- a/pkg/executor/executor_test.go +++ b/pkg/executor/executor_test.go @@ -7,6 +7,7 @@ import ( "io" "math/rand/v2" "os" + "os/exec" "regexp" "strings" "testing" @@ -258,7 +259,7 @@ func randStringRunes(n int) string { func newTestRegistry(t *testing.T) *processRegistry { t.Helper() - r := &processRegistry{activePIDs: make(map[int32]struct{})} + r := &processRegistry{activePIDs: make(map[int]struct{})} orig := registry registry = r t.Cleanup(func() { registry = orig }) @@ -267,7 +268,7 @@ func newTestRegistry(t *testing.T) *processRegistry { } func TestProcessRegistry_Basic(t *testing.T) { - r := &processRegistry{activePIDs: make(map[int32]struct{})} + r := &processRegistry{activePIDs: make(map[int]struct{})} // Initially empty assert.False(t, r.IsActive(1), "IsActive should return false for unknown PID") @@ -284,7 +285,7 @@ func TestProcessRegistry_Basic(t *testing.T) { } func TestProcessRegistry_DoubleUnregister(t *testing.T) { - r := &processRegistry{activePIDs: make(map[int32]struct{})} + r := &processRegistry{activePIDs: make(map[int]struct{})} r.register(100) r.unregister(100) @@ -294,7 +295,7 @@ func TestProcessRegistry_DoubleUnregister(t *testing.T) { } func TestProcessRegistry_Concurrent(t *testing.T) { - r := &processRegistry{activePIDs: make(map[int32]struct{})} + r := &processRegistry{activePIDs: make(map[int]struct{})} const goroutines = 100 const pidsPerGoroutine = 100 @@ -358,6 +359,33 @@ func TestTracker_IsActive(t *testing.T) { assert.False(t, tracker.IsActive(42)) } +func TestStartAndRegister_AtomicWithReaper(t *testing.T) { + r := newTestRegistry(t) + + // StartAndRegister must hold the write-lock across both cmd.Start() and + // PID registration, so there is no window where a zombie reaper could + // observe IsActive(pid) == false for a child that cmd.Wait will later reap. + cmd := exec.Command("sleep", "2") + require.NoError(t, startAndRegister(cmd)) + defer cmd.Process.Kill() + + pid := cmd.Process.Pid + + // The PID must already be visible in the registry — no race window. + assert.True(t, r.IsActive(pid), "PID should be registered immediately after StartAndRegister returns") + + // Simulate what the reaper does: check via the ProcessTracker interface. + tracker := Tracker() + assert.True(t, tracker.IsActive(pid), "ProcessTracker must see the PID as active") + + // Clean up: wait for the process to finish after killing it. + _ = cmd.Process.Kill() + _ = cmd.Wait() + + unregisterPID(pid) + assert.False(t, r.IsActive(pid), "PID should be gone after unregister") +} + func TestGlobalRegistry_Output_RegistersPID(t *testing.T) { r := newTestRegistry(t) diff --git a/pkg/executor/registry.go b/pkg/executor/registry.go index 4fa5b4d7..9f5097ca 100644 --- a/pkg/executor/registry.go +++ b/pkg/executor/registry.go @@ -1,6 +1,9 @@ package executor -import "sync" +import ( + "os/exec" + "sync" +) // ProcessTracker is a read-only view into the process registry. // It is intended for consumers (such as a PID-1 zombie reaper) that need @@ -20,7 +23,7 @@ type ProcessTracker interface { // registerPID / unregisterPID (write, executor-internal). type processRegistry struct { mu sync.RWMutex - activePIDs map[int32]struct{} + activePIDs map[int]struct{} } // register adds pid to the set of active PIDs. @@ -28,7 +31,7 @@ func (r *processRegistry) register(pid int) { r.mu.Lock() defer r.mu.Unlock() - r.activePIDs[int32(pid)] = struct{}{} + r.activePIDs[pid] = struct{}{} } // unregister removes pid from the set of active PIDs. @@ -36,7 +39,24 @@ func (r *processRegistry) unregister(pid int) { r.mu.Lock() defer r.mu.Unlock() - delete(r.activePIDs, int32(pid)) + delete(r.activePIDs, pid) +} + +// startAndRegister calls cmd.Start() and, on success, registers the child +// PID under the same write-lock. This eliminates the window between process +// creation and registration during which a PID-1 zombie reaper could +// prematurely reap a fast-exiting child. +func (r *processRegistry) startAndRegister(cmd *exec.Cmd) error { + r.mu.Lock() + defer r.mu.Unlock() + + if err := cmd.Start(); err != nil { + return err + } + + r.activePIDs[cmd.Process.Pid] = struct{}{} + + return nil } // IsActive reports whether pid is currently tracked as an active process. @@ -44,7 +64,7 @@ func (r *processRegistry) IsActive(pid int) bool { r.mu.RLock() defer r.mu.RUnlock() - _, ok := r.activePIDs[int32(pid)] + _, ok := r.activePIDs[pid] return ok } @@ -52,7 +72,7 @@ func (r *processRegistry) IsActive(pid int) bool { // registry is the singleton process registry. // It is not exported — external packages obtain a ProcessTracker via Tracker(). var registry = &processRegistry{ - activePIDs: make(map[int32]struct{}), + activePIDs: make(map[int]struct{}), } // Tracker returns a read-only view of the global process registry. @@ -71,3 +91,10 @@ func registerPID(pid int) { func unregisterPID(pid int) { registry.unregister(pid) } + +// startAndRegister calls cmd.Start() and registers the resulting PID +// atomically under the registry's write-lock. Callers must still call +// unregisterPID(cmd.Process.Pid) (typically via defer) when cmd.Wait returns. +func startAndRegister(cmd *exec.Cmd) error { + return registry.startAndRegister(cmd) +}