Skip to content

Commit d29a625

Browse files
ishtmeet.singhishtoo1
authored and committed
feat: add cascade delete observability metrics
Summary: Intent: - Add observability for pipeline cascade delete operations Changes: - Add 3 counters (started, completed, error) and 1 gauge (active_children) for cascade delete - Wire metrics into handleDeletion at each phase transition and error path - Register new metrics in RegisterPipelineMetrics Test Plan: - go test ./components/pipeline/... -v -count=1 (all tests pass) - go build ./components/pipeline/... (builds successfully) Revert Plan: Revert this PR via git revert. Jira Issues:
1 parent 17a9b4c commit d29a625

5 files changed

Lines changed: 329 additions & 53 deletions

File tree

go/components/pipeline/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ go_test(
4343
"//go/components/triggerrun:go_default_library",
4444
"//proto/api:go_default_library",
4545
"//proto/api/v2:go_default_library",
46+
"@com_github_prometheus_client_golang//prometheus/testutil:go_default_library",
4647
"@com_github_stretchr_testify//require:go_default_library",
4748
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
4849
"@io_k8s_apimachinery//pkg/runtime:go_default_library",

go/components/pipeline/controller.go

Lines changed: 140 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,27 @@ import (
3535
const (
	// reconcileInterval defines how frequently non-terminal pipelines are reconciled.
	reconcileInterval = 10 * time.Second

	// cascadeDeleteKillTimeout bounds how long we wait for child TRs/PRs to reach
	// a terminal state after we've issued a kill. If a workflow engine is down or
	// a child is otherwise stuck, we stop looping and proceed with forceful CR
	// deletion so the Pipeline CR can leave the Terminating state.
	cascadeDeleteKillTimeout = 30 * time.Minute

	// cascadeDeleteStartedAtAnnotation is the annotation key that records when
	// handleDeletion first fired for a pipeline. It is used to gate the
	// "started" counter (B3, one increment per cascade) and to detect the
	// kill-timeout condition (C1). The value is an RFC3339Nano UTC timestamp.
	cascadeDeleteStartedAtAnnotation = "michelangelo/cascade-delete-started-at"
)

// Metric reason labels for pipelineCascadeDeleteError.
const (
	reasonListError   = "list_error"
	reasonDeleteError = "delete_error"
	reasonUpdateError = "update_error"
	reasonKillTimeout = "kill_timeout"
)

// Kind labels for pipelineCascadeDeleteActiveChildren.
const (
	kindTriggerRun  = "trigger_run"
	kindPipelineRun = "pipeline_run"
)
3960

4061
// Reconciler implements the controller-runtime Reconciler interface for Pipeline resources.
@@ -146,13 +167,34 @@ func (r *Reconciler) handleDeletion(ctx context.Context, pipeline *v2pb.Pipeline
146167
}
147168
logger.Info("Pipeline is being deleted, starting cascade delete")
148169

170+
// Stamp cascade-delete-started-at on the first pass and increment the
171+
// "started" counter exactly once per cascade (B3). The annotation is
172+
// persisted by the Update issued immediately below, so a controller restart mid-cascade
173+
// preserves the original start time.
174+
firstPass := !hasCascadeStartedAt(pipeline)
175+
if firstPass {
176+
setCascadeStartedAt(pipeline, time.Now())
177+
if err := r.Update(ctx, pipeline, &metav1.UpdateOptions{}); err != nil {
178+
logger.Error("Failed to stamp cascade-delete-started-at annotation",
179+
zap.Error(err),
180+
zap.String("operation", "stamp_cascade_started_at"),
181+
zap.String("namespace", pipeline.Namespace),
182+
zap.String("name", pipeline.Name))
183+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonUpdateError)
184+
return ctrl.Result{}, fmt.Errorf("stamp cascade-delete-started-at on pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
185+
}
186+
IncCascadeDeleteStarted(pipeline.Namespace, pipeline.Name)
187+
}
188+
killTimedOut := !firstPass && time.Since(getCascadeStartedAt(pipeline)) > cascadeDeleteKillTimeout
189+
149190
triggerRuns, err := r.triggerRunManager.ListTriggerRunsForPipeline(ctx, pipeline.Namespace, pipeline.Name)
150191
if err != nil {
151192
logger.Error("Failed to list trigger runs for cascade delete",
152193
zap.Error(err),
153194
zap.String("operation", "list_trigger_runs"),
154195
zap.String("namespace", pipeline.Namespace),
155196
zap.String("name", pipeline.Name))
197+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonListError)
156198
return ctrl.Result{}, fmt.Errorf("list trigger runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
157199
}
158200

@@ -163,71 +205,90 @@ func (r *Reconciler) handleDeletion(ctx context.Context, pipeline *v2pb.Pipeline
163205
zap.String("operation", "list_pipeline_runs"),
164206
zap.String("namespace", pipeline.Namespace),
165207
zap.String("name", pipeline.Name))
208+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonListError)
166209
return ctrl.Result{}, fmt.Errorf("list pipeline runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
167210
}
168211

169212
if len(triggerRuns) == 0 && len(pipelineRuns) == 0 {
170213
logger.Info("No children found, removing finalizer")
171214
controllerutil.RemoveFinalizer(pipeline, api.PipelineFinalizer)
172-
if updateErr := r.Update(ctx, pipeline, &metav1.UpdateOptions{}); updateErr != nil {
215+
IncCascadeDeleteCompleted(pipeline.Namespace, pipeline.Name)
216+
if err := r.Update(ctx, pipeline, &metav1.UpdateOptions{}); err != nil {
173217
logger.Error("Failed to remove finalizer after cascade delete",
174-
zap.Error(updateErr),
218+
zap.Error(err),
175219
zap.String("operation", "remove_finalizer"),
176220
zap.String("namespace", pipeline.Namespace),
177221
zap.String("name", pipeline.Name))
178-
return ctrl.Result{}, fmt.Errorf("remove finalizer on pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, updateErr)
222+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonUpdateError)
223+
return ctrl.Result{}, fmt.Errorf("remove finalizer on pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
179224
}
180225
return ctrl.Result{}, nil
181226
}
182227

183-
// Kill active TriggerRuns (best-effort)
184-
activeTRs, err := r.triggerRunManager.ListActiveTriggerRunsForPipeline(ctx, pipeline.Namespace, pipeline.Name)
185-
if err != nil {
186-
logger.Error("Failed to list active trigger runs for cascade delete",
187-
zap.Error(err),
188-
zap.String("operation", "list_active_trigger_runs"),
189-
zap.String("namespace", pipeline.Namespace),
190-
zap.String("name", pipeline.Name))
191-
return ctrl.Result{}, fmt.Errorf("list active trigger runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
192-
}
193-
if len(activeTRs) > 0 {
194-
for _, tr := range activeTRs {
195-
if killErr := r.triggerRunManager.KillTriggerRun(ctx, tr); killErr != nil {
196-
logger.Error("Failed to kill trigger run during cascade delete",
197-
zap.Error(killErr),
198-
zap.String("operation", "kill_trigger_run"),
199-
zap.String("namespace", tr.Namespace),
200-
zap.String("name", tr.Name))
228+
// Kill active TriggerRuns (best-effort) unless we've already timed out waiting.
229+
if !killTimedOut {
230+
activeTRs, err := r.triggerRunManager.ListActiveTriggerRunsForPipeline(ctx, pipeline.Namespace, pipeline.Name)
231+
if err != nil {
232+
logger.Error("Failed to list active trigger runs for cascade delete",
233+
zap.Error(err),
234+
zap.String("operation", "list_active_trigger_runs"),
235+
zap.String("namespace", pipeline.Namespace),
236+
zap.String("name", pipeline.Name))
237+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonListError)
238+
return ctrl.Result{}, fmt.Errorf("list active trigger runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
239+
}
240+
SetCascadeDeleteActiveChildren(pipeline.Namespace, pipeline.Name, kindTriggerRun, len(activeTRs))
241+
if len(activeTRs) > 0 {
242+
for _, tr := range activeTRs {
243+
if killErr := r.triggerRunManager.KillTriggerRun(ctx, tr); killErr != nil {
244+
logger.Error("Failed to kill trigger run during cascade delete",
245+
zap.Error(killErr),
246+
zap.String("operation", "kill_trigger_run"),
247+
zap.String("namespace", tr.Namespace),
248+
zap.String("name", tr.Name))
249+
}
201250
}
251+
return ctrl.Result{RequeueAfter: reconcileInterval}, nil
202252
}
203-
return ctrl.Result{RequeueAfter: reconcileInterval}, nil
204-
}
205253

206-
// Kill active PipelineRuns (best-effort)
207-
activePRs, err := r.pipelineRunManager.ListActivePipelineRunsForPipeline(ctx, pipeline.Namespace, pipeline.Name)
208-
if err != nil {
209-
logger.Error("Failed to list active pipeline runs for cascade delete",
210-
zap.Error(err),
211-
zap.String("operation", "list_active_pipeline_runs"),
212-
zap.String("namespace", pipeline.Namespace),
213-
zap.String("name", pipeline.Name))
214-
return ctrl.Result{}, fmt.Errorf("list active pipeline runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
215-
}
216-
if len(activePRs) > 0 {
217-
for _, pr := range activePRs {
218-
if err := r.pipelineRunManager.KillPipelineRun(ctx, pr); err != nil {
219-
logger.Error("Failed to kill pipeline run during cascade delete",
220-
zap.Error(err),
221-
zap.String("operation", "kill_pipeline_run"),
222-
zap.String("namespace", pr.Namespace),
223-
zap.String("name", pr.Name))
254+
// Kill active PipelineRuns (best-effort)
255+
activePRs, err := r.pipelineRunManager.ListActivePipelineRunsForPipeline(ctx, pipeline.Namespace, pipeline.Name)
256+
if err != nil {
257+
logger.Error("Failed to list active pipeline runs for cascade delete",
258+
zap.Error(err),
259+
zap.String("operation", "list_active_pipeline_runs"),
260+
zap.String("namespace", pipeline.Namespace),
261+
zap.String("name", pipeline.Name))
262+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonListError)
263+
return ctrl.Result{}, fmt.Errorf("list active pipeline runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
264+
}
265+
SetCascadeDeleteActiveChildren(pipeline.Namespace, pipeline.Name, kindPipelineRun, len(activePRs))
266+
if len(activePRs) > 0 {
267+
for _, pr := range activePRs {
268+
if killErr := r.pipelineRunManager.KillPipelineRun(ctx, pr); killErr != nil {
269+
logger.Error("Failed to kill pipeline run during cascade delete",
270+
zap.Error(killErr),
271+
zap.String("operation", "kill_pipeline_run"),
272+
zap.String("namespace", pr.Namespace),
273+
zap.String("name", pr.Name))
274+
}
224275
}
276+
return ctrl.Result{RequeueAfter: reconcileInterval}, nil
225277
}
226-
return ctrl.Result{RequeueAfter: reconcileInterval}, nil
278+
} else {
279+
// Kill timed out. Emit a loud warning, record the metric, and proceed with
280+
// forceful deletion so the Pipeline CR is not stuck in Terminating forever.
281+
logger.Warn("cascade delete kill timed out, proceeding with forceful CR deletion",
282+
zap.Duration("timeout", cascadeDeleteKillTimeout),
283+
zap.Int("triggerRuns", len(triggerRuns)),
284+
zap.Int("pipelineRuns", len(pipelineRuns)))
285+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonKillTimeout)
227286
}
228287

229-
// All children are terminal — delete them (fatal: error causes requeue)
230-
logger.Info("All children terminal, deleting",
288+
SetCascadeDeleteActiveChildren(pipeline.Namespace, pipeline.Name, kindTriggerRun, 0)
289+
SetCascadeDeleteActiveChildren(pipeline.Namespace, pipeline.Name, kindPipelineRun, 0)
290+
291+
logger.Info("All children terminal (or kill timed out), deleting",
231292
zap.Int("triggerRuns", len(triggerRuns)),
232293
zap.Int("pipelineRuns", len(pipelineRuns)))
233294

@@ -237,6 +298,7 @@ func (r *Reconciler) handleDeletion(ctx context.Context, pipeline *v2pb.Pipeline
237298
zap.String("operation", "delete_all_trigger_runs"),
238299
zap.String("namespace", pipeline.Namespace),
239300
zap.String("name", pipeline.Name))
301+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonDeleteError)
240302
return ctrl.Result{}, fmt.Errorf("delete trigger runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
241303
}
242304
if err := r.pipelineRunManager.DeleteAllPipelineRuns(ctx, pipeline.Namespace, pipeline.Name); err != nil {
@@ -245,6 +307,7 @@ func (r *Reconciler) handleDeletion(ctx context.Context, pipeline *v2pb.Pipeline
245307
zap.String("operation", "delete_all_pipeline_runs"),
246308
zap.String("namespace", pipeline.Namespace),
247309
zap.String("name", pipeline.Name))
310+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonDeleteError)
248311
return ctrl.Result{}, fmt.Errorf("delete pipeline runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
249312
}
250313

@@ -264,18 +327,51 @@ func (r *Reconciler) handleDeletion(ctx context.Context, pipeline *v2pb.Pipeline
264327
}
265328

266329
logger.Info("All children deleted, removing finalizer")
330+
IncCascadeDeleteCompleted(pipeline.Namespace, pipeline.Name)
267331
controllerutil.RemoveFinalizer(pipeline, api.PipelineFinalizer)
268332
if err := r.Update(ctx, pipeline, &metav1.UpdateOptions{}); err != nil {
269333
logger.Error("Failed to remove finalizer after cascade delete",
270334
zap.Error(err),
271335
zap.String("operation", "remove_finalizer"),
272336
zap.String("namespace", pipeline.Namespace),
273337
zap.String("name", pipeline.Name))
338+
IncCascadeDeleteError(pipeline.Namespace, pipeline.Name, reasonUpdateError)
274339
return ctrl.Result{}, fmt.Errorf("remove finalizer on pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
275340
}
276341
return ctrl.Result{}, nil
277342
}
278343

344+
// hasCascadeStartedAt reports whether the cascade-delete-started-at annotation is set.
345+
func hasCascadeStartedAt(pipeline *v2pb.Pipeline) bool {
346+
_, ok := pipeline.GetAnnotations()[cascadeDeleteStartedAtAnnotation]
347+
return ok
348+
}
349+
350+
// setCascadeStartedAt stamps the pipeline with the current time in RFC3339Nano.
351+
func setCascadeStartedAt(pipeline *v2pb.Pipeline, t time.Time) {
352+
ann := pipeline.GetAnnotations()
353+
if ann == nil {
354+
ann = map[string]string{}
355+
}
356+
ann[cascadeDeleteStartedAtAnnotation] = t.UTC().Format(time.RFC3339Nano)
357+
pipeline.SetAnnotations(ann)
358+
}
359+
360+
// getCascadeStartedAt returns the parsed started-at timestamp, or the current
361+
// time if the annotation is missing or malformed (which effectively prevents a
362+
// false timeout on the first pass that writes it).
363+
func getCascadeStartedAt(pipeline *v2pb.Pipeline) time.Time {
364+
v, ok := pipeline.GetAnnotations()[cascadeDeleteStartedAtAnnotation]
365+
if !ok {
366+
return time.Now()
367+
}
368+
t, err := time.Parse(time.RFC3339Nano, v)
369+
if err != nil {
370+
return time.Now()
371+
}
372+
return t
373+
}
374+
279375
// formatRevisionName generates a standardized revision name for a pipeline.
280376
//
281377
// The name format is: "pipeline-{lowercase-pipeline-name}-{git-ref-prefix}"

0 commit comments

Comments
 (0)