From e3ce3c1b98c269350980bf95620607984447a655 Mon Sep 17 00:00:00 2001 From: Rael Garcia Date: Fri, 26 Jun 2026 16:24:25 +0000 Subject: [PATCH 1/3] feat(prow-job-executor): abort gating E2E job on rollout cancellation (AROSLSRE-1339) When the EV2 gating step is cancelled, the executor receives SIGTERM but previously left the postsubmit Prow E2E job running to completion. Propagate the cancellation by aborting the job via Gangway BulkJobStatusChange. Because one EV2 pipeline fans out to several regional E2E jobs that share the same job_name and refs, the bulk selector is region-blind and metav1.Time only serializes to one-second precision. A region-aware uniqueness guard lists concurrent same-job executions and skips the abort (fail-safe) if any sibling region shares the target's start-second, so a sibling region is never aborted. Gated behind --abort-on-cancel (default true). --- tools/prow-job-executor/go.mod | 2 +- tools/prow-job-executor/options.go | 9 +- tools/prow-job-executor/prowjob/abort_test.go | 398 ++++++++++++++++++ tools/prow-job-executor/prowjob/client.go | 305 +++++++++++++- tools/prow-job-executor/prowjob/monitor.go | 45 +- 5 files changed, 751 insertions(+), 8 deletions(-) create mode 100644 tools/prow-job-executor/prowjob/abort_test.go diff --git a/tools/prow-job-executor/go.mod b/tools/prow-job-executor/go.mod index 4158c71..7dfdd3e 100644 --- a/tools/prow-job-executor/go.mod +++ b/tools/prow-job-executor/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-logr/logr v1.4.3 github.com/google/uuid v1.6.0 github.com/spf13/cobra v1.10.2 + google.golang.org/protobuf v1.36.11 k8s.io/apimachinery v0.35.3 sigs.k8s.io/prow v0.0.0-20251030184004-0e4d5be4200d sigs.k8s.io/yaml v1.6.0 @@ -170,7 +171,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/grpc v1.78.0 // indirect - google.golang.org/protobuf 
v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/tools/prow-job-executor/options.go b/tools/prow-job-executor/options.go index 4a6012f..9d7fd31 100644 --- a/tools/prow-job-executor/options.go +++ b/tools/prow-job-executor/options.go @@ -65,6 +65,7 @@ func DefaultExecuteOptions() *RawExecuteOptions { BaseRef: defaultBaseRef, Org: defaultOrg, Repo: defaultRepo, + AbortOnCancel: true, } } @@ -83,6 +84,7 @@ func (o *RawExecuteOptions) BindFlags(cmd *cobra.Command) error { cmd.Flags().StringVar(&o.ProwURL, "prow-url", o.ProwURL, "Prow API URL for job status monitoring") cmd.Flags().BoolVar(&o.DryRun, "dry-run", o.DryRun, "Print which job would be started, but do not start one.") cmd.Flags().BoolVar(&o.GatePromotion, "gate-promotion", o.GatePromotion, "Exit with an error code if the job fails.") + cmd.Flags().BoolVar(&o.AbortOnCancel, "abort-on-cancel", o.AbortOnCancel, "Abort the running Prow job if the executor is cancelled (e.g. the rollout is cancelled and the process receives SIGTERM).") cmd.Flags().StringVar(&o.BaseSha, "base-sha", o.BaseSha, "Git commit SHA to test against. When set, the job is triggered as a postsubmit with this specific commit instead of HEAD.") cmd.Flags().StringVar(&o.BaseRef, "base-ref", o.BaseRef, "Git base ref (branch) for the postsubmit job (requires --base-sha)") cmd.Flags().StringVar(&o.Org, "org", o.Org, "GitHub org for the postsubmit job (requires --base-sha)") @@ -121,6 +123,7 @@ type RawExecuteOptions struct { ProwURL string DryRun bool GatePromotion bool + AbortOnCancel bool // Git ref options for postsubmit execution pinned to a specific commit. // When BaseSha is set, the job is triggered as a postsubmit instead of a periodic. 
@@ -160,6 +163,7 @@ type completedExecuteOptions struct { ProwURL string DryRun bool GatePromotion bool + AbortOnCancel bool // Git ref options for postsubmit execution BaseSha string @@ -291,6 +295,7 @@ func (o *ValidatedExecuteOptions) Complete(ctx context.Context) (*ExecuteOptions ProwURL: o.ProwURL, DryRun: o.DryRun, GatePromotion: o.GatePromotion, + AbortOnCancel: o.AbortOnCancel, BaseSha: o.BaseSha, BaseRef: o.BaseRef, Org: o.Org, @@ -309,7 +314,7 @@ func (o *ExecuteOptions) Execute(ctx context.Context) error { client := prowjob.NewClient(o.ProwToken, o.GangwayURL, o.ProwURL) // Create job monitor - monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, o.DryRun, o.GatePromotion) + monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, o.DryRun, o.GatePromotion, o.AbortOnCancel) // Prepare environment variables, including the region envs := make(map[string]string) @@ -509,7 +514,7 @@ func (o *ValidatedMonitorOptions) Complete(ctx context.Context) (*MonitorOptions func (o *MonitorOptions) Monitor(ctx context.Context, logger logr.Logger) error { // Create Prow client and monitor client := prowjob.NewClient(o.ProwToken, o.GangwayURL, o.ProwURL) - monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, false, false) + monitor := prowjob.NewMonitor(client, o.PollInterval, o.Timeout, false, false, false) // Monitor existing job using shared polling logic logger.Info("Starting to monitor existing job", "jobExecutionID", o.JobExecutionID) diff --git a/tools/prow-job-executor/prowjob/abort_test.go b/tools/prow-job-executor/prowjob/abort_test.go new file mode 100644 index 0000000..0471a5a --- /dev/null +++ b/tools/prow-job-executor/prowjob/abort_test.go @@ -0,0 +1,398 @@ +// Copyright 2025 Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prowjob + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/go-logr/logr" + "google.golang.org/protobuf/encoding/protojson" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + prowjobs "sigs.k8s.io/prow/pkg/apis/prowjobs/v1" + prowgangway "sigs.k8s.io/prow/pkg/gangway" + "sigs.k8s.io/yaml" +) + +const testJobName = "branch-ci-Azure-ARO-HCP-main-e2e-integration-e2e-parallel" + +// abortFixture is a test Prow/Gangway server. It serves ProwJob status (the Deck +// "?prowjob=" endpoint), the Gangway ListJobExecutions endpoint +// ("?job_name=<>&status=<>"), and records any bulk status-change (abort) requests. 
+type abortFixture struct { + srv *httptest.Server + jobs map[string]*prowjobs.ProwJob + bulkCount int32 + mu sync.Mutex + lastBulk *prowgangway.BulkJobStatusChangeRequest +} + +func newAbortFixture(t *testing.T, jobs map[string]*prowjobs.ProwJob) *abortFixture { + t.Helper() + f := &abortFixture{jobs: jobs} + f.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && r.URL.Path == bulkStatusChangePath { + atomic.AddInt32(&f.bulkCount, 1) + raw, _ := io.ReadAll(r.Body) + var parsed prowgangway.BulkJobStatusChangeRequest + if err := protojson.Unmarshal(raw, &parsed); err == nil { + f.mu.Lock() + f.lastBulk = &parsed + f.mu.Unlock() + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("{}")) + return + } + + q := r.URL.Query() + if id := q.Get("prowjob"); id != "" { + job, ok := f.jobs[id] + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + body, err := yaml.Marshal(job) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write(body) + return + } + + if jobName := q.Get("job_name"); jobName != "" { + var want prowgangway.JobExecutionStatus + if name := q.Get("status"); name != "" { + want = prowgangway.JobExecutionStatus(prowgangway.JobExecutionStatus_value[name]) + } + execs := &prowgangway.JobExecutions{} + for id, j := range f.jobs { + if j.Spec.Job != jobName { + continue + } + st := prowgangway.TranslateProwJobStatus(&j.Status) + if want != prowgangway.JobExecutionStatus_JOB_EXECUTION_STATUS_UNSPECIFIED && st != want { + continue + } + execs.JobExecution = append(execs.JobExecution, &prowgangway.JobExecution{ + Id: id, + JobName: j.Spec.Job, + JobStatus: st, + JobType: prowgangway.TranslateProwJobType(j.Spec.Type), + }) + } + out, err := protojson.Marshal(execs) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write(out) + return + } + + 
w.WriteHeader(http.StatusBadRequest) + })) + t.Cleanup(f.srv.Close) + return f +} + +func (f *abortFixture) client() *Client { + return NewClient("test-token", f.srv.URL, f.srv.URL) +} + +func (f *abortFixture) bulkRequests() int32 { + return atomic.LoadInt32(&f.bulkCount) +} + +func testJob(state prowjobs.ProwJobState, jobType prowjobs.ProwJobType, refs *prowjobs.Refs, start time.Time, region string) *prowjobs.ProwJob { + pj := &prowjobs.ProwJob{ + Spec: prowjobs.ProwJobSpec{ + Job: testJobName, + Type: jobType, + Refs: refs, + }, + Status: prowjobs.ProwJobStatus{ + State: state, + URL: "https://prow.ci.openshift.org/view/job", + }, + } + if region != "" { + pj.Annotations = map[string]string{ev2RolloutRegionAnnotation: region} + } + if !start.IsZero() { + pj.Status.StartTime = metav1.NewTime(start) + } + return pj +} + +func postsubmitRefs() *prowjobs.Refs { + return &prowjobs.Refs{Org: "Azure", Repo: "ARO-HCP", BaseRef: "main", BaseSHA: "deadbeef"} +} + +func TestDeriveBulkURL(t *testing.T) { + tests := []struct { + name string + in string + want string + }{ + { + name: "executions endpoint", + in: "https://gangway-ci.example.com/v1/executions", + want: "https://gangway-ci.example.com/v1/bulk-job-status-update", + }, + { + name: "strips query", + in: "https://gangway-ci.example.com/v1/executions?foo=bar", + want: "https://gangway-ci.example.com/v1/bulk-job-status-update", + }, + { + name: "host only", + in: "http://127.0.0.1:8080", + want: "http://127.0.0.1:8080/v1/bulk-job-status-update", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := deriveBulkURL(tc.in); got != tc.want { + t.Fatalf("deriveBulkURL(%q) = %q, want %q", tc.in, got, tc.want) + } + }) + } +} + +func TestIsTerminalState(t *testing.T) { + terminal := []prowjobs.ProwJobState{ + prowjobs.SuccessState, prowjobs.FailureState, prowjobs.AbortedState, prowjobs.ErrorState, + } + for _, s := range terminal { + if !isTerminalState(s) { + t.Errorf("isTerminalState(%q) = 
false, want true", s) + } + } + nonTerminal := []prowjobs.ProwJobState{ + prowjobs.TriggeredState, prowjobs.PendingState, prowjobs.SchedulingState, + } + for _, s := range nonTerminal { + if isTerminalState(s) { + t.Errorf("isTerminalState(%q) = true, want false", s) + } + } +} + +func TestAbortJobRunningJob(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 bulk request, got %d", got) + } + + f.mu.Lock() + req := f.lastBulk + f.mu.Unlock() + if req == nil { + t.Fatal("bulk request body was not captured") + } + if req.GetJobStatusChange().GetCurrent() != prowgangway.JobExecutionStatus_PENDING { + t.Errorf("current = %v, want PENDING", req.GetJobStatusChange().GetCurrent()) + } + if req.GetJobStatusChange().GetDesired() != prowgangway.JobExecutionStatus_ABORTED { + t.Errorf("desired = %v, want ABORTED", req.GetJobStatusChange().GetDesired()) + } + if req.GetJobType() != prowgangway.JobExecutionType_POSTSUBMIT { + t.Errorf("jobType = %v, want POSTSUBMIT", req.GetJobType()) + } + if req.GetRefs().GetOrg() != "Azure" || req.GetRefs().GetRepo() != "ARO-HCP" { + t.Errorf("refs = %v, want org=Azure repo=ARO-HCP", req.GetRefs()) + } + if !req.GetStartedAfter().AsTime().Equal(start) || !req.GetStartedBefore().AsTime().Equal(start) { + t.Errorf("window = [%v, %v], want both = %v", req.GetStartedAfter().AsTime(), req.GetStartedBefore().AsTime(), start) + } +} + +func TestAbortJobTerminalIsNoop(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.SuccessState, prowjobs.PostsubmitJob, postsubmitRefs(), 
start, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request for terminal job, got %d", got) + } +} + +func TestAbortJobNoStartTimeIsNoop(t *testing.T) { + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.TriggeredState, prowjobs.PostsubmitJob, postsubmitRefs(), time.Time{}, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request when start time is unknown, got %d", got) + } +} + +// TestAbortJobSkipsWhenRegionSharesWindow verifies that when another region's +// execution of the same job started in the same second (and is therefore +// indistinguishable to the region-blind, second-precise bulk API), AbortJob +// refuses to fire rather than cancel the sibling region's job. +func TestAbortJobSkipsWhenRegionSharesWindow(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "exec-eastus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + "exec-westus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "westus"), + }) + + if err := f.client().AbortJob(testContext(), "exec-eastus"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request when a sibling region shares the window, got %d", got) + } +} + +// TestAbortJobAbortsWhenRegionsDifferInTime verifies that concurrent regional +// executions which started in different seconds do not block the abort: the +// target is isolable, so exactly one bulk request is issued for our window. 
+func TestAbortJobAbortsWhenRegionsDifferInTime(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "exec-eastus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + "exec-westus": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start.Add(5*time.Second), "westus"), + }) + + if err := f.client().AbortJob(testContext(), "exec-eastus"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 bulk request, got %d", got) + } + + f.mu.Lock() + req := f.lastBulk + f.mu.Unlock() + if req == nil { + t.Fatal("bulk request body was not captured") + } + if !req.GetStartedAfter().AsTime().Equal(start) { + t.Errorf("window anchored at %v, want %v (the target region's start)", req.GetStartedAfter().AsTime(), start) + } +} + +// waitCtx wraps a discard-logger context with a cancel function. +func waitCtx() (context.Context, context.CancelFunc) { + return context.WithCancel(logr.NewContext(context.Background(), logr.Discard())) +} + +func TestWaitForCompletionAbortsOnCancel(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + m := NewMonitor(f.client(), 5*time.Millisecond, time.Hour, false, true, true) + + ctx, cancel := waitCtx() + errCh := make(chan error, 1) + go func() { + errCh <- m.WaitForCompletion(ctx, logr.Discard(), "job-exec-123") + }() + + // Let the monitor observe the job at least once, then cancel. 
+ time.Sleep(30 * time.Millisecond) + cancel() + + select { + case err := <-errCh: + if err == nil { + t.Fatal("expected an error on cancellation, got nil") + } + case <-time.After(2 * time.Second): + t.Fatal("WaitForCompletion did not return after cancellation") + } + + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 abort request, got %d", got) + } +} + +func TestWaitForCompletionNoAbortWhenDisabled(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + m := NewMonitor(f.client(), 5*time.Millisecond, time.Hour, false, true, false) + + ctx, cancel := waitCtx() + errCh := make(chan error, 1) + go func() { + errCh <- m.WaitForCompletion(ctx, logr.Discard(), "job-exec-123") + }() + + time.Sleep(30 * time.Millisecond) + cancel() + + select { + case <-errCh: + case <-time.After(2 * time.Second): + t.Fatal("WaitForCompletion did not return after cancellation") + } + + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no abort request when abort-on-cancel is disabled, got %d", got) + } +} + +func TestWaitForCompletionTimeoutDoesNotAbort(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + // Short monitor timeout, parent context never cancelled: this is an internal + // timeout, which must not abort the job. 
+ m := NewMonitor(f.client(), 5*time.Millisecond, 20*time.Millisecond, false, true, true) + + err := m.WaitForCompletion(testContext(), logr.Discard(), "job-exec-123") + if err == nil { + t.Fatal("expected timeout error, got nil") + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no abort request on internal timeout, got %d", got) + } +} diff --git a/tools/prow-job-executor/prowjob/client.go b/tools/prow-job-executor/prowjob/client.go index 9abd1c2..8d91465 100644 --- a/tools/prow-job-executor/prowjob/client.go +++ b/tools/prow-job-executor/prowjob/client.go @@ -25,9 +25,12 @@ import ( "io" "net" "net/http" + "net/url" "time" "github.com/go-logr/logr" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/timestamppb" "k8s.io/apimachinery/pkg/util/wait" @@ -38,6 +41,10 @@ import ( "github.com/Azure/ARO-Tools/tools/prow-job-executor/internal/retry" ) +// bulkStatusChangePath is the Gangway REST route for BulkJobStatusChange; it +// shares the host of the executions endpoint. +const bulkStatusChangePath = "/v1/bulk-job-status-update" + // jobSubmissionResponse represents the minimal JSON response from Gangway API for job submission type jobSubmissionResponse struct { ID string `json:"id"` @@ -48,6 +55,7 @@ type Client struct { token string client *http.Client gangwayURL string + bulkURL string prowURL string submitBackoff wait.Backoff } @@ -57,6 +65,7 @@ func NewClient(token, gangwayURL, prowURL string) *Client { return &Client{ token: token, gangwayURL: gangwayURL, + bulkURL: deriveBulkURL(gangwayURL), prowURL: prowURL, client: &http.Client{ Timeout: 30 * time.Second, @@ -78,7 +87,21 @@ func NewClient(token, gangwayURL, prowURL string) *Client { } } +// deriveBulkURL returns the Gangway bulk job status-change endpoint that shares +// the host of the executions endpoint. 
If gangwayURL cannot be parsed the input +// is returned unchanged so the caller still surfaces a clear HTTP error instead +// of panicking. +func deriveBulkURL(gangwayURL string) string { + u, err := url.Parse(gangwayURL) + if err != nil { + return gangwayURL + } + u.Path = bulkStatusChangePath + u.RawQuery = "" + return u.String() +} + // SubmitJob submits a job to Prow and returns the job execution ID, retrying on // transient errors with exponential backoff. // // The Gangway API enforces a low request-rate limit (~9 requests/minute per client @@ -202,6 +225,287 @@ func (c *Client) getJobStatusOnce(ctx context.Context, prowExecutionID string) ( return &prowJob, nil } +// ev2RolloutRegionAnnotation is the ProwJob annotation carrying the EV2 rollout +// region. It mirrors the key set by the executor when submitting the job and is +// used here only for logging, since the Gangway abort API cannot filter on it. +const ev2RolloutRegionAnnotation = "ev2.rollout/region" + +// AbortJob requests cancellation of a running Prow job identified by its +// execution ID. +// +// The Gangway API exposes no per-execution abort. The only available mechanism +// is BulkJobStatusChange, which selects jobs by type, refs (org/repo), state and +// a StartTime window; it cannot target a single execution ID, and crucially it +// cannot filter by region. The same EV2 pipeline fans a rollout out to several +// regions concurrently, and every regional execution of an environment shares +// the same job name and refs - the region lives only in an annotation/env var +// that the selector ignores. ProwJob StartTime is also serialized at one-second +// precision, so a [StartTime, StartTime] window matches every execution that +// started in that same second. +// +// To avoid cancelling another region's job, AbortJob first enumerates the +// concurrent executions of the same job in the same state and verifies that our +// target is the only one whose start time falls in the bulk window. 
If a sibling +// (e.g. 
another region of the same rollout) shares that window we cannot isolate +// our execution, so the abort is skipped rather than risk cancelling the wrong +// region's E2E run. +// +// Aborting is best-effort and idempotent: a terminal job is a no-op, a job with +// no recorded StartTime is skipped, and an un-isolable job is skipped. +func (c *Client) AbortJob(ctx context.Context, prowExecutionID string) error { + logger, err := logr.FromContext(ctx) + if err != nil { + return err + } + + job, err := c.GetJobStatus(ctx, prowExecutionID) + if err != nil { + return fmt.Errorf("failed to look up job %s before aborting: %w", prowExecutionID, err) + } + + state := job.Status.State + region := job.Annotations[ev2RolloutRegionAnnotation] + logger = logger.WithValues("prowExecutionID", prowExecutionID, "jobName", job.Spec.Job, "status", string(state), "region", region) + + if isTerminalState(state) { + logger.Info("Job already in a terminal state, nothing to abort") + return nil + } + + if job.Status.StartTime.IsZero() { + // Without a recorded StartTime the bulk selector cannot be bounded at all, + // so we refuse to issue an abort that could cancel sibling jobs sharing the + // same type and refs. + logger.Info("Job has no recorded start time yet; skipping abort to avoid affecting other jobs") + return nil + } + + // Region-aware isolation check: ensure no other concurrent execution of the + // same job in the same state shares the StartTime window we are about to + // abort. Because the bulk API is region-blind and second-precise, aborting + // when a sibling shares the window would cancel that sibling too. + isolated, err := c.abortWindowIsIsolated(ctx, logger, job, prowExecutionID) + if err != nil { + // If we cannot prove isolation we err on the side of caution and leave the + // job running rather than risk cancelling another region's execution. 
+ logger.Error(err, "Could not verify the abort would be isolated to this execution; skipping abort") + return nil + } + if !isolated { + return nil + } + + var refs *prowgangway.Refs + if job.Spec.Refs != nil { + refs, err = prowgangway.FromCrdRefs(job.Spec.Refs) + if err != nil { + return fmt.Errorf("failed to convert refs for job %s: %w", prowExecutionID, err) + } + } + + // Pin the window to the StartTime; isMatchingCondition treats the bounds + // inclusively, so [StartTime, StartTime] is the tightest window the API allows. + start := timestamppb.New(job.Status.StartTime.Time) + request := &prowgangway.BulkJobStatusChangeRequest{ + JobStatusChange: &prowgangway.JobStatusChange{ + Current: prowgangway.TranslateProwJobStatus(&job.Status), + Desired: prowgangway.JobExecutionStatus_ABORTED, + }, + JobType: prowgangway.TranslateProwJobType(job.Spec.Type), + Refs: refs, + StartedAfter: start, + StartedBefore: start, + } + + logger.Info("Requesting abort of Prow job") + if err := c.postBulkStatusChange(ctx, request); err != nil { + return fmt.Errorf("failed to abort job %s: %w", prowExecutionID, err) + } + logger.Info("Abort request sent for Prow job") + return nil +} + +// abortWindowIsIsolated reports whether the target job is the only concurrent +// execution of the same job (across all regions) whose StartTime falls within +// the one-second bulk-abort window. It returns false (with a clear log) when a +// sibling shares the window, so the caller can skip the abort instead of +// cancelling another region's job. 
+func (c *Client) abortWindowIsIsolated(ctx context.Context, logger logr.Logger, job *prowjobs.ProwJob, prowExecutionID string) (bool, error) { + targetSecond := job.Status.StartTime.Truncate(time.Second) + currentStatus := prowgangway.TranslateProwJobStatus(&job.Status) + + executions, err := c.ListExecutions(ctx, job.Spec.Job, currentStatus) + if err != nil { + return false, fmt.Errorf("failed to list concurrent executions of job %q: %w", job.Spec.Job, err) + } + + for _, exec := range executions { + siblingID := exec.GetId() + if siblingID == "" || siblingID == prowExecutionID { + continue + } + + sibling, err := c.GetJobStatus(ctx, siblingID) + if err != nil { + // We could not inspect a concurrent execution, so we cannot rule out a + // shared window; treat that as un-isolable. + return false, fmt.Errorf("failed to inspect concurrent execution %s: %w", siblingID, err) + } + if sibling.Status.StartTime.IsZero() { + // Not started yet, so it cannot be in our window. + continue + } + if sibling.Status.StartTime.Truncate(time.Second).Equal(targetSecond) { + logger.Info("Skipping abort: a concurrent execution shares the abort window and cannot be isolated; leaving the Prow job running", + "conflictingExecutionID", siblingID, + "conflictingRegion", sibling.Annotations[ev2RolloutRegionAnnotation], + "startWindow", targetSecond.UTC().Format(time.RFC3339)) + return false, nil + } + } + + return true, nil +} + +// ListExecutions returns the executions of the given job name filtered to the +// provided status, via the Gangway ListJobExecutions endpoint. 
+func (c *Client) ListExecutions(ctx context.Context, jobName string, status prowgangway.JobExecutionStatus) ([]*prowgangway.JobExecution, error) { + logger, err := logr.FromContext(ctx) + if err != nil { + return nil, err + } + + query := url.Values{} + query.Set("job_name", jobName) + if status != prowgangway.JobExecutionStatus_JOB_EXECUTION_STATUS_UNSPECIFIED { + query.Set("status", status.String()) + } + requestURL := c.gangwayURL + "?" + query.Encode() + + req, err := http.NewRequestWithContext(ctx, "GET", requestURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+c.token) + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to list executions: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + logger.Error(err, "failed to close body") + } + }() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, &httpStatusError{statusCode: resp.StatusCode, err: fmt.Errorf("list executions failed with status %d: %s", resp.StatusCode, string(body))} + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var executions prowgangway.JobExecutions + if err := protojson.Unmarshal(body, &executions); err != nil { + return nil, fmt.Errorf("failed to decode executions list: %w", err) + } + return executions.GetJobExecution(), nil +} + +// isTerminalState reports whether a ProwJob state is final and therefore not +// abortable. +func isTerminalState(state prowjobs.ProwJobState) bool { + switch state { + case prowjobs.SuccessState, prowjobs.FailureState, prowjobs.AbortedState, prowjobs.ErrorState: + return true + default: + return false + } +} + +// postBulkStatusChange POSTs a BulkJobStatusChangeRequest to Gangway, retrying on +// transient errors with a short exponential backoff. 
+func (c *Client) postBulkStatusChange(ctx context.Context, request *prowgangway.BulkJobStatusChangeRequest) error { + logger, err := logr.FromContext(ctx) + if err != nil { + return err + } + + // protojson is required here: the Gangway REST gateway expects proto3 JSON, + // where enums are strings and Timestamps are RFC 3339 strings. Plain + // encoding/json would emit integer enums and {seconds,nanos} timestamps that + // the gateway rejects. + data, err := protojson.Marshal(request) + if err != nil { + return fmt.Errorf("failed to marshal bulk status change request: %w", err) + } + + backoff := wait.Backoff{ + Duration: time.Second, + Factor: 2.0, + Jitter: 0.1, + Steps: 3, + Cap: 10 * time.Second, + } + + var lastErr error + condition := func(ctx context.Context) (bool, error) { + if err := c.postBulkStatusChangeOnce(ctx, data); err != nil { + lastErr = err + if !isRetryableError(err) { + return false, err + } + logger.Info("Bulk status change failed with a transient error, will retry", "error", err.Error()) + return false, nil + } + return true, nil + } + + if err := wait.ExponentialBackoffWithContext(ctx, backoff, condition); err != nil { + if lastErr != nil { + return fmt.Errorf("bulk status change failed after retries: %w", lastErr) + } + return err + } + return nil +} + +// postBulkStatusChangeOnce performs a single bulk status-change request without +// retry logic. 
+func (c *Client) postBulkStatusChangeOnce(ctx context.Context, data []byte) error { + logger, err := logr.FromContext(ctx) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, "POST", c.bulkURL, bytes.NewBuffer(data)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+c.token) + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send bulk status change: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + logger.Error(err, "failed to close body") + } + }() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + err := fmt.Errorf("bulk status change failed with status %d: %s", resp.StatusCode, string(body)) + return &httpStatusError{statusCode: resp.StatusCode, err: err} + } + return nil +} + // httpStatusError wraps errors with HTTP status code information type httpStatusError struct { statusCode int diff --git a/tools/prow-job-executor/prowjob/monitor.go b/tools/prow-job-executor/prowjob/monitor.go index 800a18f..1b6dbe0 100644 --- a/tools/prow-job-executor/prowjob/monitor.go +++ b/tools/prow-job-executor/prowjob/monitor.go @@ -25,6 +25,11 @@ import ( prowgangway "sigs.k8s.io/prow/pkg/gangway" ) +// abortTimeout bounds the best-effort abort issued when monitoring is cancelled. +// It must stay well within the process's shutdown grace period so the request can +// be sent before the container is killed. +const abortTimeout = 30 * time.Second + // Monitor handles job execution and monitoring type Monitor struct { client *Client @@ -32,22 +37,28 @@ type Monitor struct { timeout time.Duration dryRun bool gatePromotion bool + abortOnCancel bool } // NewMonitor creates a new job monitor with the specified polling interval and timeout. 
-func NewMonitor(client *Client, pollInterval, timeout time.Duration, dryRun, gatePromotion bool) *Monitor { +func NewMonitor(client *Client, pollInterval, timeout time.Duration, dryRun, gatePromotion, abortOnCancel bool) *Monitor { return &Monitor{ client: client, pollInterval: pollInterval, timeout: timeout, dryRun: dryRun, gatePromotion: gatePromotion, + abortOnCancel: abortOnCancel, } } // WaitForCompletion polls job status until completion func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, prowExecutionID string) error { - ctx, cancel := context.WithTimeout(ctx, m.timeout) + // Bound monitoring by the configured timeout while keeping a handle on the + // caller's context, so an external cancellation (e.g. EV2/ACI sending SIGTERM + // when the rollout is cancelled) can be told apart from our own timeout. + parent := ctx + monCtx, cancel := context.WithTimeout(parent, m.timeout) defer cancel() // Create ticker for polling interval @@ -56,7 +67,7 @@ func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, pro // Check status immediately, then poll at intervals for { - job, err := m.client.GetJobStatus(ctx, prowExecutionID) + job, err := m.client.GetJobStatus(monCtx, prowExecutionID) if err != nil { logger.Error(err, "Failed to get job status after retries, will continue polling") } else { @@ -98,7 +109,13 @@ func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, pro } select { - case <-ctx.Done(): + case <-monCtx.Done(): + // Distinguish caller cancellation (rollout cancelled) from the + // monitor's own timeout: only the former should abort the Prow job. 
+ if parent.Err() != nil { + m.handleCancellation(parent, logger, prowExecutionID) + return fmt.Errorf("job monitoring cancelled for job %s: %w", prowExecutionID, context.Cause(parent)) + } if job != nil { return fmt.Errorf("job monitoring timed out after %v - job %s may still be running, check Prow UI: %s", m.timeout, prowExecutionID, job.Status.URL) } @@ -109,6 +126,26 @@ func (m *Monitor) WaitForCompletion(ctx context.Context, logger logr.Logger, pro } } +// handleCancellation makes a best-effort attempt to abort the Prow job after the +// monitoring context was cancelled by the caller (rollout cancellation). The +// abort runs on a fresh, short-lived context derived from the cancelled parent +// (values preserved, cancellation dropped) so the request can still be sent +// during the process's shutdown grace period. +func (m *Monitor) handleCancellation(parent context.Context, logger logr.Logger, prowExecutionID string) { + if !m.abortOnCancel { + logger.Info("Monitoring cancelled; abort-on-cancel disabled, leaving Prow job running", "prowExecutionID", prowExecutionID) + return + } + + logger.Info("Monitoring cancelled; handling Prow job abort", "prowExecutionID", prowExecutionID) + abortCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), abortTimeout) + defer cancel() + + if err := m.client.AbortJob(abortCtx, prowExecutionID); err != nil { + logger.Error(err, "Failed to abort Prow job after cancellation", "prowExecutionID", prowExecutionID) + } +} + // ExecuteAndWait submits a job and waits for completion func (m *Monitor) ExecuteAndWait(ctx context.Context, logger logr.Logger, request *prowgangway.CreateJobExecutionRequest) error { // Submit job From a16ebeb975c2a88dd55139990bb6768d77fe96f7 Mon Sep 17 00:00:00 2001 From: Rael Garcia Date: Fri, 26 Jun 2026 17:05:14 +0000 Subject: [PATCH 2/3] fix(prow-job-executor): harden abort logger context and list-URL building Address review feedback on the cancellation-abort path: - handleCancellation now 
re-attaches the in-hand logger to the abort context via logr.NewContext. The client methods extract the logger with logr.FromContext and fail if it is absent, so the abort must not depend on the parent context carrying one. Added a regression test that aborts from a logger-less context. - ListExecutions builds its request URL through net/url instead of string concatenation, so a --gangway-url that already carries a query string or trailing '?' no longer corrupts the request. --- tools/prow-job-executor/prowjob/abort_test.go | 35 +++++++++++++++++++ tools/prow-job-executor/prowjob/client.go | 7 +++- tools/prow-job-executor/prowjob/monitor.go | 4 +++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/tools/prow-job-executor/prowjob/abort_test.go b/tools/prow-job-executor/prowjob/abort_test.go index 0471a5a..a8e28c8 100644 --- a/tools/prow-job-executor/prowjob/abort_test.go +++ b/tools/prow-job-executor/prowjob/abort_test.go @@ -350,6 +350,41 @@ func TestWaitForCompletionAbortsOnCancel(t *testing.T) { } } +// TestWaitForCompletionAbortsWhenContextLacksLogger guards the fix where +// handleCancellation re-attaches the in-hand logger to the abort context: the +// client methods extract the logger via logr.FromContext, so the abort must +// still fire even when the parent context carries no logger of its own. +func TestWaitForCompletionAbortsWhenContextLacksLogger(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, postsubmitRefs(), start, "eastus"), + }) + + m := NewMonitor(f.client(), 5*time.Millisecond, time.Hour, false, true, true) + + // Context deliberately has no logger attached. 
+ ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 1) + go func() { + errCh <- m.WaitForCompletion(ctx, logr.Discard(), "job-exec-123") + }() + + time.Sleep(30 * time.Millisecond) + cancel() + + select { + case err := <-errCh: + if err == nil { + t.Fatal("expected an error on cancellation, got nil") + } + case <-time.After(2 * time.Second): + t.Fatal("WaitForCompletion did not return after cancellation") + } + + if got := f.bulkRequests(); got != 1 { + t.Fatalf("expected exactly 1 abort request despite logger-less context, got %d", got) + } +} func TestWaitForCompletionNoAbortWhenDisabled(t *testing.T) { start := time.Now().Truncate(time.Second) f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ diff --git a/tools/prow-job-executor/prowjob/client.go b/tools/prow-job-executor/prowjob/client.go index 8d91465..443cada 100644 --- a/tools/prow-job-executor/prowjob/client.go +++ b/tools/prow-job-executor/prowjob/client.go @@ -379,7 +379,12 @@ func (c *Client) ListExecutions(ctx context.Context, jobName string, status prow if status != prowgangway.JobExecutionStatus_JOB_EXECUTION_STATUS_UNSPECIFIED { query.Set("status", status.String()) } - requestURL := c.gangwayURL + "?" 
+ query.Encode() + u, err := url.Parse(c.gangwayURL) + if err != nil { + return nil, fmt.Errorf("failed to parse gangway URL %q: %w", c.gangwayURL, err) + } + u.RawQuery = query.Encode() + requestURL := u.String() req, err := http.NewRequestWithContext(ctx, "GET", requestURL, nil) if err != nil { diff --git a/tools/prow-job-executor/prowjob/monitor.go b/tools/prow-job-executor/prowjob/monitor.go index 1b6dbe0..00c8722 100644 --- a/tools/prow-job-executor/prowjob/monitor.go +++ b/tools/prow-job-executor/prowjob/monitor.go @@ -138,7 +138,11 @@ func (m *Monitor) handleCancellation(parent context.Context, logger logr.Logger, } logger.Info("Monitoring cancelled; handling Prow job abort", "prowExecutionID", prowExecutionID) + // Derive a fresh, short-lived context (values preserved, cancellation dropped) + // and re-attach the logger explicitly: the client methods extract it via + // logr.FromContext, so the abort must not depend on the parent carrying one. abortCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), abortTimeout) + abortCtx = logr.NewContext(abortCtx, logger) defer cancel() if err := m.client.AbortJob(abortCtx, prowExecutionID); err != nil { From 34701020905f7fa71c08424a148801558fd902d8 Mon Sep 17 00:00:00 2001 From: Rael Garcia Date: Mon, 29 Jun 2026 07:05:34 +0000 Subject: [PATCH 3/3] fix(prow-job-executor): preserve base path in bulk URL and require refs to abort Second-round review feedback: - deriveBulkURL now preserves any base-path prefix on --gangway-url, swapping the /v1/executions route for /v1/bulk-job-status-update instead of replacing the whole path. A prefixed URL (host/gangway/v1/executions) no longer drops the prefix and 404s the abort. - AbortJob refuses to abort when the job has no Spec.Refs. Refs are part of the bulk selector and the API cannot filter by job name, so a nil-refs abort could over-select. Skip instead. - Added tests for the prefixed-URL derivation and the no-refs no-op. 
--- tools/prow-job-executor/prowjob/abort_test.go | 22 +++++++++++++ tools/prow-job-executor/prowjob/client.go | 32 +++++++++++++------ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/tools/prow-job-executor/prowjob/abort_test.go b/tools/prow-job-executor/prowjob/abort_test.go index a8e28c8..40c9e27 100644 --- a/tools/prow-job-executor/prowjob/abort_test.go +++ b/tools/prow-job-executor/prowjob/abort_test.go @@ -173,6 +173,11 @@ func TestDeriveBulkURL(t *testing.T) { in: "http://127.0.0.1:8080", want: "http://127.0.0.1:8080/v1/bulk-job-status-update", }, + { + name: "preserves base-path prefix", + in: "https://gangway-ci.example.com/gangway/v1/executions", + want: "https://gangway-ci.example.com/gangway/v1/bulk-job-status-update", + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -266,6 +271,23 @@ func TestAbortJobNoStartTimeIsNoop(t *testing.T) { } } +// TestAbortJobNoRefsIsNoop verifies that a job with no refs is not aborted: +// refs are part of the bulk selector and the API cannot filter by job name, so +// aborting without them risks matching unrelated jobs sharing type/state. 
+func TestAbortJobNoRefsIsNoop(t *testing.T) { + start := time.Now().Truncate(time.Second) + f := newAbortFixture(t, map[string]*prowjobs.ProwJob{ + "job-exec-123": testJob(prowjobs.PendingState, prowjobs.PostsubmitJob, nil, start, "eastus"), + }) + + if err := f.client().AbortJob(testContext(), "job-exec-123"); err != nil { + t.Fatalf("AbortJob returned error: %v", err) + } + if got := f.bulkRequests(); got != 0 { + t.Fatalf("expected no bulk request when refs are missing, got %d", got) + } +} + // TestAbortJobSkipsWhenRegionSharesWindow verifies that when another region's // execution of the same job started in the same second (and is therefore // indistinguishable to the region-blind, second-precise bulk API), AbortJob diff --git a/tools/prow-job-executor/prowjob/client.go b/tools/prow-job-executor/prowjob/client.go index 443cada..cc51c78 100644 --- a/tools/prow-job-executor/prowjob/client.go +++ b/tools/prow-job-executor/prowjob/client.go @@ -26,6 +26,7 @@ import ( "net" "net/http" "net/url" + "strings" "time" "github.com/go-logr/logr" @@ -45,6 +46,11 @@ import ( // shares the host of the executions endpoint. const bulkStatusChangePath = "/v1/bulk-job-status-update" +// executionsPath is the Gangway REST route the executor is configured against +// (--gangway-url). It is used to derive the bulk route while preserving any +// base-path prefix the URL may carry. +const executionsPath = "/v1/executions" + // jobSubmissionResponse represents the minimal JSON response from Gangway API for job submission type jobSubmissionResponse struct { ID string `json:"id"` @@ -88,15 +94,17 @@ func NewClient(token, gangwayURL, prowURL string) *Client { } // deriveBulkURL returns the Gangway bulk job status-change endpoint that shares -// the host of the executions endpoint. If gangwayURL cannot be parsed the input -// is returned unchanged so the caller still surfaces a clear HTTP error instead -// of panicking. +// the host of the executions endpoint. 
Any base-path prefix on the configured +// URL is preserved (e.g. https://host/gangway/v1/executions yields +// https://host/gangway/v1/bulk-job-status-update). If gangwayURL cannot be +// parsed the input is returned unchanged so the caller still surfaces a clear +// HTTP error instead of panicking. func deriveBulkURL(gangwayURL string) string { u, err := url.Parse(gangwayURL) if err != nil { return gangwayURL } - u.Path = bulkStatusChangePath + u.Path = strings.TrimSuffix(u.Path, executionsPath) + bulkStatusChangePath u.RawQuery = "" return u.String() } @@ -294,12 +302,16 @@ func (c *Client) AbortJob(ctx context.Context, prowExecutionID string) error { return nil } - var refs *prowgangway.Refs - if job.Spec.Refs != nil { - refs, err = prowgangway.FromCrdRefs(job.Spec.Refs) - if err != nil { - return fmt.Errorf("failed to convert refs for job %s: %w", prowExecutionID, err) - } + if job.Spec.Refs == nil { + // refs (org/repo) are part of the bulk selector and the API cannot filter + // by job name; aborting with nil refs could match a much broader set of + // jobs sharing only type/state. Refuse rather than risk over-selecting. + logger.Info("Job has no refs; skipping abort to avoid selecting unrelated jobs") + return nil + } + refs, err := prowgangway.FromCrdRefs(job.Spec.Refs) + if err != nil { + return fmt.Errorf("failed to convert refs for job %s: %w", prowExecutionID, err) } // Pin the window to the StartTime; isMatchingCondition treats the bounds