From ccc1abed1d4a3d1ca879c466f9129b56568c6c27 Mon Sep 17 00:00:00 2001 From: Dejan K Date: Wed, 13 May 2026 14:38:42 +0200 Subject: [PATCH 1/4] feat: improve log archival, write timeouts, and log error handling --- main.go | 2 +- pkg/eventlogger/filebackend.go | 6 +- pkg/eventlogger/filebackend_test.go | 26 ++++ pkg/jobs/job.go | 176 ++++++++++++++++++++++++---- pkg/jobs/job_test.go | 44 +++++++ pkg/server/server.go | 38 ++++-- pkg/server/server_test.go | 98 ++++++++++++++++ 7 files changed, 355 insertions(+), 35 deletions(-) diff --git a/main.go b/main.go index 4766b87c..014ee141 100644 --- a/main.go +++ b/main.go @@ -435,7 +435,7 @@ func RunSingleJob(httpClient *http.Client) { panic(err) } - job.JobLogArchived = true + job.SetLogArchivalStatus(jobs.JobLogArchivalStatusCompleted) job.Run() } diff --git a/pkg/eventlogger/filebackend.go b/pkg/eventlogger/filebackend.go index 23b69ab1..858373cc 100644 --- a/pkg/eventlogger/filebackend.go +++ b/pkg/eventlogger/filebackend.go @@ -140,7 +140,11 @@ func (l *FileBackend) Read(startingLineNumber, maxLines int, writer io.Writer) ( // Otherwise, we advance to the next line and stream the current line. lineNumber++ - fmt.Fprint(writer, line) + _, err = fmt.Fprint(writer, line) + if err != nil { + _ = fd.Close() + return lineNumber, err + } linesStreamed++ // if we have streamed the number of lines we want, we stop. diff --git a/pkg/eventlogger/filebackend_test.go b/pkg/eventlogger/filebackend_test.go index 917529d1..6ddcbe37 100644 --- a/pkg/eventlogger/filebackend_test.go +++ b/pkg/eventlogger/filebackend_test.go @@ -2,6 +2,7 @@ package eventlogger import ( "bytes" + "errors" "fmt" "io/ioutil" "os" @@ -50,6 +51,23 @@ func Test__LogsArePushedToFile(t *testing.T) { assert.Nil(t, err) } +func Test__ReadReturnsErrorFromWriter(t *testing.T) { + tmpFileName := filepath.Join(os.TempDir(), fmt.Sprintf("logs_%d.json", time.Now().UnixNano())) + fileBackend, err := NewFileBackend(tmpFileName, DefaultMaxSizeInBytes) + assert.Nil(t, err) + assert.Nil(t, fileBackend.Open()) + + timestamp := int(time.Now().Unix()) + assert.Nil(t, fileBackend.Write(&JobStartedEvent{Timestamp: timestamp, Event: "job_started"})) + + writerErr := errors.New("write failed") + _, err = fileBackend.Read(0, 1000, errorWriter{err: writerErr}) + assert.Equal(t, writerErr, err) + + err = fileBackend.Close() + assert.Nil(t, err) +} + func Test__ReadDoesNotIncludeDoubleNewlines(t *testing.T) { tmpFileName := filepath.Join(os.TempDir(), fmt.Sprintf("logs_%d.json", time.Now().UnixNano())) fileBackend, err := NewFileBackend(tmpFileName, DefaultMaxSizeInBytes) @@ -131,3 +149,11 @@ func Test__CloseWithOptions(t *testing.T) { assert.False(t, logsWereTrimmed) }) } + +type errorWriter struct { + err error +} + +func (w errorWriter) Write(p []byte) (n int, err error) { + return 0, w.err +} diff --git a/pkg/jobs/job.go b/pkg/jobs/job.go index 1561bb01..95879776 100644 --- a/pkg/jobs/job.go +++ b/pkg/jobs/job.go @@ -10,6 +10,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" api "github.com/semaphoreci/agent/pkg/api" @@ -34,6 +35,18 @@ const JobStopped = "stopped" const MinSizeForCompression = 1024 * 1024 const DefaultSizeForCompression = 1024 * 1024 * 100 const MaxSizeForCompression = 1024 * 1024 * 1024 +const DefaultLogArchivalPendingWaitTimeout = 300 * time.Second +const DefaultLogArchivalFailedWaitTimeout = 15 * time.Second +const MaxLogArchivalWaitTimeout = 3600 * time.Second + +type JobLogArchivalStatus string + +const ( + JobLogArchivalStatusPending JobLogArchivalStatus = "pending" + JobLogArchivalStatusCompleted JobLogArchivalStatus = "completed" + JobLogArchivalStatusFailed JobLogArchivalStatus = "failed" + JobLogArchivalStatusTimedOut JobLogArchivalStatus = "timed_out" +) type Job struct { Client *http.Client @@ -42,11 +55,12 @@ type Job struct { Executor executors.Executor - JobLogArchived bool - Stopped bool - Finished bool - UploadJobLogs string - UserAgent string + logArchivalStatus JobLogArchivalStatus + logArchivalStatusLock sync.RWMutex + Stopped bool + Finished bool + UploadJobLogs string + UserAgent string } type JobOptions struct { @@ -92,12 +106,12 @@ func NewJobWithOptions(options *JobOptions) (*Job, error) { } job := &Job{ - Client: options.Client, - Request: options.Request, - JobLogArchived: false, - Stopped: false, - UploadJobLogs: options.UploadJobLogs, - UserAgent: options.UserAgent, + Client: options.Client, + Request: options.Request, + logArchivalStatus: JobLogArchivalStatusPending, + Stopped: false, + UploadJobLogs: options.UploadJobLogs, + UserAgent: options.UserAgent, } if options.Logger != nil { @@ -173,6 +187,118 @@ type RunOptions struct { CallbackRetryAttempts int } +func (job *Job) SetLogArchivalStatus(status JobLogArchivalStatus) { + job.logArchivalStatusLock.Lock() + defer job.logArchivalStatusLock.Unlock() + + job.logArchivalStatus = status +} + +func (job *Job) GetLogArchivalStatus() JobLogArchivalStatus { + job.logArchivalStatusLock.RLock() + defer job.logArchivalStatusLock.RUnlock() + + if job.logArchivalStatus == "" { + return JobLogArchivalStatusPending + } + + return job.logArchivalStatus +} + +func (job *Job) waitForLogArchival(pendingTimeout, failedTimeout, pollInterval time.Duration) JobLogArchivalStatus { + pendingDeadline := time.Now().Add(pendingTimeout) + failedDeadline := time.Time{} + + for { + status := job.GetLogArchivalStatus() + if status == JobLogArchivalStatusCompleted || status == JobLogArchivalStatusTimedOut { + return status + } + + if status == JobLogArchivalStatusFailed { + if failedDeadline.IsZero() { + failedDeadline = time.Now().Add(failedTimeout) + } + + if time.Now().After(failedDeadline) { + return JobLogArchivalStatusFailed + } + } + + if status == JobLogArchivalStatusPending && time.Now().After(pendingDeadline) { + job.SetLogArchivalStatus(JobLogArchivalStatusTimedOut) + return JobLogArchivalStatusTimedOut + } + + time.Sleep(pollInterval) + } +} + +func (job *Job) logArchivalPendingWaitTimeout() time.Duration { + fromEnv := os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_PENDING_WAIT_TIMEOUT") + if fromEnv == "" { + // Backward-compatible fallback + fromEnv = os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_WAIT_TIMEOUT") + } + + if fromEnv == "" { + return DefaultLogArchivalPendingWaitTimeout + } + + n, err := strconv.ParseInt(fromEnv, 10, 64) + if err != nil { + log.Errorf( + "Error parsing SEMAPHORE_AGENT_LOG_ARCHIVAL_PENDING_WAIT_TIMEOUT: %v - using default of %s", + err, + DefaultLogArchivalPendingWaitTimeout, + ) + return DefaultLogArchivalPendingWaitTimeout + } + + timeout := time.Duration(n) * time.Second + if timeout <= 0 || timeout > MaxLogArchivalWaitTimeout { + log.Errorf( + "Invalid SEMAPHORE_AGENT_LOG_ARCHIVAL_PENDING_WAIT_TIMEOUT %s, must be in range 1s-%s, using default %s", + timeout, + MaxLogArchivalWaitTimeout, + DefaultLogArchivalPendingWaitTimeout, + ) + return DefaultLogArchivalPendingWaitTimeout + } + + return timeout +} + +func (job *Job) logArchivalFailedWaitTimeout() time.Duration { + fromEnv := os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT") + if fromEnv == "" { + return DefaultLogArchivalFailedWaitTimeout + } + + n, err := strconv.ParseInt(fromEnv, 10, 64) + if err != nil { + log.Errorf( + "Error parsing SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT: %v - using default of %s", + err, + DefaultLogArchivalFailedWaitTimeout, + ) + return DefaultLogArchivalFailedWaitTimeout + } + + timeout := time.Duration(n) * time.Second + if timeout <= 0 || timeout > MaxLogArchivalWaitTimeout { + log.Errorf( + "Invalid SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT %s, must be in range 1s-%s, using default %s", + timeout, + MaxLogArchivalWaitTimeout, + DefaultLogArchivalFailedWaitTimeout, + ) + return DefaultLogArchivalFailedWaitTimeout + } + + return timeout +} + func (o *RunOptions) GetPreJobHookWarning() string { if o.PreJobHookPath == "" { return "" @@ -490,9 +616,9 @@ func (job *Job) Teardown(result string, epiloguesExecuted bool, callbackRetryAtt /* * For hosted jobs, we use callbacks: - * 1. Send finished callback and log job_finished event - * 2. Wait for archivator to collect all the logs - * 3. Send teardown_finished callback and close the logger + * 1. Send finished callback and log job_finished event. + * 2. Wait for archivator to reach a terminal archival state (completed/failed/timed_out). + * 3. Send teardown_finished callback and close the logger. */ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) error { err := job.SendFinishedCallback(result, callbackRetryAttempts) @@ -504,16 +630,22 @@ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) job.Logger.LogJobFinished(result) log.Debug("Waiting for archivator") - for { - if job.JobLogArchived { - break - } else { - time.Sleep(1000 * time.Millisecond) - } + archivalStatus := job.waitForLogArchival( + job.logArchivalPendingWaitTimeout(), + job.logArchivalFailedWaitTimeout(), + time.Second, + ) + switch archivalStatus { + case JobLogArchivalStatusCompleted: + log.Debug("Archivator finished") + case JobLogArchivalStatusFailed: + log.Warn("Archivator log streaming failed - continuing teardown") + case JobLogArchivalStatusTimedOut: + log.Warn("Timed out waiting for archivator log fetch - continuing teardown") + default: + log.Warnf("Unexpected log archival status '%s' - continuing teardown", archivalStatus) } - log.Debug("Archivator finished") - // The job already finished, but executor is still open. // We use the open executor to upload the job logs as // an artifact, in case it is above the acceptable limit. diff --git a/pkg/jobs/job_test.go b/pkg/jobs/job_test.go index 6bcad3cc..26abdbae 100644 --- a/pkg/jobs/job_test.go +++ b/pkg/jobs/job_test.go @@ -1381,3 +1381,47 @@ func Test__UsePreJobHookAndFailOnError(t *testing.T) { os.Remove(hook) } + +func Test__LogArchivalStatusDefaultsToPending(t *testing.T) { + job := &Job{} + assert.Equal(t, JobLogArchivalStatusPending, job.GetLogArchivalStatus()) +} + +func Test__WaitForLogArchivalStatus(t *testing.T) { + t.Run("returns completed immediately", func(t *testing.T) { + job := &Job{} + job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) + + status := job.waitForLogArchival(100*time.Millisecond, 50*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusCompleted, status) + }) + + t.Run("returns completed when retry succeeds after a failed fetch", func(t *testing.T) { + job := &Job{} + job.SetLogArchivalStatus(JobLogArchivalStatusFailed) + + go func() { + time.Sleep(20 * time.Millisecond) + job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) + }() + + status := job.waitForLogArchival(500*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusCompleted, status) + }) + + t.Run("returns timed_out instead of waiting forever", func(t *testing.T) { + job := &Job{} + + status := job.waitForLogArchival(50*time.Millisecond, 50*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusTimedOut, status) + assert.Equal(t, JobLogArchivalStatusTimedOut, job.GetLogArchivalStatus()) + }) + + t.Run("returns failed after failed-state grace timeout", func(t *testing.T) { + job := &Job{} + job.SetLogArchivalStatus(JobLogArchivalStatusFailed) + + status := job.waitForLogArchival(500*time.Millisecond, 50*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusFailed, status) + }) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 55b6d318..f8991116 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -25,6 +25,13 @@ import ( const DefaultCallbackRetryAttempts = 300 +var ( + defaultReadHeaderTimeout = 5 * time.Second + defaultWriteTimeout = 30 * time.Second + defaultReadTimeout = 10 * time.Second + defaultIdleTimeout = 30 * time.Second +) + type Server struct { Logfile io.Writer ActiveJob *jobs.Job @@ -101,15 +108,7 @@ func (s *Server) Serve() { log.Infof("Agent %s listening on https://%s\n", s.Config.Version, address) loggedRouter := handlers.LoggingHandler(s.Logfile, s.router) - - server := &http.Server{ - Addr: address, - ReadHeaderTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - ReadTimeout: 10 * time.Second, - IdleTimeout: 30 * time.Second, - Handler: loggedRouter, - } + server := newHTTPServer(address, loggedRouter) err := server.ListenAndServeTLS(s.Config.TLSCertPath, s.Config.TLSKeyPath) if err != nil { @@ -117,6 +116,17 @@ func (s *Server) Serve() { } } +func newHTTPServer(address string, handler http.Handler) *http.Server { + return &http.Server{ + Addr: address, + ReadHeaderTimeout: defaultReadHeaderTimeout, + WriteTimeout: defaultWriteTimeout, + ReadTimeout: defaultReadTimeout, + IdleTimeout: defaultIdleTimeout, + Handler: handler, + } +} + func (s *Server) Status(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) m := make(map[string]interface{}) @@ -168,16 +178,22 @@ func (s *Server) JobLogs(w http.ResponseWriter, r *http.Request) { startFromLine = 0 } + isArchivator := r.Header.Get("X-Client-Name") == "archivator" + _, err = s.ActiveJob.Logger.Backend.Read(startFromLine, math.MaxInt32, w) if err != nil { log.Errorf("Error while streaming logs: %v", err) + if isArchivator { + s.ActiveJob.SetLogArchivalStatus(jobs.JobLogArchivalStatusFailed) + } http.Error(w, err.Error(), 500) fmt.Fprintf(w, `{"message": "%s"}`, err) + return } - if r.Header.Get("X-Client-Name") == "archivator" { - s.ActiveJob.JobLogArchived = true + if isArchivator { + s.ActiveJob.SetLogArchivalStatus(jobs.JobLogArchivalStatusCompleted) } } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 444d4246..83837e48 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -3,7 +3,9 @@ package server import ( "bytes" "encoding/json" + "errors" "fmt" + "io" "net/http" "net/http/httptest" "sync" @@ -12,6 +14,8 @@ import ( "github.com/golang-jwt/jwt/v4" api "github.com/semaphoreci/agent/pkg/api" + eventlogger "github.com/semaphoreci/agent/pkg/eventlogger" + jobs "github.com/semaphoreci/agent/pkg/jobs" "github.com/stretchr/testify/assert" ) @@ -186,6 +190,66 @@ func Test__RunJobAcceptsSameJobAgain(t *testing.T) { assert.Equal(t, totalReq-1, countBodies(bodies, "job is already running")) } +func Test__HTTPServerTimeouts(t *testing.T) { + server := newHTTPServer("0.0.0.0:1234", http.NewServeMux()) + + assert.Equal(t, defaultReadHeaderTimeout, server.ReadHeaderTimeout) + assert.Equal(t, defaultWriteTimeout, server.WriteTimeout) + assert.Equal(t, defaultReadTimeout, server.ReadTimeout) + assert.Equal(t, defaultIdleTimeout, server.IdleTimeout) +} + +func Test__JobLogsArchivatorStatusIsSetOnlyOnSuccessfulStream(t *testing.T) { + dummyKey := "dummykey" + testServer := NewServer(ServerConfig{ + HTTPClient: http.DefaultClient, + JWTSecret: []byte(dummyKey), + }) + + token, err := generateToken(dummyKey) + if !assert.NoError(t, err) { + return + } + + t.Run("stream succeeds -> archived flag is true", func(t *testing.T) { + testServer.ActiveJob = &jobs.Job{ + Request: &api.JobRequest{JobID: "job-success"}, + Logger: &eventlogger.Logger{ + Backend: staticReadBackend{line: "{\"event\":\"job_started\"}\n"}, + }, + } + + req, _ := http.NewRequest("GET", "/jobs/job-success/log", nil) + req.Header.Add("Authorization", "Token "+token) + req.Header.Add("X-Client-Name", "archivator") + + rr := httptest.NewRecorder() + testServer.router.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, jobs.JobLogArchivalStatusCompleted, testServer.ActiveJob.GetLogArchivalStatus()) + }) + + t.Run("stream fails -> archival status is failed", func(t *testing.T) { + testServer.ActiveJob = &jobs.Job{ + Request: &api.JobRequest{JobID: "job-fail"}, + Logger: &eventlogger.Logger{ + Backend: staticReadBackend{err: errors.New("stream failed")}, + }, + } + + req, _ := http.NewRequest("GET", "/jobs/job-fail/log", nil) + req.Header.Add("Authorization", "Token "+token) + req.Header.Add("X-Client-Name", "archivator") + + rr := httptest.NewRecorder() + testServer.router.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusInternalServerError, rr.Code) + assert.Equal(t, jobs.JobLogArchivalStatusFailed, testServer.ActiveJob.GetLogArchivalStatus()) + }) +} + func getAgentStatus(t *testing.T, testServer *Server, token string) string { req, _ := http.NewRequest("GET", "/status", nil) req.Header.Add("Authorization", "Token "+token) @@ -265,3 +329,37 @@ func generateToken(key string) (string, error) { return token.SignedString([]byte(key)) } + +type staticReadBackend struct { + line string + err error +} + +func (b staticReadBackend) Open() error { + return nil +} + +func (b staticReadBackend) Write(interface{}) error { + return nil +} + +func (b staticReadBackend) Read(startFrom, maxLines int, writer io.Writer) (int, error) { + if b.err != nil { + return startFrom, b.err + } + + _, err := io.WriteString(writer, b.line) + return startFrom + 1, err +} + +func (b staticReadBackend) Iterate(func(event []byte) error) error { + return nil +} + +func (b staticReadBackend) Close() error { + return nil +} + +func (b staticReadBackend) CloseWithOptions(eventlogger.CloseOptions) error { + return nil +} From 14ee3f4abc4c38671502fc78c3e00763831f6db1 Mon Sep 17 00:00:00 2001 From: Dejan K Date: Thu, 4 Jun 2026 15:29:26 +0200 Subject: [PATCH 2/4] feat: enhance log archival timeout handling and update tests for improved clarity --- pkg/jobs/job.go | 72 ++++++++++++++------------------------------ pkg/jobs/job_test.go | 21 ++++++++----- 2 files changed, 36 insertions(+), 57 deletions(-) diff --git a/pkg/jobs/job.go b/pkg/jobs/job.go index 95879776..9ea78b1a 100644 --- a/pkg/jobs/job.go +++ b/pkg/jobs/job.go @@ -35,8 +35,13 @@ const JobStopped = "stopped" const MinSizeForCompression = 1024 * 1024 const DefaultSizeForCompression = 1024 * 1024 * 100 const MaxSizeForCompression = 1024 * 1024 * 1024 -const DefaultLogArchivalPendingWaitTimeout = 300 * time.Second -const DefaultLogArchivalFailedWaitTimeout = 15 * time.Second +// Must comfortably exceed loghub archivator's worst-case in-process retry budget +// after a failed fetch. Loghub retries up to 10 times with attempt*1s backoff and +// a 10s curl --max-time per attempt: backoff (1+2+...+9 = 45s) plus up to 9*10s of +// curl time ≈ 135s. The window must cover that so a slow retry can finish streaming +// before the agent tears down the job. Override via +// SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT if loghub's retry budget changes. +const DefaultLogArchivalFailedWaitTimeout = 150 * time.Second const MaxLogArchivalWaitTimeout = 3600 * time.Second type JobLogArchivalStatus string @@ -45,7 +50,6 @@ const ( JobLogArchivalStatusPending JobLogArchivalStatus = "pending" JobLogArchivalStatusCompleted JobLogArchivalStatus = "completed" JobLogArchivalStatusFailed JobLogArchivalStatus = "failed" - JobLogArchivalStatusTimedOut JobLogArchivalStatus = "timed_out" ) type Job struct { @@ -205,13 +209,22 @@ func (job *Job) GetLogArchivalStatus() JobLogArchivalStatus { return job.logArchivalStatus } -func (job *Job) waitForLogArchival(pendingTimeout, failedTimeout, pollInterval time.Duration) JobLogArchivalStatus { - pendingDeadline := time.Now().Add(pendingTimeout) +// waitForLogArchival blocks until the loghub archivator reaches a terminal state. +// +// While the archivator has never attempted a fetch (Pending), we wait +// indefinitely. For hosted jobs the job_finished event stays queued, so the +// archivator will eventually fetch once its backend recovers - bounding this +// would risk discarding the job's logs during a prolonged archivator outage. +// +// Once the archivator has engaged but a fetch failed (Failed), we know it is +// alive and retrying, so we wait only failedTimeout for a retry to succeed +// before giving up and continuing teardown. +func (job *Job) waitForLogArchival(failedTimeout, pollInterval time.Duration) JobLogArchivalStatus { failedDeadline := time.Time{} for { status := job.GetLogArchivalStatus() - if status == JobLogArchivalStatusCompleted || status == JobLogArchivalStatusTimedOut { + if status == JobLogArchivalStatusCompleted { return status } @@ -225,50 +238,10 @@ func (job *Job) waitForLogArchival(pendingTimeout, failedTimeout, pollInterval t } } - if status == JobLogArchivalStatusPending && time.Now().After(pendingDeadline) { - job.SetLogArchivalStatus(JobLogArchivalStatusTimedOut) - return JobLogArchivalStatusTimedOut - } - time.Sleep(pollInterval) } } -func (job *Job) logArchivalPendingWaitTimeout() time.Duration { - fromEnv := os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_PENDING_WAIT_TIMEOUT") - if fromEnv == "" { - // Backward-compatible fallback - fromEnv = os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_WAIT_TIMEOUT") - } - - if fromEnv == "" { - return DefaultLogArchivalPendingWaitTimeout - } - - n, err := strconv.ParseInt(fromEnv, 10, 64) - if err != nil { - log.Errorf( - "Error parsing SEMAPHORE_AGENT_LOG_ARCHIVAL_PENDING_WAIT_TIMEOUT: %v - using default of %s", - err, - DefaultLogArchivalPendingWaitTimeout, - ) - return DefaultLogArchivalPendingWaitTimeout - } - - timeout := time.Duration(n) * time.Second - if timeout <= 0 || timeout > MaxLogArchivalWaitTimeout { - log.Errorf( - "Invalid SEMAPHORE_AGENT_LOG_ARCHIVAL_PENDING_WAIT_TIMEOUT %s, must be in range 1s-%s, using default %s", - timeout, - MaxLogArchivalWaitTimeout, - DefaultLogArchivalPendingWaitTimeout, - ) - return DefaultLogArchivalPendingWaitTimeout - } - - return timeout -} - func (job *Job) logArchivalFailedWaitTimeout() time.Duration { fromEnv := os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT") if fromEnv == "" { @@ -617,7 +590,9 @@ func (job *Job) Teardown(result string, epiloguesExecuted bool, callbackRetryAtt /* * For hosted jobs, we use callbacks: * 1. Send finished callback and log job_finished event. - * 2. Wait for archivator to reach a terminal archival state (completed/failed/timed_out). + * 2. Wait for archivator to reach a terminal archival state (completed, or failed + * after a bounded retry window). A job the archivator never reaches stays + * pending and waits indefinitely so its logs are not lost. * 3. Send teardown_finished callback and close the logger. */ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) error { @@ -631,7 +606,6 @@ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) log.Debug("Waiting for archivator") archivalStatus := job.waitForLogArchival( - job.logArchivalPendingWaitTimeout(), job.logArchivalFailedWaitTimeout(), time.Second, ) @@ -640,8 +614,6 @@ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) log.Debug("Archivator finished") case JobLogArchivalStatusFailed: log.Warn("Archivator log streaming failed - continuing teardown") - case JobLogArchivalStatusTimedOut: - log.Warn("Timed out waiting for archivator log fetch - continuing teardown") default: log.Warnf("Unexpected log archival status '%s' - continuing teardown", archivalStatus) } diff --git a/pkg/jobs/job_test.go b/pkg/jobs/job_test.go index 26abdbae..2138c0c4 100644 --- a/pkg/jobs/job_test.go +++ b/pkg/jobs/job_test.go @@ -1392,7 +1392,7 @@ func Test__WaitForLogArchivalStatus(t *testing.T) { job := &Job{} job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) - status := job.waitForLogArchival(100*time.Millisecond, 50*time.Millisecond, 10*time.Millisecond) + status := job.waitForLogArchival(50*time.Millisecond, 10*time.Millisecond) assert.Equal(t, JobLogArchivalStatusCompleted, status) }) @@ -1405,23 +1405,30 @@ func Test__WaitForLogArchivalStatus(t *testing.T) { job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) }() - status := job.waitForLogArchival(500*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond) + status := job.waitForLogArchival(100*time.Millisecond, 10*time.Millisecond) assert.Equal(t, JobLogArchivalStatusCompleted, status) }) - t.Run("returns timed_out instead of waiting forever", func(t *testing.T) { + t.Run("waits indefinitely while pending and exits when completed", func(t *testing.T) { job := &Job{} - status := job.waitForLogArchival(50*time.Millisecond, 50*time.Millisecond, 10*time.Millisecond) - assert.Equal(t, JobLogArchivalStatusTimedOut, status) - assert.Equal(t, JobLogArchivalStatusTimedOut, job.GetLogArchivalStatus()) + // Completed arrives well after the failed-state grace window would have + // elapsed; a pending job must keep waiting regardless and never give up + // (so logs are not lost during a prolonged archivator outage). + go func() { + time.Sleep(80 * time.Millisecond) + job.SetLogArchivalStatus(JobLogArchivalStatusCompleted) + }() + + status := job.waitForLogArchival(20*time.Millisecond, 10*time.Millisecond) + assert.Equal(t, JobLogArchivalStatusCompleted, status) }) t.Run("returns failed after failed-state grace timeout", func(t *testing.T) { job := &Job{} job.SetLogArchivalStatus(JobLogArchivalStatusFailed) - status := job.waitForLogArchival(500*time.Millisecond, 50*time.Millisecond, 10*time.Millisecond) + status := job.waitForLogArchival(50*time.Millisecond, 10*time.Millisecond) assert.Equal(t, JobLogArchivalStatusFailed, status) }) } From 2d6ca77c1de7397e6f83ef273ba72cd03dd816c7 Mon Sep 17 00:00:00 2001 From: Dejan K Date: Thu, 4 Jun 2026 16:01:45 +0200 Subject: [PATCH 3/4] feat: update Go version to 1.26.3 and adjust directory permissions for security --- .semaphore/release.yml | 2 +- .semaphore/semaphore.yml | 14 +++++++------- main.go | 4 ++-- pkg/eventlogger/filebackend.go | 4 ++-- pkg/executors/authorized_keys.go | 3 ++- pkg/executors/docker_compose_executor.go | 7 ++++++- pkg/server/server.go | 4 ++-- pkg/shell/shell_test.go | 8 ++++++-- test/hub_reference/Gemfile.lock | 4 ++-- 9 files changed, 30 insertions(+), 20 deletions(-) diff --git a/.semaphore/release.yml b/.semaphore/release.yml index 69c9f68b..f415f28b 100644 --- a/.semaphore/release.yml +++ b/.semaphore/release.yml @@ -15,7 +15,7 @@ blocks: - name: dockerhub-write prologue: commands: - - sem-version go 1.23.8 + - sem-version go 1.26.3 - "export GOPATH=~/go" - "export PATH=/home/semaphore/go/bin:$PATH" - checkout diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 01b93609..c7392079 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -18,7 +18,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout jobs: @@ -37,7 +37,7 @@ blocks: - checkout - mv ~/.ssh/security-toolbox ~/.ssh/id_rsa - sudo chmod 600 ~/.ssh/id_rsa - - sem-version go 1.24.12 + - sem-version go 1.26.3 jobs: - name: Check dependencies commands: @@ -63,7 +63,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -96,7 +96,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -163,7 +163,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -200,7 +200,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - checkout - go version - go get @@ -241,7 +241,7 @@ blocks: prologue: commands: - - sem-version go 1.24.12 + - sem-version go 1.26.3 - curl -sLO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && install minikube-linux-amd64 /tmp/ - /tmp/minikube-linux-amd64 config set WantUpdateNotification false - /tmp/minikube-linux-amd64 start --driver=docker diff --git a/main.go b/main.go index 014ee141..105fae41 100644 --- a/main.go +++ b/main.go @@ -101,8 +101,8 @@ func getLogFilePath() string { return filepath.Join(os.TempDir(), "agent_log") } - parentDirectory := path.Dir(logFilePath) - err := os.MkdirAll(parentDirectory, 0640) + parentDirectory := filepath.Clean(path.Dir(logFilePath)) + err := os.MkdirAll(parentDirectory, 0750) if err != nil { log.Panicf("Could not create directories to place log file in '%s': %v", logFilePath, err) } diff --git a/pkg/eventlogger/filebackend.go b/pkg/eventlogger/filebackend.go index 858373cc..ef66a2c9 100644 --- a/pkg/eventlogger/filebackend.go +++ b/pkg/eventlogger/filebackend.go @@ -90,7 +90,7 @@ func (l *FileBackend) CloseWithOptions(options CloseOptions) error { } func (l *FileBackend) Iterate(fn func([]byte) error) error { - fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm) + fd, err := os.OpenFile(l.path, os.O_RDONLY, 0600) if err != nil { return fmt.Errorf("error opening file '%s': %v", l.path, err) } @@ -112,7 +112,7 @@ func (l *FileBackend) Iterate(fn func([]byte) error) error { } func (l *FileBackend) Read(startingLineNumber, maxLines int, writer io.Writer) (int, error) { - fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm) + fd, err := os.OpenFile(l.path, os.O_RDONLY, 0600) if err != nil { return startingLineNumber, err } diff --git a/pkg/executors/authorized_keys.go b/pkg/executors/authorized_keys.go index 22c679c2..7250c55e 100644 --- a/pkg/executors/authorized_keys.go +++ b/pkg/executors/authorized_keys.go @@ -18,7 +18,8 @@ func InjectEntriesToAuthorizedKeys(keys []api.PublicKey) error { } sshDirectory := filepath.Join(homeDir, ".ssh") - err = os.MkdirAll(sshDirectory, os.ModePerm) + // SSH requires the .ssh directory to be accessible only by its owner. + err = os.MkdirAll(sshDirectory, 0700) if err != nil { return err } diff --git a/pkg/executors/docker_compose_executor.go b/pkg/executors/docker_compose_executor.go index 32454397..a01ebbf6 100644 --- a/pkg/executors/docker_compose_executor.go +++ b/pkg/executors/docker_compose_executor.go @@ -66,7 +66,12 @@ func (e *DockerComposeExecutor) Prepare() int { return 1 } - err := os.MkdirAll(e.tmpDirectory, os.ModePerm) + // This directory is bind-mounted read-only into the job container (see the + // `-v` mount in startBashShell), whose process may run as a non-root user, so + // it must stay world-traversable for that user to read the command files. + // We drop world-write (0777 -> 0755) but keep read/execute for that reason. + // #nosec G301 + err := os.MkdirAll(e.tmpDirectory, 0755) if err != nil { return 1 } diff --git a/pkg/server/server.go b/pkg/server/server.go index f8991116..42d334e4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -159,7 +159,7 @@ func (s *Server) JobLogs(w http.ResponseWriter, r *http.Request) { if s.ActiveJob == nil { log.Warnf("Attempt to fetch logs for '%s' before any job is received", jobID) w.WriteHeader(http.StatusNotFound) - fmt.Fprintf(w, `{"message": "job %s is not running"}`, jobID) + fmt.Fprint(w, `{"message": "job is not running"}`) return } @@ -169,7 +169,7 @@ func (s *Server) JobLogs(w http.ResponseWriter, r *http.Request) { if runningJobID != jobID { log.Warnf("Attempt to fetch logs for '%s', but job '%s' is the one running", jobID, runningJobID) w.WriteHeader(http.StatusForbidden) - fmt.Fprintf(w, `{"message": "job %s is not running"}`, jobID) + fmt.Fprint(w, `{"message": "job is not running"}`) return } diff --git a/pkg/shell/shell_test.go b/pkg/shell/shell_test.go index fe4631f9..c901b73f 100644 --- a/pkg/shell/shell_test.go +++ b/pkg/shell/shell_test.go @@ -101,7 +101,11 @@ func Test__Shell__HandlingBashProcessKill(t *testing.T) { } ` } else { - cmd = "echo Hello && exit 1" + // The `sleep` gives the agent time to read and flush "Hello" from the PTY + // before `exit 1` abruptly closes the shell. Without the gap, the output + // read races the shell-closed signal and "Hello" is dropped intermittently, + // which is what made this test flaky. + cmd = "echo Hello && sleep 1 && exit 1" } p1 := shell.NewProcessWithOutput(cmd, func(line string) { @@ -109,7 +113,7 @@ func Test__Shell__HandlingBashProcessKill(t *testing.T) { }) p1.Run() - assert.Equal(t, output.String(), "Hello\n") + assert.Equal(t, "Hello\n", output.String()) } func Test__Shell__HandlingBashProcessKillThatHasBackgroundJobs(t *testing.T) { diff --git a/test/hub_reference/Gemfile.lock b/test/hub_reference/Gemfile.lock index 3348f4b3..d23d9266 100644 --- a/test/hub_reference/Gemfile.lock +++ b/test/hub_reference/Gemfile.lock @@ -7,12 +7,12 @@ GEM logger (1.7.0) mustermann (3.0.4) ruby2_keywords (~> 0.0.1) - rack (3.2.4) + rack (3.2.6) rack-protection (4.2.1) base64 (>= 0.1.0) logger (>= 1.6.0) rack (>= 3.0.0, < 4) - rack-session (2.1.1) + rack-session (2.1.2) base64 (>= 0.1.0) rack (>= 3.0.0) ruby2_keywords (0.0.5) From 9fe4be42fbe8ee8b998dec6d7311f4b26f6c034b Mon Sep 17 00:00:00 2001 From: Dejan K Date: Thu, 4 Jun 2026 16:18:06 +0200 Subject: [PATCH 4/4] test: improve output buffer flush timeout handling for deterministic behavior --- pkg/shell/output_buffer_test.go | 35 ++++++++++++--------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/pkg/shell/output_buffer_test.go b/pkg/shell/output_buffer_test.go index 1454c2dd..5abf474e 100644 --- a/pkg/shell/output_buffer_test.go +++ b/pkg/shell/output_buffer_test.go @@ -1,7 +1,6 @@ package shell import ( - "context" "strings" "testing" "time" @@ -205,32 +204,24 @@ func Test__OutputBuffer__DoesNotWaitForeverForOutputToBeFlushed(t *testing.T) { input = append(input, 'a') } - buffer, _ := NewOutputBufferWithFlushTimeout(func(s string) {}, time.Second) + flushTimeout := 100 * time.Millisecond - // write a lot of data to the buffer + // A consumer slower than the flush timeout makes even a single flush overrun + // the deadline, so Close() is guaranteed to give up with a context deadline + // error instead of draining the buffer. This is deterministic; the previous + // version raced a concurrent writer to keep the buffer non-empty, which could + // intermittently let the buffer drain and made Close() return nil (flaky). + buffer, _ := NewOutputBufferWithFlushTimeout(func(s string) { + time.Sleep(3 * flushTimeout) + }, flushTimeout) + + // Pre-fill so the buffer is non-empty when Close() starts flushing. for i := 0; i < 1000; i++ { buffer.Append(input) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // on a separate goroutine, we continuosly write - // to make sure the buffer is never empty - go func() { - for { - select { - case <-ctx.Done(): - return - default: - buffer.Append(input) - } - } - }() - - // here, we try to close, which will not work - // since we will attempt to flush while the buffer is being continuosly written. + // Close() flushes one chunk - which blocks in the slow consumer past the + // deadline - and then returns the deadline error rather than draining the rest. err := buffer.Close() assert.ErrorContains(t, err, "context deadline exceeded") - cancel() }