diff --git a/.semaphore/release.yml b/.semaphore/release.yml index 69c9f68..f415f28 100644 --- a/.semaphore/release.yml +++ b/.semaphore/release.yml @@ -15,7 +15,7 @@ blocks: - name: dockerhub-write prologue: commands: - - sem-version go 1.23.8 + - sem-version go 1.26.3 - "export GOPATH=~/go" - "export PATH=/home/semaphore/go/bin:$PATH" - checkout diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 01b9360..c739207 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -18,7 +18,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout jobs: @@ -37,7 +37,7 @@ blocks: - checkout - mv ~/.ssh/security-toolbox ~/.ssh/id_rsa - sudo chmod 600 ~/.ssh/id_rsa - - sem-version go 1.24.12 + - sem-version go 1.26.3 jobs: - name: Check dependencies commands: @@ -63,7 +63,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -96,7 +96,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -163,7 +163,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -200,7 +200,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -241,7 +241,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - curl -sLO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && install minikube-linux-amd64 /tmp/ - /tmp/minikube-linux-amd64 config set WantUpdateNotification false - /tmp/minikube-linux-amd64 start --driver=docker diff --git a/main.go b/main.go index 4766b87..105fae4 100644 --- a/main.go +++ b/main.go @@ -101,8 +101,8 @@ func getLogFilePath() string { return filepath.Join(os.TempDir(), "agent_log") } - parentDirectory := path.Dir(logFilePath) - err := os.MkdirAll(parentDirectory, 0640) + parentDirectory := filepath.Clean(path.Dir(logFilePath)) + err := os.MkdirAll(parentDirectory, 0750) if err != nil { log.Panicf("Could not create directories to place log file in '%s': %v", logFilePath, err) } @@ -435,7 +435,7 @@ func RunSingleJob(httpClient *http.Client) { panic(err) } - job.JobLogArchived = true + job.SetLogArchivalStatus(jobs.JobLogArchivalStatusCompleted) job.Run() } diff --git a/pkg/eventlogger/filebackend.go b/pkg/eventlogger/filebackend.go index 23b69ab..ef66a2c 100644 --- a/pkg/eventlogger/filebackend.go +++ b/pkg/eventlogger/filebackend.go @@ -90,7 +90,7 @@ func (l *FileBackend) CloseWithOptions(options CloseOptions) error { } func (l *FileBackend) Iterate(fn func([]byte) error) error { - fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm) + fd, err := os.OpenFile(l.path, os.O_RDONLY, 0600) if err != nil { return fmt.Errorf("error opening file '%s': %v", l.path, err) } @@ -112,7 +112,7 @@ func (l *FileBackend) Iterate(fn func([]byte) error) error { } func (l *FileBackend) Read(startingLineNumber, maxLines int, writer io.Writer) (int, error) { - fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm) + fd, err := os.OpenFile(l.path, os.O_RDONLY, 0600) if err != nil { return startingLineNumber, err } @@ -140,7 +140,11 @@ func (l *FileBackend) Read(startingLineNumber, maxLines int, writer io.Writer) ( // Otherwise, we advance to the next line and stream the current line. lineNumber++ - fmt.Fprint(writer, line) + _, err = fmt.Fprint(writer, line) + if err != nil { + _ = fd.Close() + return lineNumber, err + } linesStreamed++ // if we have streamed the number of lines we want, we stop. diff --git a/pkg/eventlogger/filebackend_test.go b/pkg/eventlogger/filebackend_test.go index 917529d..6ddcbe3 100644 --- a/pkg/eventlogger/filebackend_test.go +++ b/pkg/eventlogger/filebackend_test.go @@ -2,6 +2,7 @@ package eventlogger import ( "bytes" + "errors" "fmt" "io/ioutil" "os" @@ -50,6 +51,23 @@ func Test__LogsArePushedToFile(t *testing.T) { assert.Nil(t, err) } +func Test__ReadReturnsErrorFromWriter(t *testing.T) { + tmpFileName := filepath.Join(os.TempDir(), fmt.Sprintf("logs_%d.json", time.Now().UnixNano())) + fileBackend, err := NewFileBackend(tmpFileName, DefaultMaxSizeInBytes) + assert.Nil(t, err) + assert.Nil(t, fileBackend.Open()) + + timestamp := int(time.Now().Unix()) + assert.Nil(t, fileBackend.Write(&JobStartedEvent{Timestamp: timestamp, Event: "job_started"})) + + writerErr := errors.New("write failed") + _, err = fileBackend.Read(0, 1000, errorWriter{err: writerErr}) + assert.Equal(t, writerErr, err) + + err = fileBackend.Close() + assert.Nil(t, err) +} + func Test__ReadDoesNotIncludeDoubleNewlines(t *testing.T) { tmpFileName := filepath.Join(os.TempDir(), fmt.Sprintf("logs_%d.json", time.Now().UnixNano())) fileBackend, err := NewFileBackend(tmpFileName, DefaultMaxSizeInBytes) @@ -131,3 +149,11 @@ func Test__CloseWithOptions(t *testing.T) { assert.False(t, logsWereTrimmed) }) } + +type errorWriter struct { + err error +} + +func (w errorWriter) Write(p []byte) (n int, err error) { + return 0, w.err +} diff --git a/pkg/executors/authorized_keys.go b/pkg/executors/authorized_keys.go index 22c679c..7250c55 100644 --- a/pkg/executors/authorized_keys.go +++ b/pkg/executors/authorized_keys.go @@ -18,7 +18,8 @@ func InjectEntriesToAuthorizedKeys(keys []api.PublicKey) error { } sshDirectory := filepath.Join(homeDir, ".ssh") - err = os.MkdirAll(sshDirectory, os.ModePerm) + // SSH requires the .ssh directory to be accessible only by its owner. + err = os.MkdirAll(sshDirectory, 0700) if err != nil { return err } diff --git a/pkg/executors/docker_compose_executor.go b/pkg/executors/docker_compose_executor.go index 3245439..a01ebbf 100644 --- a/pkg/executors/docker_compose_executor.go +++ b/pkg/executors/docker_compose_executor.go @@ -66,7 +66,12 @@ func (e *DockerComposeExecutor) Prepare() int { return 1 } - err := os.MkdirAll(e.tmpDirectory, os.ModePerm) + // This directory is bind-mounted read-only into the job container (see the + // `-v` mount in startBashShell), whose process may run as a non-root user, so + // it must stay world-traversable for that user to read the command files. + // We drop world-write (0777 -> 0755) but keep read/execute for that reason. + // #nosec G301 + err := os.MkdirAll(e.tmpDirectory, 0755) if err != nil { return 1 } diff --git a/pkg/jobs/job.go b/pkg/jobs/job.go index 1561bb0..9ea78b1 100644 --- a/pkg/jobs/job.go +++ b/pkg/jobs/job.go @@ -10,6 +10,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" api "github.com/semaphoreci/agent/pkg/api" @@ -34,6 +35,22 @@ const JobStopped = "stopped" const MinSizeForCompression = 1024 * 1024 const DefaultSizeForCompression = 1024 * 1024 * 100 const MaxSizeForCompression = 1024 * 1024 * 1024 +// Must comfortably exceed loghub archivator's worst-case in-process retry budget +// after a failed fetch. Loghub retries up to 10 times with attempt*1s backoff and +// a 10s curl --max-time per attempt: backoff (1+2+...+9 = 45s) plus up to 9*10s of +// curl time ≈ 135s. The window must cover that so a slow retry can finish streaming +// before the agent tears down the job. Override via +// SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT if loghub's retry budget changes. +const DefaultLogArchivalFailedWaitTimeout = 150 * time.Second +const MaxLogArchivalWaitTimeout = 3600 * time.Second + +type JobLogArchivalStatus string + +const ( + JobLogArchivalStatusPending JobLogArchivalStatus = "pending" + JobLogArchivalStatusCompleted JobLogArchivalStatus = "completed" + JobLogArchivalStatusFailed JobLogArchivalStatus = "failed" +) type Job struct { Client *http.Client @@ -42,11 +59,12 @@ type Job struct { Executor executors.Executor - JobLogArchived bool - Stopped bool - Finished bool - UploadJobLogs string - UserAgent string + logArchivalStatus JobLogArchivalStatus + logArchivalStatusLock sync.RWMutex + Stopped bool + Finished bool + UploadJobLogs string + UserAgent string } type JobOptions struct { @@ -92,12 +110,12 @@ func NewJobWithOptions(options *JobOptions) (*Job, error) { } job := &Job{ - Client: options.Client, - Request: options.Request, - JobLogArchived: false, - Stopped: false, - UploadJobLogs: options.UploadJobLogs, - UserAgent: options.UserAgent, + Client: options.Client, + Request: options.Request, + logArchivalStatus: JobLogArchivalStatusPending, + Stopped: false, + UploadJobLogs: options.UploadJobLogs, + UserAgent: options.UserAgent, } if options.Logger != nil { @@ -173,6 +191,87 @@ type RunOptions struct { CallbackRetryAttempts int } +func (job *Job) SetLogArchivalStatus(status JobLogArchivalStatus) { + job.logArchivalStatusLock.Lock() + defer job.logArchivalStatusLock.Unlock() + + job.logArchivalStatus = status +} + +func (job *Job) GetLogArchivalStatus() JobLogArchivalStatus { + job.logArchivalStatusLock.RLock() + defer job.logArchivalStatusLock.RUnlock() + + if job.logArchivalStatus == "" { + return JobLogArchivalStatusPending + } + + return job.logArchivalStatus +} + +// waitForLogArchival blocks until the loghub archivator reaches a terminal state. +// +// While the archivator has never attempted a fetch (Pending), we wait +// indefinitely. For hosted jobs the job_finished event stays queued, so the +// archivator will eventually fetch once its backend recovers - bounding this +// would risk discarding the job's logs during a prolonged archivator outage. +// +// Once the archivator has engaged but a fetch failed (Failed), we know it is +// alive and retrying, so we wait only failedTimeout for a retry to succeed +// before giving up and continuing teardown. +func (job *Job) waitForLogArchival(failedTimeout, pollInterval time.Duration) JobLogArchivalStatus { + failedDeadline := time.Time{} + + for { + status := job.GetLogArchivalStatus() + if status == JobLogArchivalStatusCompleted { + return status + } + + if status == JobLogArchivalStatusFailed { + if failedDeadline.IsZero() { + failedDeadline = time.Now().Add(failedTimeout) + } + + if time.Now().After(failedDeadline) { + return JobLogArchivalStatusFailed + } + } + + time.Sleep(pollInterval) + } +} + +func (job *Job) logArchivalFailedWaitTimeout() time.Duration { + fromEnv := os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT") + if fromEnv == "" { + return DefaultLogArchivalFailedWaitTimeout + } + + n, err := strconv.ParseInt(fromEnv, 10, 64) + if err != nil { + log.Errorf( + "Error parsing SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT: %v - using default of %s", + err, + DefaultLogArchivalFailedWaitTimeout, + ) + return DefaultLogArchivalFailedWaitTimeout + } + + timeout := time.Duration(n) * time.Second + if timeout <= 0 || timeout > MaxLogArchivalWaitTimeout { + log.Errorf( + "Invalid SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT %s, must be in range 1s-%s, using default %s", + timeout, + MaxLogArchivalWaitTimeout, + DefaultLogArchivalFailedWaitTimeout, + ) + return DefaultLogArchivalFailedWaitTimeout + } + + return timeout +} + func (o *RunOptions) GetPreJobHookWarning() string { if o.PreJobHookPath == "" { return "" @@ -490,9 +589,11 @@ func (job *Job) Teardown(result string, epiloguesExecuted bool, callbackRetryAtt /* * For hosted jobs, we use callbacks: - * 1. Send finished callback and log job_finished event - * 2. Wait for archivator to collect all the logs - * 3. Send teardown_finished callback and close the logger + * 1. Send finished callback and log job_finished event. + * 2. Wait for archivator to reach a terminal archival state (completed, or failed + * after a bounded retry window). A job the archivator never reaches stays + * pending and waits indefinitely so its logs are not lost. + * 3. Send teardown_finished callback and close the logger. */ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) error { err := job.SendFinishedCallback(result, callbackRetryAttempts) @@ -504,16 +605,19 @@ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) job.Logger.LogJobFinished(result) log.Debug("Waiting for archivator") - for { - if job.JobLogArchived { - break - } else { - time.Sleep(1000 * time.Millisecond) - } + archivalStatus := job.waitForLogArchival( + job.logArchivalFailedWaitTimeout(), + time.Second, + ) + switch archivalStatus { + case JobLogArchivalStatusCompleted: + log.Debug("Archivator finished") + case JobLogArchivalStatusFailed: + log.Warn("Archivator log streaming failed - continuing teardown") + default: + log.Warnf("Unexpected log archival status '%s' - continuing teardown", archivalStatus) } - log.Debug("Archivator finished") - // The job already finished, but executor is still open. // We use the open executor to upload the job logs as // an artifact, in case it is above the acceptable limit. diff --git a/pkg/jobs/job_test.go b/pkg/jobs/job_test.go index 6bcad3c..2138c0c 100644 --- a/pkg/jobs/job_test.go +++ b/pkg/jobs/job_test.go @@ -1381,3 +1381,54 @@ func Test__UsePreJobHookAndFailOnError(t *testing.T) { os.Remove(hook) } + +func Test__LogArchivalStatusDefaultsToPending(t *testing.T) { + job := &Job{} + assert.Equal(t, JobLogArchivalStatusPending, job.GetLogArchivalStatus()) +} + +func Test__WaitForLogArchivalStatus(t *testing.T) { + t.Run("returns completed immediately", func(t *testing.T) { + job := &Job{} + job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) + + status := job.waitForLogArchival(50*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusCompleted, status) + }) + + t.Run("returns completed when retry succeeds after a failed fetch", func(t *testing.T) { + job := &Job{} + job.SetLogArchivalStatus(JobLogArchivalStatusFailed) + + go func() { + time.Sleep(20 * time.Millisecond) + job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) + }() + + status := job.waitForLogArchival(100*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusCompleted, status) + }) + + t.Run("waits indefinitely while pending and exits when completed", func(t *testing.T) { + job := &Job{} + + // Completed arrives well after the failed-state grace window would have + // elapsed; a pending job must keep waiting regardless and never give up + // (so logs are not lost during a prolonged archivator outage). + go func() { + time.Sleep(80 * time.Millisecond) + job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) + }() + + status := job.waitForLogArchival(20*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusCompleted, status) + }) + + t.Run("returns failed after failed-state grace timeout", func(t *testing.T) { + job := &Job{} + job.SetLogArchivalStatus(JobLogArchivalStatusFailed) + + status := job.waitForLogArchival(50*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusFailed, status) + }) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 55b6d31..42d334e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -25,6 +25,13 @@ import ( const DefaultCallbackRetryAttempts = 300 +var ( + defaultReadHeaderTimeout = 5 * time.Second + defaultWriteTimeout = 30 * time.Second + defaultReadTimeout = 10 * time.Second + defaultIdleTimeout = 30 * time.Second +) + type Server struct { Logfile io.Writer ActiveJob *jobs.Job @@ -101,15 +108,7 @@ func (s *Server) Serve() { log.Infof("Agent %s listening on https://%s\n", s.Config.Version, address) loggedRouter := handlers.LoggingHandler(s.Logfile, s.router) - - server := &http.Server{ - Addr: address, - ReadHeaderTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - ReadTimeout: 10 * time.Second, - IdleTimeout: 30 * time.Second, - Handler: loggedRouter, - } + server := newHTTPServer(address, loggedRouter) err := server.ListenAndServeTLS(s.Config.TLSCertPath, s.Config.TLSKeyPath) if err != nil { @@ -117,6 +116,17 @@ func (s *Server) Serve() { } } +func newHTTPServer(address string, handler http.Handler) *http.Server { + return &http.Server{ + Addr: address, + ReadHeaderTimeout: defaultReadHeaderTimeout, + WriteTimeout: defaultWriteTimeout, + ReadTimeout: defaultReadTimeout, + IdleTimeout: defaultIdleTimeout, + Handler: handler, + } +} + func (s *Server) Status(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) m := make(map[string]interface{}) @@ -149,7 +159,7 @@ func (s *Server) JobLogs(w http.ResponseWriter, r *http.Request) { if s.ActiveJob == nil { log.Warnf("Attempt to fetch logs for '%s' before any job is received", jobID) w.WriteHeader(http.StatusNotFound) - fmt.Fprintf(w, `{"message": "job %s is not running"}`, jobID) + fmt.Fprint(w, `{"message": "job is not running"}`) return } @@ -159,7 +169,7 @@ func (s *Server) JobLogs(w http.ResponseWriter, r *http.Request) { if runningJobID != jobID { log.Warnf("Attempt to fetch logs for '%s', but job '%s' is the one running", jobID, runningJobID) w.WriteHeader(http.StatusForbidden) - fmt.Fprintf(w, `{"message": "job %s is not running"}`, jobID) + fmt.Fprint(w, `{"message": "job is not running"}`) return } @@ -168,16 +178,22 @@ func (s *Server) JobLogs(w http.ResponseWriter, r *http.Request) { startFromLine = 0 } + isArchivator := r.Header.Get("X-Client-Name") == "archivator" + _, err = s.ActiveJob.Logger.Backend.Read(startFromLine, math.MaxInt32, w) if err != nil { log.Errorf("Error while streaming logs: %v", err) + if isArchivator { + s.ActiveJob.SetLogArchivalStatus(jobs.JobLogArchivalStatusFailed) + } http.Error(w, err.Error(), 500) fmt.Fprintf(w, `{"message": "%s"}`, err) + return } - if r.Header.Get("X-Client-Name") == "archivator" { - s.ActiveJob.JobLogArchived = true + if isArchivator { + s.ActiveJob.SetLogArchivalStatus(jobs.JobLogArchivalStatusCompleted) } } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 444d424..83837e4 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -3,7 +3,9 @@ package server import ( "bytes" "encoding/json" + "errors" "fmt" + "io" "net/http" "net/http/httptest" "sync" @@ -12,6 +14,8 @@ import ( "github.com/golang-jwt/jwt/v4" api "github.com/semaphoreci/agent/pkg/api" + eventlogger "github.com/semaphoreci/agent/pkg/eventlogger" + jobs "github.com/semaphoreci/agent/pkg/jobs" "github.com/stretchr/testify/assert" ) @@ -186,6 +190,66 @@ func Test__RunJobAcceptsSameJobAgain(t *testing.T) { assert.Equal(t, totalReq-1, countBodies(bodies, "job is already running")) } +func Test__HTTPServerTimeouts(t *testing.T) { + server := newHTTPServer("0.0.0.0:1234", http.NewServeMux()) + + assert.Equal(t, defaultReadHeaderTimeout, server.ReadHeaderTimeout) + assert.Equal(t, defaultWriteTimeout, server.WriteTimeout) + assert.Equal(t, defaultReadTimeout, server.ReadTimeout) + assert.Equal(t, defaultIdleTimeout, server.IdleTimeout) +} + +func Test__JobLogsArchivatorStatusIsSetOnlyOnSuccessfulStream(t *testing.T) { + dummyKey := "dummykey" + testServer := NewServer(ServerConfig{ + HTTPClient: http.DefaultClient, + JWTSecret: []byte(dummyKey), + }) + + token, err := generateToken(dummyKey) + if !assert.NoError(t, err) { + return + } + + t.Run("stream succeeds -> archived flag is true", func(t *testing.T) { + testServer.ActiveJob = &jobs.Job{ + Request: &api.JobRequest{JobID: "job-success"}, + Logger: &eventlogger.Logger{ + Backend: staticReadBackend{line: "{\"event\":\"job_started\"}\n"}, + }, + } + + req, _ := http.NewRequest("GET", "/jobs/job-success/log", nil) + req.Header.Add("Authorization", "Token "+token) + req.Header.Add("X-Client-Name", "archivator") + + rr := httptest.NewRecorder() + testServer.router.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, jobs.JobLogArchivalStatusCompleted, testServer.ActiveJob.GetLogArchivalStatus()) + }) + + t.Run("stream fails -> archival status is failed", func(t *testing.T) { + testServer.ActiveJob = &jobs.Job{ + Request: &api.JobRequest{JobID: "job-fail"}, + Logger: &eventlogger.Logger{ + Backend: staticReadBackend{err: errors.New("stream failed")}, + }, + } + + req, _ := http.NewRequest("GET", "/jobs/job-fail/log", nil) + req.Header.Add("Authorization", "Token "+token) + req.Header.Add("X-Client-Name", "archivator") + + rr := httptest.NewRecorder() + testServer.router.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + assert.Equal(t, jobs.JobLogArchivalStatusFailed, testServer.ActiveJob.GetLogArchivalStatus()) + }) +} + func getAgentStatus(t *testing.T, testServer *Server, token string) string { req, _ := http.NewRequest("GET", "/status", nil) req.Header.Add("Authorization", "Token "+token) @@ -265,3 +329,37 @@ func generateToken(key string) (string, error) { return token.SignedString([]byte(key)) } + +type staticReadBackend struct { + line string + err error +} + +func (b staticReadBackend) Open() error { + return nil +} + +func (b staticReadBackend) Write(interface{}) error { + return nil +} + +func (b staticReadBackend) Read(startFrom, maxLines int, writer io.Writer) (int, error) { + if b.err != nil { + return startFrom, b.err + } + + _, err := io.WriteString(writer, b.line) + return startFrom + 1, err +} + +func (b staticReadBackend) Iterate(func(event []byte) error) error { + return nil +} + +func (b staticReadBackend) Close() error { + return nil +} + +func (b staticReadBackend) CloseWithOptions(eventlogger.CloseOptions) error { + return nil +} diff --git a/pkg/shell/output_buffer_test.go b/pkg/shell/output_buffer_test.go index 1454c2d..5abf474 100644 --- a/pkg/shell/output_buffer_test.go +++ b/pkg/shell/output_buffer_test.go @@ -1,7 +1,6 @@ package shell import ( - "context" "strings" "testing" "time" @@ -205,32 +204,24 @@ func Test__OutputBuffer__DoesNotWaitForeverForOutputToBeFlushed(t *testing.T) { input = append(input, 'a') } - buffer, _ := NewOutputBufferWithFlushTimeout(func(s string) {}, time.Second) + flushTimeout := 100 * time.Millisecond - // write a lot of data to the buffer + // A consumer slower than the flush timeout makes even a single flush overrun + // the deadline, so Close() is guaranteed to give up with a context deadline + // error instead of draining the buffer. This is deterministic; the previous + // version raced a concurrent writer to keep the buffer non-empty, which could + // intermittently let the buffer drain and made Close() return nil (flaky). + buffer, _ := NewOutputBufferWithFlushTimeout(func(s string) { + time.Sleep(3 * flushTimeout) + }, flushTimeout) + + // Pre-fill so the buffer is non-empty when Close() starts flushing. for i := 0; i < 1000; i++ { buffer.Append(input) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // on a separate goroutine, we continuosly write - // to make sure the buffer is never empty - go func() { - for { - select { - case <-ctx.Done(): - return - default: - buffer.Append(input) - } - } - }() - - // here, we try to close, which will not work - // since we will attempt to flush while the buffer is being continuosly written. + // Close() flushes one chunk - which blocks in the slow consumer past the + // deadline - and then returns the deadline error rather than draining the rest. err := buffer.Close() assert.ErrorContains(t, err, "context deadline exceeded") - cancel() } diff --git a/pkg/shell/shell_test.go b/pkg/shell/shell_test.go index fe4631f..c901b73 100644 --- a/pkg/shell/shell_test.go +++ b/pkg/shell/shell_test.go @@ -101,7 +101,11 @@ func Test__Shell__HandlingBashProcessKill(t *testing.T) { } ` } else { - cmd = "echo Hello && exit 1" + // The `sleep` gives the agent time to read and flush "Hello" from the PTY + // before `exit 1` abruptly closes the shell. Without the gap, the output + // read races the shell-closed signal and "Hello" is dropped intermittently, + // which is what made this test flaky. + cmd = "echo Hello && sleep 1 && exit 1" } p1 := shell.NewProcessWithOutput(cmd, func(line string) { @@ -109,7 +113,7 @@ func Test__Shell__HandlingBashProcessKill(t *testing.T) { }) p1.Run() - assert.Equal(t, output.String(), "Hello\n") + assert.Equal(t, "Hello\n", output.String()) } func Test__Shell__HandlingBashProcessKillThatHasBackgroundJobs(t *testing.T) { diff --git a/test/hub_reference/Gemfile.lock b/test/hub_reference/Gemfile.lock index 3348f4b..d23d926 100644 --- a/test/hub_reference/Gemfile.lock +++ b/test/hub_reference/Gemfile.lock @@ -7,12 +7,12 @@ GEM logger (1.7.0) mustermann (3.0.4) ruby2_keywords (~> 0.0.1) - rack (3.2.4) + rack (3.2.6) rack-protection (4.2.1) base64 (>= 0.1.0) logger (>= 1.6.0) rack (>= 3.0.0, < 4) - rack-session (2.1.1) + rack-session (2.1.2) base64 (>= 0.1.0) rack (>= 3.0.0) ruby2_keywords (0.0.5)