Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .semaphore/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ blocks:
- name: dockerhub-write
prologue:
commands:
- sem-version go 1.23.8
- sem-version go 1.26.3
- "export GOPATH=~/go"
- "export PATH=/home/semaphore/go/bin:$PATH"
- checkout
Expand Down
14 changes: 7 additions & 7 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ blocks:

prologue:
commands:
- sem-version go 1.24.12
- sem-version go 1.26.3
- checkout

jobs:
Expand All @@ -37,7 +37,7 @@ blocks:
- checkout
- mv ~/.ssh/security-toolbox ~/.ssh/id_rsa
- sudo chmod 600 ~/.ssh/id_rsa
- sem-version go 1.24.12
- sem-version go 1.26.3
jobs:
- name: Check dependencies
commands:
Expand All @@ -63,7 +63,7 @@ blocks:

prologue:
commands:
- sem-version go 1.24.12
- sem-version go 1.26.3
- checkout
- go version
- go get
Expand Down Expand Up @@ -96,7 +96,7 @@ blocks:

prologue:
commands:
- sem-version go 1.24.12
- sem-version go 1.26.3
- checkout
- go version
- go get
Expand Down Expand Up @@ -163,7 +163,7 @@ blocks:

prologue:
commands:
- sem-version go 1.24.12
- sem-version go 1.26.3
- checkout
- go version
- go get
Expand Down Expand Up @@ -200,7 +200,7 @@ blocks:

prologue:
commands:
- sem-version go 1.24.12
- sem-version go 1.26.3
- checkout
- go version
- go get
Expand Down Expand Up @@ -241,7 +241,7 @@ blocks:

prologue:
commands:
- sem-version go 1.24.12
- sem-version go 1.26.3
- curl -sLO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && install minikube-linux-amd64 /tmp/
- /tmp/minikube-linux-amd64 config set WantUpdateNotification false
- /tmp/minikube-linux-amd64 start --driver=docker
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func getLogFilePath() string {
return filepath.Join(os.TempDir(), "agent_log")
}

parentDirectory := path.Dir(logFilePath)
err := os.MkdirAll(parentDirectory, 0640)
parentDirectory := filepath.Clean(path.Dir(logFilePath))
err := os.MkdirAll(parentDirectory, 0750)
if err != nil {
log.Panicf("Could not create directories to place log file in '%s': %v", logFilePath, err)
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func RunSingleJob(httpClient *http.Client) {
panic(err)
}

job.JobLogArchived = true
job.SetLogArchivalStatus(jobs.JobLogArchivalStatusCompleted)

job.Run()
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/eventlogger/filebackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (l *FileBackend) CloseWithOptions(options CloseOptions) error {
}

func (l *FileBackend) Iterate(fn func([]byte) error) error {
fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm)
fd, err := os.OpenFile(l.path, os.O_RDONLY, 0600)
if err != nil {
return fmt.Errorf("error opening file '%s': %v", l.path, err)
}
Expand All @@ -112,7 +112,7 @@ func (l *FileBackend) Iterate(fn func([]byte) error) error {
}

func (l *FileBackend) Read(startingLineNumber, maxLines int, writer io.Writer) (int, error) {
fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm)
fd, err := os.OpenFile(l.path, os.O_RDONLY, 0600)
if err != nil {
return startingLineNumber, err
}
Expand Down Expand Up @@ -140,7 +140,11 @@ func (l *FileBackend) Read(startingLineNumber, maxLines int, writer io.Writer) (

// Otherwise, we advance to the next line and stream the current line.
lineNumber++
fmt.Fprint(writer, line)
_, err = fmt.Fprint(writer, line)
if err != nil {
_ = fd.Close()
return lineNumber, err
}
linesStreamed++

// if we have streamed the number of lines we want, we stop.
Expand Down
26 changes: 26 additions & 0 deletions pkg/eventlogger/filebackend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventlogger

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -50,6 +51,23 @@ func Test__LogsArePushedToFile(t *testing.T) {
assert.Nil(t, err)
}

func Test__ReadReturnsErrorFromWriter(t *testing.T) {
tmpFileName := filepath.Join(os.TempDir(), fmt.Sprintf("logs_%d.json", time.Now().UnixNano()))
fileBackend, err := NewFileBackend(tmpFileName, DefaultMaxSizeInBytes)
assert.Nil(t, err)
assert.Nil(t, fileBackend.Open())

timestamp := int(time.Now().Unix())
assert.Nil(t, fileBackend.Write(&JobStartedEvent{Timestamp: timestamp, Event: "job_started"}))

writerErr := errors.New("write failed")
_, err = fileBackend.Read(0, 1000, errorWriter{err: writerErr})
assert.Equal(t, writerErr, err)

err = fileBackend.Close()
assert.Nil(t, err)
}

func Test__ReadDoesNotIncludeDoubleNewlines(t *testing.T) {
tmpFileName := filepath.Join(os.TempDir(), fmt.Sprintf("logs_%d.json", time.Now().UnixNano()))
fileBackend, err := NewFileBackend(tmpFileName, DefaultMaxSizeInBytes)
Expand Down Expand Up @@ -131,3 +149,11 @@ func Test__CloseWithOptions(t *testing.T) {
assert.False(t, logsWereTrimmed)
})
}

type errorWriter struct {
err error
}

func (w errorWriter) Write(p []byte) (n int, err error) {
return 0, w.err
}
3 changes: 2 additions & 1 deletion pkg/executors/authorized_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func InjectEntriesToAuthorizedKeys(keys []api.PublicKey) error {
}

sshDirectory := filepath.Join(homeDir, ".ssh")
err = os.MkdirAll(sshDirectory, os.ModePerm)
// SSH requires the .ssh directory to be accessible only by its owner.
err = os.MkdirAll(sshDirectory, 0700)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/executors/docker_compose_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ func (e *DockerComposeExecutor) Prepare() int {
return 1
}

err := os.MkdirAll(e.tmpDirectory, os.ModePerm)
// This directory is bind-mounted read-only into the job container (see the
// `-v` mount in startBashShell), whose process may run as a non-root user, so
// it must stay world-traversable for that user to read the command files.
// We drop world-write (0777 -> 0755) but keep read/execute for that reason.
// #nosec G301
err := os.MkdirAll(e.tmpDirectory, 0755)
if err != nil {
return 1
}
Expand Down
148 changes: 126 additions & 22 deletions pkg/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

api "github.com/semaphoreci/agent/pkg/api"
Expand All @@ -34,6 +35,22 @@ const JobStopped = "stopped"
const MinSizeForCompression = 1024 * 1024
const DefaultSizeForCompression = 1024 * 1024 * 100
const MaxSizeForCompression = 1024 * 1024 * 1024
// Must comfortably exceed loghub archivator's worst-case in-process retry budget
// after a failed fetch. Loghub retries up to 10 times with attempt*1s backoff and
// a 10s curl --max-time per attempt: backoff (1+2+...+9 = 45s) plus up to 9*10s of
// curl time ≈ 135s. The window must cover that so a slow retry can finish streaming
// before the agent tears down the job. Override via
// SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT if loghub's retry budget changes.
const DefaultLogArchivalFailedWaitTimeout = 150 * time.Second
const MaxLogArchivalWaitTimeout = 3600 * time.Second

type JobLogArchivalStatus string

const (
JobLogArchivalStatusPending JobLogArchivalStatus = "pending"
JobLogArchivalStatusCompleted JobLogArchivalStatus = "completed"
JobLogArchivalStatusFailed JobLogArchivalStatus = "failed"
)

type Job struct {
Client *http.Client
Expand All @@ -42,11 +59,12 @@ type Job struct {

Executor executors.Executor

JobLogArchived bool
Stopped bool
Finished bool
UploadJobLogs string
UserAgent string
logArchivalStatus JobLogArchivalStatus
logArchivalStatusLock sync.RWMutex
Stopped bool
Finished bool
UploadJobLogs string
UserAgent string
}

type JobOptions struct {
Expand Down Expand Up @@ -92,12 +110,12 @@ func NewJobWithOptions(options *JobOptions) (*Job, error) {
}

job := &Job{
Client: options.Client,
Request: options.Request,
JobLogArchived: false,
Stopped: false,
UploadJobLogs: options.UploadJobLogs,
UserAgent: options.UserAgent,
Client: options.Client,
Request: options.Request,
logArchivalStatus: JobLogArchivalStatusPending,
Stopped: false,
UploadJobLogs: options.UploadJobLogs,
UserAgent: options.UserAgent,
}

if options.Logger != nil {
Expand Down Expand Up @@ -173,6 +191,87 @@ type RunOptions struct {
CallbackRetryAttempts int
}

func (job *Job) SetLogArchivalStatus(status JobLogArchivalStatus) {
job.logArchivalStatusLock.Lock()
defer job.logArchivalStatusLock.Unlock()

job.logArchivalStatus = status
}

func (job *Job) GetLogArchivalStatus() JobLogArchivalStatus {
job.logArchivalStatusLock.RLock()
defer job.logArchivalStatusLock.RUnlock()

if job.logArchivalStatus == "" {
return JobLogArchivalStatusPending
}

return job.logArchivalStatus
}

// waitForLogArchival blocks until the loghub archivator reaches a terminal state.
//
// While the archivator has never attempted a fetch (Pending), we wait
// indefinitely. For hosted jobs the job_finished event stays queued, so the
// archivator will eventually fetch once its backend recovers - bounding this
// would risk discarding the job's logs during a prolonged archivator outage.
//
// Once the archivator has engaged but a fetch failed (Failed), we know it is
// alive and retrying, so we wait only failedTimeout for a retry to succeed
// before giving up and continuing teardown.
func (job *Job) waitForLogArchival(failedTimeout, pollInterval time.Duration) JobLogArchivalStatus {
failedDeadline := time.Time{}

for {
status := job.GetLogArchivalStatus()
if status == JobLogArchivalStatusCompleted {
return status
}

if status == JobLogArchivalStatusFailed {
if failedDeadline.IsZero() {
failedDeadline = time.Now().Add(failedTimeout)
}

if time.Now().After(failedDeadline) {
return JobLogArchivalStatusFailed
}
}

time.Sleep(pollInterval)
}
}

func (job *Job) logArchivalFailedWaitTimeout() time.Duration {
fromEnv := os.Getenv("SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT")
if fromEnv == "" {
return DefaultLogArchivalFailedWaitTimeout
}

n, err := strconv.ParseInt(fromEnv, 10, 64)
if err != nil {
log.Errorf(
"Error parsing SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT: %v - using default of %s",
err,
DefaultLogArchivalFailedWaitTimeout,
)
return DefaultLogArchivalFailedWaitTimeout
}

timeout := time.Duration(n) * time.Second
if timeout <= 0 || timeout > MaxLogArchivalWaitTimeout {
log.Errorf(
"Invalid SEMAPHORE_AGENT_LOG_ARCHIVAL_FAILED_WAIT_TIMEOUT %s, must be in range 1s-%s, using default %s",
timeout,
MaxLogArchivalWaitTimeout,
DefaultLogArchivalFailedWaitTimeout,
)
return DefaultLogArchivalFailedWaitTimeout
}

return timeout
}

func (o *RunOptions) GetPreJobHookWarning() string {
if o.PreJobHookPath == "" {
return ""
Expand Down Expand Up @@ -490,9 +589,11 @@ func (job *Job) Teardown(result string, epiloguesExecuted bool, callbackRetryAtt

/*
* For hosted jobs, we use callbacks:
* 1. Send finished callback and log job_finished event
* 2. Wait for archivator to collect all the logs
* 3. Send teardown_finished callback and close the logger
* 1. Send finished callback and log job_finished event.
* 2. Wait for archivator to reach a terminal archival state (completed, or failed
* after a bounded retry window). A job the archivator never reaches stays
* pending and waits indefinitely so its logs are not lost.
* 3. Send teardown_finished callback and close the logger.
*/
func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int) error {
err := job.SendFinishedCallback(result, callbackRetryAttempts)
Expand All @@ -504,16 +605,19 @@ func (job *Job) teardownWithCallbacks(result string, callbackRetryAttempts int)
job.Logger.LogJobFinished(result)
log.Debug("Waiting for archivator")

for {
if job.JobLogArchived {
break
} else {
time.Sleep(1000 * time.Millisecond)
}
archivalStatus := job.waitForLogArchival(
job.logArchivalFailedWaitTimeout(),
time.Second,
)
switch archivalStatus {
case JobLogArchivalStatusCompleted:
log.Debug("Archivator finished")
case JobLogArchivalStatusFailed:
log.Warn("Archivator log streaming failed - continuing teardown")
default:
log.Warnf("Unexpected log archival status '%s' - continuing teardown", archivalStatus)
}

log.Debug("Archivator finished")

// The job already finished, but executor is still open.
// We use the open executor to upload the job logs as
// an artifact, in case it is above the acceptable limit.
Expand Down
Loading
Loading