Skip to content
47 changes: 44 additions & 3 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
Expand All @@ -24,12 +25,20 @@ const (
serviceName = "executor"
)

// Run starts cmd, waits for it to complete, and returns the resulting error.
//
// The child PID is registered in the global process registry while the process
// is running so that a PID-1 zombie reaper does not steal it. To make that
// possible the command is launched with startAndRegister followed by
// cmd.Wait instead of a single cmd.Run call.
//
// A failure to start the command is returned unchanged; otherwise the value
// returned is whatever cmd.Wait reports.
func Run(cmd *exec.Cmd) error {
	// TODO context: hook name, hook phase, hook binding
	// TODO observability
	log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir))

	if err := startAndRegister(cmd); err != nil {
		return err
	}
	// cmd.Process is only dereferenced after a successful start; the PID is
	// captured now (defer evaluates its arguments immediately) and removed
	// from the registry once Wait has returned.
	defer unregisterPID(cmd.Process.Pid)

	return cmd.Wait()
}

// StderrError is returned by RunAndLogLines when a command fails and produces
Expand Down Expand Up @@ -113,7 +122,34 @@ func (e *Executor) Output() ([]byte, error) {
e.logger.Debug("Executing command",
slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")),
slog.String(pkg.LogKeyDir, e.cmd.Dir))
return e.cmd.Output()

// Reproduce cmd.Output() but interleave PID registration so that the
// PID-1 zombie reaper skips this process.
if e.cmd.Stdout != nil {
return nil, errors.New("exec: Stdout already set")
}
var stdout bytes.Buffer
e.cmd.Stdout = &stdout

captureErr := e.cmd.Stderr == nil
var stderrBuf bytes.Buffer
if captureErr {
e.cmd.Stderr = &stderrBuf
}

if err := startAndRegister(e.cmd); err != nil {
return nil, err
}
defer unregisterPID(e.cmd.Process.Pid)

err := e.cmd.Wait()
if err != nil && captureErr {
if ee, ok := err.(*exec.ExitError); ok {
ee.Stderr = stderrBuf.Bytes()
}
}

return stdout.Bytes(), err
}

type CmdUsage struct {
Expand Down Expand Up @@ -154,7 +190,12 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri
e.cmd.Stdout = plo
e.cmd.Stderr = io.MultiWriter(ple, stdErr)

err := e.cmd.Run()
if err := startAndRegister(e.cmd); err != nil {
return nil, fmt.Errorf("cmd start: %w", err)
}
defer unregisterPID(e.cmd.Process.Pid)

err := e.cmd.Wait()
if err != nil {
if len(stdErr.Bytes()) > 0 {
return nil, &StderrError{Message: stdErr.String()}
Expand Down
222 changes: 221 additions & 1 deletion pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package executor
import (
"bytes"
"context"
json "github.com/flant/shell-operator/pkg/utils/json"
"fmt"
"io"
"math/rand/v2"
"os"
"os/exec"
"regexp"
"strings"
"testing"
Expand All @@ -16,6 +16,8 @@ import (
"github.com/deckhouse/deckhouse/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

json "github.com/flant/shell-operator/pkg/utils/json"
)

func TestRunAndLogLines(t *testing.T) {
Expand Down Expand Up @@ -250,3 +252,221 @@ func randStringRunes(n int) string {
}
return string(b)
}

// newTestRegistry installs an empty processRegistry as the package-level
// registry for the duration of the calling test and hands it back to the
// caller. The registry that was in place beforehand is put back via
// t.Cleanup.
func newTestRegistry(t *testing.T) *processRegistry {
	t.Helper()

	// Remember the current global first so the cleanup restores exactly it.
	previous := registry
	t.Cleanup(func() { registry = previous })

	fresh := &processRegistry{activePIDs: map[int]struct{}{}}
	registry = fresh

	return fresh
}

// TestProcessRegistry_Basic walks a standalone registry through the
// register, lookup, and unregister steps for a single PID.
func TestProcessRegistry_Basic(t *testing.T) {
	reg := &processRegistry{activePIDs: map[int]struct{}{}}

	// Nothing has been registered yet, so every lookup must miss.
	for _, unknown := range []int{1, 12345} {
		assert.False(t, reg.IsActive(unknown), "IsActive should return false for unknown PID")
	}

	// After registering 42 only that PID is reported as active.
	const pid = 42
	reg.register(pid)
	assert.True(t, reg.IsActive(pid), "IsActive should return true for registered PID")
	assert.False(t, reg.IsActive(pid+1), "IsActive should return false for different PID")

	// Removing it makes the lookup miss again.
	reg.unregister(pid)
	assert.False(t, reg.IsActive(pid), "IsActive should return false after unregister")
}

// TestProcessRegistry_DoubleUnregister checks that unregistering the same PID
// twice is harmless: the second call must not panic and the PID stays
// inactive.
func TestProcessRegistry_DoubleUnregister(t *testing.T) {
	const pid = 100

	reg := &processRegistry{activePIDs: map[int]struct{}{}}
	reg.register(pid)

	// The second iteration targets a PID that is already gone.
	for range 2 {
		reg.unregister(pid)
	}

	assert.False(t, reg.IsActive(pid))
}

// TestProcessRegistry_Concurrent registers and then unregisters 100×100
// distinct PIDs from 100 goroutines, verifying after each phase that every
// PID is in the expected state.
func TestProcessRegistry_Concurrent(t *testing.T) {
	const (
		goroutines       = 100
		pidsPerGoroutine = 100
	)

	reg := &processRegistry{activePIDs: make(map[int]struct{})}
	finished := make(chan struct{})

	// fanOut applies op to every PID, giving each goroutine its own
	// contiguous slice of the PID range, and returns once all goroutines
	// have signalled completion.
	fanOut := func(op func(pid int)) {
		for worker := range goroutines {
			go func() {
				defer func() { finished <- struct{}{} }()
				base := worker * pidsPerGoroutine
				for offset := range pidsPerGoroutine {
					op(base + offset)
				}
			}()
		}

		for range goroutines {
			<-finished
		}
	}

	// forEachPID visits the whole PID range in ascending order on the
	// calling goroutine.
	forEachPID := func(check func(pid int)) {
		for pid := range goroutines * pidsPerGoroutine {
			check(pid)
		}
	}

	// Phase 1: concurrent registration; afterwards every PID is active.
	fanOut(func(pid int) { reg.register(pid) })
	forEachPID(func(pid int) {
		assert.True(t, reg.IsActive(pid))
	})

	// Phase 2: concurrent removal; afterwards no PID is active.
	fanOut(func(pid int) { reg.unregister(pid) })
	forEachPID(func(pid int) {
		assert.False(t, reg.IsActive(pid))
	})
}

// TestTracker_IsActive verifies that the value returned by Tracker reflects
// PIDs added and removed through the package-level registerPID and
// unregisterPID helpers.
func TestTracker_IsActive(t *testing.T) {
	const pid = 42

	newTestRegistry(t)
	tr := Tracker()

	// Unknown before registration.
	assert.False(t, tr.IsActive(pid))

	// Visible once registered through the package-level helper.
	registerPID(pid)
	assert.True(t, tr.IsActive(pid))

	// Gone again after it is unregistered.
	unregisterPID(pid)
	assert.False(t, tr.IsActive(pid))
}

// TestStartAndRegister_AtomicWithReaper starts a real child process through
// startAndRegister and checks that its PID is already visible in the registry
// by the time the call returns, then that it disappears after unregisterPID.
//
// NOTE(review): the intent is that start and registration happen under one
// lock so a zombie reaper never sees IsActive(pid) == false for a child that
// cmd.Wait will later reap; the locking itself is not visible from this test —
// confirm against startAndRegister.
func TestStartAndRegister_AtomicWithReaper(t *testing.T) {
	reg := newTestRegistry(t)

	child := exec.Command("sleep", "2")
	require.NoError(t, startAndRegister(child))
	// Safety net so the sleep process is signalled even on an early exit.
	defer func() { _ = child.Process.Kill() }()

	childPID := child.Process.Pid

	// Checked directly on the registry: the PID must already be present.
	assert.True(t, reg.IsActive(childPID), "PID should be registered immediately after StartAndRegister returns")

	// Checked the way a reaper would: through the value returned by Tracker.
	assert.True(t, Tracker().IsActive(childPID), "ProcessTracker must see the PID as active")

	// Tear down: kill the child and reap it; both errors are irrelevant here.
	_ = child.Process.Kill()
	_ = child.Wait()

	unregisterPID(childPID)
	assert.False(t, reg.IsActive(childPID), "PID should be gone after unregister")
}

// TestGlobalRegistry_Output_RegistersPID runs Executor.Output in the
// background and checks that the registry holds at least one PID while the
// command is running and none once Output has returned.
func TestGlobalRegistry_Output_RegistersPID(t *testing.T) {
	reg := newTestRegistry(t)

	// activeCount reads the number of tracked PIDs under the registry's read lock.
	activeCount := func() int {
		reg.mu.RLock()
		defer reg.mu.RUnlock()
		return len(reg.activePIDs)
	}

	// The short sleep keeps the child alive long enough to be observed.
	ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo hello"}, []string{})

	type outcome struct {
		stdout []byte
		err    error
	}
	done := make(chan outcome, 1)
	go func() {
		stdout, err := ex.Output()
		done <- outcome{stdout: stdout, err: err}
	}()

	assert.Eventually(t, func() bool {
		return activeCount() > 0
	}, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while Output is running")

	got := <-done
	assert.NoError(t, got.err)
	assert.Contains(t, string(got.stdout), "hello")

	assert.Empty(t, activeCount(), "expected registry to be empty after Output returns")
}

// TestGlobalRegistry_Output_FailedStart checks that Output reports an error
// when the binary to execute does not exist.
func TestGlobalRegistry_Output_FailedStart(t *testing.T) {
	newTestRegistry(t)

	// The path points at nothing, so the command cannot be started.
	missing := NewExecutor("", "/nonexistent/binary", []string{}, []string{})

	_, runErr := missing.Output()
	assert.Error(t, runErr)
}

// TestGlobalRegistry_RunAndLogLines_RegistersPID runs
// Executor.RunAndLogLines in the background and checks that the registry
// holds at least one PID while the command is running and none once the call
// has returned.
func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) {
	reg := newTestRegistry(t)

	// activeCount reads the number of tracked PIDs under the registry's read lock.
	activeCount := func() int {
		reg.mu.RLock()
		defer reg.mu.RUnlock()
		return len(reg.activePIDs)
	}

	lg := log.NewLogger()
	lg.SetLevel(log.LevelInfo)

	// The short sleep keeps the child alive long enough to be observed.
	ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo test-output"}, []string{}).
		WithLogger(lg)

	type outcome struct {
		usage *CmdUsage
		err   error
	}
	done := make(chan outcome, 1)
	go func() {
		usage, err := ex.RunAndLogLines(context.Background(), map[string]string{})
		done <- outcome{usage: usage, err: err}
	}()

	assert.Eventually(t, func() bool {
		return activeCount() > 0
	}, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while RunAndLogLines is running")

	got := <-done
	assert.NoError(t, got.err)
	assert.NotNil(t, got.usage)

	assert.Empty(t, activeCount(), "expected registry to be empty after RunAndLogLines returns")
}

// TestGlobalRegistry_RunAndLogLines_FailedStart checks that RunAndLogLines
// reports an error when the binary to execute does not exist.
func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) {
	newTestRegistry(t)

	// The path points at nothing, so the command cannot be started.
	missing := NewExecutor("", "/nonexistent/binary", []string{}, []string{}).
		WithLogger(log.NewLogger())

	_, runErr := missing.RunAndLogLines(context.Background(), map[string]string{})
	assert.Error(t, runErr)
}
Loading
Loading