diff --git a/tools/prow-job-executor/go.mod b/tools/prow-job-executor/go.mod index 4158c71..7dfdd3e 100644 --- a/tools/prow-job-executor/go.mod +++ b/tools/prow-job-executor/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-logr/logr v1.4.3 github.com/google/uuid v1.6.0 github.com/spf13/cobra v1.10.2 + google.golang.org/protobuf v1.36.11 k8s.io/apimachinery v0.35.3 sigs.k8s.io/prow v0.0.0-20251030184004-0e4d5be4200d sigs.k8s.io/yaml v1.6.0 @@ -170,7 +171,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/grpc v1.78.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/tools/prow-job-executor/options.go b/tools/prow-job-executor/options.go index 4a6012f..9d7fd31 100644 --- a/tools/prow-job-executor/options.go +++ b/tools/prow-job-executor/options.go @@ -65,6 +65,7 @@ func DefaultExecuteOptions() *RawExecuteOptions { BaseRef: defaultBaseRef, Org: defaultOrg, Repo: defaultRepo, + AbortOnCancel: true, } } @@ -83,6 +84,7 @@ func (o *RawExecuteOptions) BindFlags(cmd *cobra.Command) error { cmd.Flags().StringVar(&o.ProwURL, "prow-url", o.ProwURL, "Prow API URL for job status monitoring") cmd.Flags().BoolVar(&o.DryRun, "dry-run", o.DryRun, "Print which job would be started, but do not start one.") cmd.Flags().BoolVar(&o.GatePromotion, "gate-promotion", o.GatePromotion, "Exit with an error code if the job fails.") + cmd.Flags().BoolVar(&o.AbortOnCancel, "abort-on-cancel", o.AbortOnCancel, "Abort the running Prow job if the executor is cancelled (e.g. the rollout is cancelled and the process receives SIGTERM).") cmd.Flags().StringVar(&o.BaseSha, "base-sha", o.BaseSha, "Git commit SHA to test against. When set, the job is triggered as a postsubmit with this specific commit instead of HEAD.") cmd.Flags().StringVar(&o.BaseRef, "base-ref", o.BaseRef, "Git base ref (branch) for the postsubmit job (requires --base-sha)") cmd.Flags().StringVar(&o.Org, "org", o.Org, "GitHub org for the postsubmit job (requires --base-sha)") @@ -121,6 +123,7 @@ type RawExecuteOptions struct { ProwURL string DryRun bool GatePromotion bool + AbortOnCancel bool // Git ref options for postsubmit execution pinned to a specific commit. // When BaseSha is set, the job is triggered as a postsubmit instead of a periodic. @@ -160,6 +163,7 @@ type completedExecuteOptions struct { ProwURL string DryRun bool GatePromotion bool + AbortOnCancel bool // Git ref options for postsubmit execution BaseSha string @@ -291,6 +295,7 @@ func (o *ValidatedExecuteOptions) Complete(ctx context.Context) (*ExecuteOptions ProwURL: o.ProwURL, DryRun: o.DryRun, GatePromotion: o.GatePromotion, + AbortOnCancel: o.AbortOnCancel, BaseSha: o.BaseSha, BaseRef: o.BaseRef, Org: o.Org, @@ -309,7 +314,7 @@ func (o *ExecuteOptions) Execute(ctx context.Context) error { client := prowjob.NewClient(o.ProwToken, o.GangwayURL, o.ProwURL) // Create job monitor - monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, o.DryRun, o.GatePromotion) + monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, o.DryRun, o.GatePromotion, o.AbortOnCancel) // Prepare environment variables, including the region envs := make(map[string]string) @@ -509,7 +514,7 @@ func (o *ValidatedMonitorOptions) Complete(ctx context.Context) (*MonitorOptions func (o *MonitorOptions) Monitor(ctx context.Context, logger logr.Logger) error { // Create Prow client and monitor client := prowjob.NewClient(o.ProwToken, o.GangwayURL, o.ProwURL) - monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, false, false) + monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, false, false, false) // Monitor existing job using shared polling logic logger.Info("Starting to monitor existing job", "jobExecutionID", o.JobExecutionID) diff --git a/tools/prow-job-executor/prowjob/abort_test.go b/tools/prow-job-executor/prowjob/abort_test.go new file mode 100644 index 0000000..40c9e27 --- /dev/null +++ b/tools/prow-job-executor/prowjob/abort_test.go @@ -0,0 +1,455 @@ +// Copyright 2025 Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prowjob + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/go-logr/logr" + "google.golang.org/protobuf/encoding/protojson" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + prowjobs "sigs.k8s.io/prow/pkg/apis/prowjobs/v1" + prowgangway "sigs.k8s.io/prow/pkg/gangway" + "sigs.k8s.io/yaml" +) + +const testJobName = "branch-ci-Azure-ARO-HCP-main-e2e-integration-e2e-parallel" + +// abortFixture is a test Prow/Gangway server. It serves ProwJob status (the Deck +// "?prowjob=" endpoint), the Gangway ListJobExecutions endpoint +// ("?job_name=<>&status=<>"), and records any bulk status-change (abort) requests. +type abortFixture struct { + srv *httptest.Server + jobs map[string]*prowjobs.ProwJob + bulkCount int32 + mu sync.Mutex + lastBulk *prowgangway.BulkJobStatusChangeRequest +} + +func newAbortFixture(t *testing.T, jobs map[string]*prowjobs.ProwJob) *abortFixture { + t.Helper() + f := &abortFixture{jobs: jobs} + f.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && r.URL.Path == bulkStatusChangePath { + atomic.AddInt32(&f.bulkCount, 1) + raw, _ := io.ReadAll(r.Body) + var parsed prowgangway.BulkJobStatusChangeRequest + if err := protojson.Unmarshal(raw, &parsed); err == nil { + f.mu.Lock() + f.lastBulk = &parsed + f.mu.Unlock() + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("{}")) + return + } + + q := r.URL.Query() + if id := q.Get("prowjob"); id != "" { + job, ok := f.jobs[id] + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + body, err := yaml.Marshal(job) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write(body) + return + } + + if jobName := q.Get("job_name"); jobName != "" { + var want prowgangway.JobExecutionStatus + if name := q.Get("status"); name != "" { + want = prowgangway.JobExecutionStatus(prowgangway.JobExecutionStatus_value[name]) + } + execs := &prowgangway.JobExecutions{} + for id, j := range f.jobs { + if j.Spec.Job != jobName { + continue + } + st := prowgangway.TranslateProwJobStatus(&j.Status) + if want != prowgangway.JobExecutionStatus_JOB_EXECUTION_STATUS_UNSPECIFIED && st != want { + continue + } + execs.JobExecution = append(execs.JobExecution, &prowgangway.JobExecution{ + Id: id, + JobName: j.Spec.Job, + JobStatus: st, + JobType: prowgangway.TranslateProwJobType(j.Spec.Type), + }) + } + out, err := protojson.Marshal(execs) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write(out) + return + } + + w.WriteHeader(http.StatusBadRequest) + })) + t.Cleanup(f.srv.Close) + return f +} + +func (f *abortFixture) client() *Client { + return NewClient("test-token", f.srv.URL, f.srv.URL) +} + +func (f *abortFixture) bulkRequests() int32 { + return atomic.LoadInt32(&f.bulkCount) +} + +func testJob(state prowjobs.ProwJobState, jobType prowjobs.ProwJobType, refs *prowjobs.Refs, start time.Time, region string) *prowjobs.ProwJob { + pj := &prowjobs.ProwJob{ + Spec: prowjobs.ProwJobSpec{ + Job: testJobName, + Type: jobType, + Refs: refs, + }, + Status: prowjobs.ProwJobStatus{ + State: state, + URL: "https://prow.ci.openshift.org/view/job", + }, + } + if region != "" { + pj.Annotations = map[string]string{ev2RolloutRegionAnnotation: region} + } + if !start.IsZero() { + pj.Status.StartTime = metav1.NewTime(start) + } + return pj +} + +func postsubmitRefs() *prowjobs.Refs { + return &prowjobs.Refs{Org: "Azure", Repo: "ARO-HCP", BaseRef: "main", BaseSHA: "deadbeef"} +} + +func TestDeriveBulkURL(t *testing.T) { + tests := []struct { + name string + in string + want string + }{ + { + name: "executions endpoint", + in: "https://gangway-ci.example.com/v1/executions", + want: "https://gangway-ci.example.com/v1/bulk-job-status-update", + }, + { + name: "strips query", + in: "https://gangway-ci.example.com/v1/executions?foo=bar", + want: "https://gangway-ci.example.com/v1/bulk-job-status-update", + }, + { + name: "host only", + in: "http://127.0.0.1:8080", + want: "http://127.0.0.1:8080/v1/bulk-job-status-update", + }, + { + name: "preserves base-path prefix", + in: "https://gangway-ci.example.com/gangway/v1/executions", + want: "https://gangway-ci.example.com/gangway/v1/bulk-job-status-update", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := deriveBulkURL(tc.in); got != tc.want { + t.Fatalf("deriveBulkURL(%q) = %q, want %q", tc.in, got, tc.want) + } + }) + } +} + +func TestIsTerminalState(t *testing.T) { + terminal := []prowjobs.ProwJobState{ + prowjobs.SuccessState, prowjobs.FailureState, prowjobs.AbortedState, prowjobs.ErrorState, + } + for _, s := range terminal { + if !isTerminalState(s) { + t.Errorf("isTerminalState(%q) = false, want true", s) + } + } + nonTerminal := []prowjobs.ProwJobState{ + prowjobs.TriggeredState, prowjobs.PendingState, prowjobs.SchedulingState, + } + for _, s := range nonTerminal { + if isTerminalState(s) { + t.Errorf("isTerminalState(%q) = true, want false", s) + } + } +} + +func TestAbortJobRunningJob(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 bulk request, got %d", got) + } + + f.mu.Lock() + req := f.lastBulk + f.mu.Unlock() + if req == nil { + t.Fatal("bulk request body was not captured") + } + if req.GetJobStatusChange().GetCurrent() != prowgangway.JobExecutionStatus_PENDING { + t.Errorf("current = %v, want PENDING", req.GetJobStatusChange().GetCurrent()) + } + if req.GetJobStatusChange().GetDesired() != prowgangway.JobExecutionStatus_ABORTED { + t.Errorf("desired = %v, want ABORTED", req.GetJobStatusChange().GetDesired()) + } + if req.GetJobType() != prowgangway.JobExecutionType_POSTSUBMIT { + t.Errorf("jobType = %v, want POSTSUBMIT", req.GetJobType()) + } + if req.GetRefs().GetOrg() != "Azure" || req.GetRefs().GetRepo() != "ARO-HCP" { + t.Errorf("refs = %v, want org=Azure repo=ARO-HCP", req.GetRefs()) + } + if !req.GetStartedAfter().AsTime().Equal(start) || !req.GetStartedBefore().AsTime().Equal(start) { + t.Errorf("window = [%v, %v], want both = %v", req.GetStartedAfter().AsTime(), req.GetStartedBefore().AsTime(), start) + } +} + +func TestAbortJobTerminalIsNoop(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.SuccessState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request for terminal job, got %d", got) + } +} + +func TestAbortJobNoStartTimeIsNoop(t *testing.T) { + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.TriggeredState, prowjobs.PostsubmitJob, postsubmitRefs(), time.Time{}, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request when start time is unknown, got %d", got) + } +} + +// TestAbortJobNoRefsIsNoop verifies that a job with no refs is not aborted: +// refs are part of the bulk selector and the API cannot filter by job name, so +// aborting without them risks matching unrelated jobs sharing type/state. +func TestAbortJobNoRefsIsNoop(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, nil, start, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request when refs are missing, got %d", got) + } +} + +// TestAbortJobSkipsWhenRegionSharesWindow verifies that when another region's +// execution of the same job started in the same second (and is therefore +// indistinguishable to the region-blind, second-precise bulk API), AbortJob +// refuses to fire rather than cancel the sibling region's job. +func TestAbortJobSkipsWhenRegionSharesWindow(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "exec-eastus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + "exec-westus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "westus"), + }) + + if err := f.client().AbortJob(testContext(), "exec-eastus"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request when a sibling region shares the window, got %d", got) + } +} + +// TestAbortJobAbortsWhenRegionsDifferInTime verifies that concurrent regional +// executions which started in different seconds do not block the abort: the +// target is isolable, so exactly one bulk request is issued for our window. +func TestAbortJobAbortsWhenRegionsDifferInTime(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "exec-eastus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + "exec-westus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start.Add(5*time.Second), "westus"), + }) + + if err := f.client().AbortJob(testContext(), "exec-eastus"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 bulk request, got %d", got) + } + + f.mu.Lock() + req := f.lastBulk + f.mu.Unlock() + if req == nil { + t.Fatal("bulk request body was not captured") + } + if !req.GetStartedAfter().AsTime().Equal(start) { + t.Errorf("window anchored at %v, want %v (the target region's start)", req.GetStartedAfter().AsTime(), start) + } +} + +// waitCtx wraps a discard-logger context with a cancel function. +func waitCtx() (context.Context, context.CancelFunc) { + return context.WithCancel(logr.NewContext(context.Background(), logr.Discard())) +} + +func TestWaitForCompletionAbortsOnCancel(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + m := NewMonitor(f.client(), 5*time.Millisecond, time.Hour, false, true, true) + + ctx, cancel := waitCtx() + errCh := make(chan error, 1) + go func() { + errCh <- m.WaitForCompletion(ctx, logr.Discard(), "job-exec-123") + }() + + // Let the monitor observe the job at least once, then cancel. + time.Sleep(30 * time.Millisecond) + cancel() + + select { + case err := <-errCh: + if err == nil { + t.Fatal("expected an error on cancellation, got nil") + } + case <-time.After(2 * time.Second): + t.Fatal("WaitForCompletion did not return after cancellation") + } + + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 abort request, got %d", got) + } +} + +// TestWaitForCompletionAbortsWhenContextLacksLogger guards the fix where +// handleCancellation re-attaches the in-hand logger to the abort context: the +// client methods extract the logger via logr.FromContext, so the abort must +// still fire even when the parent context carries no logger of its own. +func TestWaitForCompletionAbortsWhenContextLacksLogger(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + m := NewMonitor(f.client(), 5*time.Millisecond, time.Hour, false, true, true) + + // Context deliberately has no logger attached. + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + go func() { + errCh <- m.WaitForCompletion(ctx, logr.Discard(), "job-exec-123") + }() + + time.Sleep(30 * time.Millisecond) + cancel() + + select { + case err := <-errCh: + if err == nil { + t.Fatal("expected an error on cancellation, got nil") + } + case <-time.After(2 * time.Second): + t.Fatal("WaitForCompletion did not return after cancellation") + } + + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 abort request despite logger-less context, got %d", got) + } +} +func TestWaitForCompletionNoAbortWhenDisabled(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + m := NewMonitor(f.client(), 5*time.Millisecond, time.Hour, false, true, false) + + ctx, cancel := waitCtx() + errCh := make(chan error, 1) + go func() { + errCh <- m.WaitForCompletion(ctx, logr.Discard(), "job-exec-123") + }() + + time.Sleep(30 * time.Millisecond) + cancel() + + select { + case <-errCh: + case <-time.After(2 * time.Second): + t.Fatal("WaitForCompletion did not return after cancellation") + } + + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no abort request when abort-on-cancel is disabled, got %d", got) + } +} + +func TestWaitForCompletionTimeoutDoesNotAbort(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + // Short monitor timeout, parent context never cancelled: this is an internal + // timeout, which must not abort the job. + m := NewMonitor(f.client(), 5*time.Millisecond, 20*time.Millisecond, false, true, true) + + err := m.WaitForCompletion(testContext(), logr.Discard(), "job-exec-123") + if err == nil { + t.Fatal("expected timeout error, got nil") + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no abort request on internal timeout, got %d", got) + } +} diff --git a/tools/prow-job-executor/prowjob/client.go b/tools/prow-job-executor/prowjob/client.go index 9abd1c2..cc51c78 100644 --- a/tools/prow-job-executor/prowjob/client.go +++ b/tools/prow-job-executor/prowjob/client.go @@ -25,9 +25,13 @@ import ( "io" "net" "net/http" + "net/url" + "strings" "time" "github.com/go-logr/logr" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/timestamppb" "k8s.io/apimachinery/pkg/util/wait" @@ -38,6 +42,15 @@ import ( "github.com/Azure/ARO-Tools/tools/prow-job-executor/internal/retry" ) +// bulkStatusChangePath is the Gangway REST route for BulkJobStatusChange; it +// shares the host of the executions endpoint. +const bulkStatusChangePath = "/v1/bulk-job-status-update" + +// executionsPath is the Gangway REST route the executor is configured against +// (--gangway-url). It is used to derive the bulk route while preserving any +// base-path prefix the URL may carry. +const executionsPath = "/v1/executions" + // jobSubmissionResponse represents the minimal JSON response from Gangway API for job submission type jobSubmissionResponse struct { ID string `json:"id"` @@ -48,6 +61,7 @@ type Client struct { token string client *http.Client gangwayURL string + bulkURL string prowURL string submitBackoff wait.Backoff } @@ -57,6 +71,7 @@ func NewClient(token, gangwayURL, prowURL string) *Client { return &Client{ token: token, gangwayURL: gangwayURL, + bulkURL: deriveBulkURL(gangwayURL), prowURL: prowURL, client: &http.Client{ Timeout: 30 * time.Second, @@ -78,7 +93,22 @@ func NewClient(token, gangwayURL, prowURL string) *Client { } } -// SubmitJob submits a job to Prow and returns the job execution ID, retrying on +// deriveBulkURL returns the Gangway bulk job status-change endpoint that shares +// the host of the executions endpoint. Any base-path prefix on the configured +// URL is preserved (e.g. https://host/gangway/v1/executions yields +// https://host/gangway/v1/bulk-job-status-update). If gangwayURL cannot be +// parsed the input is returned unchanged so the caller still surfaces a clear +// HTTP error instead of panicking. +func deriveBulkURL(gangwayURL string) string { + u, err := url.Parse(gangwayURL) + if err != nil { + return gangwayURL + } + u.Path = strings.TrimSuffix(u.Path, executionsPath) + bulkStatusChangePath + u.RawQuery = "" + return u.String() +} + // transient errors with exponential backoff. // // The Gangway API enforces a low request-rate limit (~9 requests/minute per client @@ -202,6 +232,296 @@ func (c *Client) getJobStatusOnce(ctx context.Context, prowExecutionID string) ( return &prowJob, nil } +// ev2RolloutRegionAnnotation is the ProwJob annotation carrying the EV2 rollout +// region. It mirrors the key set by the executor when submitting the job and is +// used here only for logging, since the Gangway abort API cannot filter on it. +const ev2RolloutRegionAnnotation = "ev2.rollout/region" + +// AbortJob requests cancellation of a running Prow job identified by its +// execution ID. +// +// The Gangway API exposes no per-execution abort. The only available mechanism +// is BulkJobStatusChange, which selects jobs by type, refs (org/repo), state and +// a StartTime window; it cannot target a single execution ID, and crucially it +// cannot filter by region. The same EV2 pipeline fans a rollout out to several +// regions concurrently, and every regional execution of an environment shares +// the same job name and refs - the region lives only in an annotation/env var +// that the selector ignores. ProwJob StartTime is also serialized at one-second +// precision, so a [StartTime, StartTime] window matches every execution that +// started in that same second. +// +// To avoid cancelling another region's job, AbortJob first enumerates the +// concurrent executions of the same job in the same state and verifies that our +// target is the only one whose start time falls in the bulk window. If a sibling +// (e.g. another region of the same rollout) shares that window we cannot isolate +// our execution, so the abort is skipped rather than risk cancelling the wrong +// region's E2E run. +// +// Aborting is best-effort and idempotent: a terminal job is a no-op, a job with +// no recorded StartTime is skipped, and an un-isolable job is skipped. +func (c *Client) AbortJob(ctx context.Context, prowExecutionID string) error { + logger, err := logr.FromContext(ctx) + if err != nil { + return err + } + + job, err := c.GetJobStatus(ctx, prowExecutionID) + if err != nil { + return fmt.Errorf("failed to look up job %s before aborting: %w", prowExecutionID, err) + } + + state := job.Status.State + region := job.Annotations[ev2RolloutRegionAnnotation] + logger = logger.WithValues("prowExecutionID", prowExecutionID, "jobName", job.Spec.Job, "status", string(state), "region", region) + + if isTerminalState(state) { + logger.Info("Job already in a terminal state, nothing to abort") + return nil + } + + if job.Status.StartTime.IsZero() { + // Without a recorded StartTime the bulk selector cannot be bounded at all, + // so we refuse to issue an abort that could cancel sibling jobs sharing the + // same type and refs. + logger.Info("Job has no recorded start time yet; skipping abort to avoid affecting other jobs") + return nil + } + + // Region-aware isolation check: ensure no other concurrent execution of the + // same job in the same state shares the StartTime window we are about to + // abort. Because the bulk API is region-blind and second-precise, aborting + // when a sibling shares the window would cancel that sibling too. + isolated, err := c.abortWindowIsIsolated(ctx, logger, job, prowExecutionID) + if err != nil { + // If we cannot prove isolation we err on the side of caution and leave the + // job running rather than risk cancelling another region's execution. + logger.Error(err, "Could not verify the abort would be isolated to this execution; skipping abort") + return nil + } + if !isolated { + return nil + } + + if job.Spec.Refs == nil { + // refs (org/repo) are part of the bulk selector and the API cannot filter + // by job name; aborting with nil refs could match a much broader set of + // jobs sharing only type/state. Refuse rather than risk over-selecting. + logger.Info("Job has no refs; skipping abort to avoid selecting unrelated jobs") + return nil + } + refs, err := prowgangway.FromCrdRefs(job.Spec.Refs) + if err != nil { + return fmt.Errorf("failed to convert refs for job %s: %w", prowExecutionID, err) + } + + // Pin the window to the StartTime; isMatchingCondition treats the bounds + // inclusively, so [StartTime, StartTime] is the tightest window the API allows. + start := timestamppb.New(job.Status.StartTime.Time) + request := &prowgangway.BulkJobStatusChangeRequest{ + JobStatusChange: &prowgangway.JobStatusChange{ + Current: prowgangway.TranslateProwJobStatus(&job.Status), + Desired: prowgangway.JobExecutionStatus_ABORTED, + }, + JobType: prowgangway.TranslateProwJobType(job.Spec.Type), + Refs: refs, + StartedAfter: start, + StartedBefore: start, + } + + logger.Info("Requesting abort of Prow job") + if err := c.postBulkStatusChange(ctx, request); err != nil { + return fmt.Errorf("failed to abort job %s: %w", prowExecutionID, err) + } + logger.Info("Abort request sent for Prow job") + return nil +} + +// abortWindowIsIsolated reports whether the target job is the only concurrent +// execution of the same job (across all regions) whose StartTime falls within +// the one-second bulk-abort window. It returns false (with a clear log) when a +// sibling shares the window, so the caller can skip the abort instead of +// cancelling another region's job. +func (c *Client) abortWindowIsIsolated(ctx context.Context, logger logr.Logger, job *prowjobs.ProwJob, prowExecutionID string) (bool, error) { + targetSecond := job.Status.StartTime.Truncate(time.Second) + currentStatus := prowgangway.TranslateProwJobStatus(&job.Status) + + executions, err := c.ListExecutions(ctx, job.Spec.Job, currentStatus) + if err != nil { + return false, fmt.Errorf("failed to list concurrent executions of job %q: %w", job.Spec.Job, err) + } + + for _, exec := range executions { + siblingID := exec.GetId() + if siblingID == "" || siblingID == prowExecutionID { + continue + } + + sibling, err := c.GetJobStatus(ctx, siblingID) + if err != nil { + // We could not inspect a concurrent execution, so we cannot rule out a + // shared window; treat that as un-isolable. + return false, fmt.Errorf("failed to inspect concurrent execution %s: %w", siblingID, err) + } + if sibling.Status.StartTime.IsZero() { + // Not started yet, so it cannot be in our window. + continue + } + if sibling.Status.StartTime.Truncate(time.Second).Equal(targetSecond) { + logger.Info("Skipping abort: a concurrent execution shares the abort window and cannot be isolated; leaving the Prow job running", + "conflictingExecutionID", siblingID, + "conflictingRegion", sibling.Annotations[ev2RolloutRegionAnnotation], + "startWindow", targetSecond.UTC().Format(time.RFC3339)) + return false, nil + } + } + + return true, nil +} + +// ListExecutions returns the executions of the given job name filtered to the +// provided status, via the Gangway ListJobExecutions endpoint. +func (c *Client) ListExecutions(ctx context.Context, jobName string, status prowgangway.JobExecutionStatus) ([]*prowgangway.JobExecution, error) { + logger, err := logr.FromContext(ctx) + if err != nil { + return nil, err + } + + query := url.Values{} + query.Set("job_name", jobName) + if status != prowgangway.JobExecutionStatus_JOB_EXECUTION_STATUS_UNSPECIFIED { + query.Set("status", status.String()) + } + u, err := url.Parse(c.gangwayURL) + if err != nil { + return nil, fmt.Errorf("failed to parse gangway URL %q: %w", c.gangwayURL, err) + } + u.RawQuery = query.Encode() + requestURL := u.String() + + req, err := http.NewRequestWithContext(ctx, "GET", requestURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+c.token) + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to list executions: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + logger.Error(err, "failed to close body") + } + }() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, &httpStatusError{statusCode: resp.StatusCode, err: fmt.Errorf("list executions failed with status %d: %s", resp.StatusCode, string(body))} + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var executions prowgangway.JobExecutions + if err := protojson.Unmarshal(body, &executions); err != nil { + return nil, fmt.Errorf("failed to decode executions list: %w", err) + } + return executions.GetJobExecution(), nil +} + +// isTerminalState reports whether a ProwJob state is final and therefore not +// abortable. +func isTerminalState(state prowjobs.ProwJobState) bool { + switch state { + case prowjobs.SuccessState, prowjobs.FailureState, prowjobs.AbortedState, prowjobs.ErrorState: + return true + default: + return false + } +} + +// postBulkStatusChange POSTs a BulkJobStatusChangeRequest to Gangway, retrying on +// transient errors with a short exponential backoff. +func (c *Client) postBulkStatusChange(ctx context.Context, request *prowgangway.BulkJobStatusChangeRequest) error { + logger, err := logr.FromContext(ctx) + if err != nil { + return err + } + + // protojson is required here: the Gangway REST gateway expects proto3 JSON, + // where enums are strings and Timestamps are RFC 3339 strings. Plain + // encoding/json would emit integer enums and {seconds,nanos} timestamps that + // the gateway rejects. + data, err := protojson.Marshal(request) + if err != nil { + return fmt.Errorf("failed to marshal bulk status change request: %w", err) + } + + backoff := wait.Backoff{ + Duration: time.Second, + Factor: 2.0, + Jitter: 0.1, + Steps: 3, + Cap: 10 * time.Second, + } + + var lastErr error + condition := func(ctx context.Context) (bool, error) { + if err := c.postBulkStatusChangeOnce(ctx, data); err != nil { + lastErr = err + if !isRetryableError(err) { + return false, err + } + logger.Info("Bulk status change failed with a transient error, will retry", "error", err.Error()) + return false, nil + } + return true, nil + } + + if err := wait.ExponentialBackoffWithContext(ctx, backoff, condition); err != nil { + if lastErr != nil { + return fmt.Errorf("bulk status change failed after retries: %w", lastErr) + } + return err + } + return nil +} + +// postBulkStatusChangeOnce performs a single bulk status-change request without +// retry logic. +func (c *Client) postBulkStatusChangeOnce(ctx context.Context, data []byte) error { + logger, err := logr.FromContext(ctx) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, "POST", c.bulkURL, bytes.NewBuffer(data)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+c.token) + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send bulk status change: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + logger.Error(err, "failed to close body") + } + }() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + err := fmt.Errorf("bulk status change failed with status %d: %s", resp.StatusCode, string(body)) + return &httpStatusError{statusCode: resp.StatusCode, err: err} + } + return nil +} + // httpStatusError wraps errors with HTTP status code information type httpStatusError struct { statusCode int diff --git a/tools/prow-job-executor/prowjob/monitor.go b/tools/prow-job-executor/prowjob/monitor.go index 800a18f..00c8722 100644 --- a/tools/prow-job-executor/prowjob/monitor.go +++ b/tools/prow-job-executor/prowjob/monitor.go @@ -25,6 +25,11 @@ import ( prowgangway "sigs.k8s.io/prow/pkg/gangway" ) +// abortTimeout bounds the best-effort abort issued when monitoring is cancelled. +// It must stay well within the process's shutdown grace period so the request can +// be sent before the container is killed. +const abortTimeout = 30 * time.Second + // Monitor handles job execution and monitoring type Monitor struct { client *Client @@ -32,22 +37,28 @@ type Monitor struct { timeout time.Duration dryRun bool gatePromotion bool + abortOnCancel bool } // NewMonitor creates a new job monitor with the specified polling interval and timeout. -func NewMonitor(client *Client, pollInterval, timeout time.Duration, dryRun, gatePromotion bool) *Monitor { +func NewMonitor(client *Client, pollInterval, timeout time.Duration, dryRun, gatePromotion, abortOnCancel bool) *Monitor { return &Monitor{ client: client, pollInterval: pollInterval, timeout: timeout, dryRun: dryRun, gatePromotion: gatePromotion, + abortOnCancel: abortOnCancel, } } // WaitForCompletion polls job status until completion func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, prowExecutionID string) error { - ctx, cancel := context.WithTimeout(ctx, m.timeout) + // Bound monitoring by the configured timeout while keeping a handle on the + // caller's context, so an external cancellation (e.g. EV2/ACI sending SIGTERM + // when the rollout is cancelled) can be told apart from our own timeout. + parent := ctx + monCtx, cancel := context.WithTimeout(parent, m.timeout) defer cancel() // Create ticker for polling interval @@ -56,7 +67,7 @@ func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, pro // Check status immediately, then poll at intervals for { - job, err := m.client.GetJobStatus(ctx, prowExecutionID) + job, err := m.client.GetJobStatus(monCtx, prowExecutionID) if err != nil { logger.Error(err, "Failed to get job status after retries, will continue polling") } else { @@ -98,7 +109,13 @@ func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, pro } select { - case <-ctx.Done(): + case <-monCtx.Done(): + // Distinguish caller cancellation (rollout cancelled) from the + // monitor's own timeout: only the former should abort the Prow job. + if parent.Err() != nil { + m.handleCancellation(parent, logger, prowExecutionID) + return fmt.Errorf("job monitoring cancelled for job %s: %w", prowExecutionID, context.Cause(parent)) + } if job != nil { return fmt.Errorf("job monitoring timed out after %v - job %s may still be running, check Prow UI: %s", m.timeout, prowExecutionID, job.Status.URL) } @@ -109,6 +126,30 @@ func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, pro } } +// handleCancellation makes a best-effort attempt to abort the Prow job after the +// monitoring context was cancelled by the caller (rollout cancellation). The +// abort runs on a fresh, short-lived context derived from the cancelled parent +// (values preserved, cancellation dropped) so the request can still be sent +// during the process's shutdown grace period. +func (m *Monitor) handleCancellation(parent context.Context, logger logr.Logger, prowExecutionID string) { + if !m.abortOnCancel { + logger.Info("Monitoring cancelled; abort-on-cancel disabled, leaving Prow job running", "prowExecutionID", prowExecutionID) + return + } + + logger.Info("Monitoring cancelled; handling Prow job abort", "prowExecutionID", prowExecutionID) + // Derive a fresh, short-lived context (values preserved, cancellation dropped) + // and re-attach the logger explicitly: the client methods extract it via + // logr.FromContext, so the abort must not depend on the parent carrying one. + abortCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), abortTimeout) + abortCtx = logr.NewContext(abortCtx, logger) + defer cancel() + + if err := m.client.AbortJob(abortCtx, prowExecutionID); err != nil { + logger.Error(err, "Failed to abort Prow job after cancellation", "prowExecutionID", prowExecutionID) + } +} + // ExecuteAndWait submits a job and waits for completion func (m *Monitor) ExecuteAndWait(ctx context.Context, logger logr.Logger, request *prowgangway.CreateJobExecutionRequest) error { // Submit job