Skip to content

Commit c52415a

Browse files
ishtmeet.singhishtoo1
authored andcommitted
feat: wire managers and add cascade delete skeleton
Summary: Intent: - Wire TriggerRunManager and PipelineRunManager into the pipeline controller - Add handleDeletion method with no-children path (removes finalizer immediately) Changes: - Add triggerRunManager and pipelineRunManager fields to Reconciler struct - Construct managers in Register after handler creation - Replace deletion stub with handleDeletion that lists children and removes finalizer if none - Register field indexes on fake client for tests - Add TestCascadeDelete_NoChildren test Test Plan: - go test ./components/pipeline/... -v -count=1 (all tests pass) Revert Plan: Revert this PR via git revert. Jira Issues:
1 parent 8ae3459 commit c52415a

3 files changed

Lines changed: 259 additions & 22 deletions

File tree

go/components/pipeline/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ go_library(
1313
"//go/api:go_default_library",
1414
"//go/api/handler:go_default_library",
1515
"//go/base/env:go_default_library",
16+
"//go/components/pipelinerun:go_default_library",
17+
"//go/components/triggerrun:go_default_library",
1618
"//proto/api:go_default_library",
1719
"//proto/api/v2:go_default_library",
1820
"@com_github_prometheus_client_golang//prometheus:go_default_library",
@@ -35,6 +37,8 @@ go_test(
3537
"//go/api:go_default_library",
3638
"//go/api/handler:go_default_library",
3739
"//go/base/env:go_default_library",
40+
"//go/components/pipelinerun:go_default_library",
41+
"//go/components/triggerrun:go_default_library",
3842
"//proto/api:go_default_library",
3943
"//proto/api/v2:go_default_library",
4044
"@com_github_stretchr_testify//require:go_default_library",

go/components/pipeline/controller.go

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"github.com/michelangelo-ai/michelangelo/go/api"
2222
apiHandler "github.com/michelangelo-ai/michelangelo/go/api/handler"
2323
"github.com/michelangelo-ai/michelangelo/go/base/env"
24+
"github.com/michelangelo-ai/michelangelo/go/components/pipelinerun"
25+
"github.com/michelangelo-ai/michelangelo/go/components/triggerrun"
2426
apipb "github.com/michelangelo-ai/michelangelo/proto-go/api"
2527
v2pb "github.com/michelangelo-ai/michelangelo/proto-go/api/v2"
2628
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -41,9 +43,11 @@ const (
4143
// operations and maintains environment context and logging capabilities.
4244
type Reconciler struct {
4345
api.Handler
44-
env env.Context
45-
logger *zap.Logger
46-
apiHandlerFactory apiHandler.Factory
46+
env env.Context
47+
logger *zap.Logger
48+
apiHandlerFactory apiHandler.Factory
49+
triggerRunManager triggerrun.Manager
50+
pipelineRunManager pipelinerun.Manager
4751
}
4852

4953
// Reconcile is the main reconciliation loop entry point for Pipeline resources.
@@ -75,14 +79,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
7579
}
7680
}
7781
} else {
78-
// Pipeline is being deleted — remove finalizer to allow deletion
79-
// Cascade delete logic will be added in a subsequent PR
80-
logger.Info("Pipeline is being deleted, removing finalizer")
81-
controllerutil.RemoveFinalizer(pipeline, api.PipelineFinalizer)
82-
if err := r.Update(ctx, pipeline, &metav1.UpdateOptions{}); err != nil {
83-
return ctrl.Result{}, fmt.Errorf("remove pipeline finalizer: %w", err)
84-
}
85-
return ctrl.Result{}, nil
82+
return r.handleDeletion(ctx, pipeline, logger)
8683
}
8784

8885
originalPipeline := pipeline.DeepCopy()
@@ -139,6 +136,55 @@ func (r *Reconciler) updatePipelineStatus(ctx context.Context, pipeline *v2pb.Pi
139136
return result, nil
140137
}
141138

139+
func (r *Reconciler) handleDeletion(ctx context.Context, pipeline *v2pb.Pipeline, logger *zap.Logger) (ctrl.Result, error) {
140+
// If the finalizer is not present we don't own this deletion; nothing to cascade.
141+
// Avoids wasted list/kill/delete work on pipelines that pre-date the finalizer rollout.
142+
if !controllerutil.ContainsFinalizer(pipeline, api.PipelineFinalizer) {
143+
return ctrl.Result{}, nil
144+
}
145+
logger.Info("Pipeline is being deleted, starting cascade delete")
146+
147+
triggerRuns, err := r.triggerRunManager.ListTriggerRunsForPipeline(ctx, pipeline.Namespace, pipeline.Name)
148+
if err != nil {
149+
logger.Error("Failed to list trigger runs for cascade delete",
150+
zap.Error(err),
151+
zap.String("operation", "list_trigger_runs"),
152+
zap.String("namespace", pipeline.Namespace),
153+
zap.String("name", pipeline.Name))
154+
return ctrl.Result{}, fmt.Errorf("list trigger runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
155+
}
156+
157+
pipelineRuns, err := r.pipelineRunManager.ListPipelineRunsForPipeline(ctx, pipeline.Namespace, pipeline.Name)
158+
if err != nil {
159+
logger.Error("Failed to list pipeline runs for cascade delete",
160+
zap.Error(err),
161+
zap.String("operation", "list_pipeline_runs"),
162+
zap.String("namespace", pipeline.Namespace),
163+
zap.String("name", pipeline.Name))
164+
return ctrl.Result{}, fmt.Errorf("list pipeline runs for pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, err)
165+
}
166+
167+
if len(triggerRuns) == 0 && len(pipelineRuns) == 0 {
168+
logger.Info("No children found, removing finalizer")
169+
controllerutil.RemoveFinalizer(pipeline, api.PipelineFinalizer)
170+
if updateErr := r.Update(ctx, pipeline, &metav1.UpdateOptions{}); updateErr != nil {
171+
logger.Error("Failed to remove finalizer after cascade delete",
172+
zap.Error(updateErr),
173+
zap.String("operation", "remove_finalizer"),
174+
zap.String("namespace", pipeline.Namespace),
175+
zap.String("name", pipeline.Name))
176+
return ctrl.Result{}, fmt.Errorf("remove finalizer on pipeline %s/%s: %w", pipeline.Namespace, pipeline.Name, updateErr)
177+
}
178+
return ctrl.Result{}, nil
179+
}
180+
181+
// Kill and delete steps will be added in subsequent PRs
182+
logger.Info("Children found, requeueing for cascade delete",
183+
zap.Int("triggerRuns", len(triggerRuns)),
184+
zap.Int("pipelineRuns", len(pipelineRuns)))
185+
return ctrl.Result{RequeueAfter: reconcileInterval}, nil
186+
}
187+
142188
// formatRevisionName generates a standardized revision name for a pipeline.
143189
//
144190
// The name format is: "pipeline-{lowercase-pipeline-name}-{git-ref-prefix}"
@@ -176,6 +222,8 @@ func (r *Reconciler) Register(mgr ctrl.Manager) error {
176222
return err
177223
}
178224
r.Handler = handler
225+
r.triggerRunManager = triggerrun.NewManager(handler, r.logger)
226+
r.pipelineRunManager = pipelinerun.NewManager(handler, r.logger)
179227
return ctrl.NewControllerManagedBy(mgr).
180228
For(&v2pb.Pipeline{}).
181229
Complete(r)

go/components/pipeline/controller_test.go

Lines changed: 196 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pipeline
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"testing"
78

89
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -16,6 +17,8 @@ import (
1617
"github.com/michelangelo-ai/michelangelo/go/api"
1718
apiHandler "github.com/michelangelo-ai/michelangelo/go/api/handler"
1819
"github.com/michelangelo-ai/michelangelo/go/base/env"
20+
"github.com/michelangelo-ai/michelangelo/go/components/pipelinerun"
21+
"github.com/michelangelo-ai/michelangelo/go/components/triggerrun"
1922
apipb "github.com/michelangelo-ai/michelangelo/proto-go/api"
2023
v2pb "github.com/michelangelo-ai/michelangelo/proto-go/api/v2"
2124
"go.uber.org/zap/zaptest"
@@ -268,11 +271,16 @@ func TestReconcile_RemovesFinalizerOnDelete(t *testing.T) {
268271
}
269272
}
270273

271-
// updateErroringHandler wraps an api.Handler and returns a configured error
272-
// from Update. Used to exercise finalizer Update error branches.
274+
// updateErroringHandler wraps an api.Handler and returns configured errors
275+
// from List/Update. Used to exercise finalizer and cascade-delete error branches.
273276
type updateErroringHandler struct {
274277
api.Handler
275-
updateErr error
278+
updateErr error
279+
listErr error
280+
// listErrForType, when non-nil, only fails List when the list object is of
281+
// the given type (e.g. "*v2pb.TriggerRunList"). This lets us assert the
282+
// controller surfaces the exact failure path we expect.
283+
listErrForType string
276284
}
277285

278286
func (u *updateErroringHandler) Update(ctx context.Context, obj client.Object, opts *metav1.UpdateOptions) error {
@@ -282,13 +290,33 @@ func (u *updateErroringHandler) Update(ctx context.Context, obj client.Object, o
282290
return u.Handler.Update(ctx, obj, opts)
283291
}
284292

293+
func (u *updateErroringHandler) List(ctx context.Context, namespace string, opts *metav1.ListOptions, listOptionsExt *apipb.ListOptionsExt, list client.ObjectList) error {
294+
if u.listErr != nil && (u.listErrForType == "" || u.listErrForType == fmt.Sprintf("%T", list)) {
295+
return u.listErr
296+
}
297+
return u.Handler.List(ctx, namespace, opts, listOptionsExt, list)
298+
}
299+
285300
func setUpReconcilerWithUpdateErr(t *testing.T, initialObjects []client.Object, updateErr error) *Reconciler {
301+
return setUpReconcilerWithErrors(t, initialObjects, updateErr, nil, "")
302+
}
303+
304+
func setUpReconcilerWithErrors(t *testing.T, initialObjects []client.Object, updateErr, listErr error, listErrForType string) *Reconciler {
286305
scheme := runtime.NewScheme()
287306
require.NoError(t, v2pb.AddToScheme(scheme))
288307
k8sClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(initialObjects...).WithStatusSubresource(initialObjects...).Build()
308+
logger := zaptest.NewLogger(t)
309+
handler := &updateErroringHandler{
310+
Handler: apiHandler.NewFakeAPIHandler(k8sClient),
311+
updateErr: updateErr,
312+
listErr: listErr,
313+
listErrForType: listErrForType,
314+
}
289315
return &Reconciler{
290-
Handler: &updateErroringHandler{Handler: apiHandler.NewFakeAPIHandler(k8sClient), updateErr: updateErr},
291-
logger: zaptest.NewLogger(t),
316+
Handler: handler,
317+
logger: logger,
318+
triggerRunManager: triggerrun.NewManager(handler, logger),
319+
pipelineRunManager: pipelinerun.NewManager(handler, logger),
292320
}
293321
}
294322

@@ -313,7 +341,120 @@ func TestReconcile_AddFinalizer_UpdateError(t *testing.T) {
313341
require.Contains(t, err.Error(), "add pipeline finalizer")
314342
}
315343

316-
func TestReconcile_RemoveFinalizer_UpdateError(t *testing.T) {
344+
// NOTE: Error coverage for the delete path is provided by
345+
// TestCascadeDelete_RemoveFinalizer_UpdateError below, which exercises the
346+
// handleDeletion error wrapping introduced in this PR.
347+
348+
func TestCascadeDelete_NoChildren(t *testing.T) {
349+
now := metav1.Now()
350+
pipeline := &v2pb.Pipeline{
351+
ObjectMeta: metav1.ObjectMeta{
352+
Name: "test-pipeline",
353+
Namespace: "test-namespace",
354+
Finalizers: []string{api.PipelineFinalizer},
355+
DeletionTimestamp: &now,
356+
},
357+
Spec: v2pb.PipelineSpec{
358+
Commit: &v2pb.CommitInfo{GitRef: "abc123", Branch: "main"},
359+
},
360+
}
361+
reconciler := setUpReconciler(t, []client.Object{pipeline}, env.Context{})
362+
363+
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{
364+
NamespacedName: types.NamespacedName{Name: "test-pipeline", Namespace: "test-namespace"},
365+
})
366+
require.NoError(t, err)
367+
require.Equal(t, ctrl.Result{}, result)
368+
}
369+
370+
func TestCascadeDelete_FinalizerAbsent(t *testing.T) {
371+
// Pipeline with a DeletionTimestamp but not our finalizer must not be
372+
// cascaded. handleDeletion returns immediately without listing children.
373+
// The fake client requires at least one finalizer when DeletionTimestamp
374+
// is set, so we attach an unrelated finalizer.
375+
now := metav1.Now()
376+
pipeline := &v2pb.Pipeline{
377+
ObjectMeta: metav1.ObjectMeta{
378+
Name: "test-pipeline",
379+
Namespace: "test-namespace",
380+
Finalizers: []string{"unrelated/finalizer"},
381+
DeletionTimestamp: &now,
382+
},
383+
Spec: v2pb.PipelineSpec{
384+
Commit: &v2pb.CommitInfo{GitRef: "abc123", Branch: "main"},
385+
},
386+
}
387+
// Seed a TR that would normally be killed. handleDeletion must not touch it.
388+
tr := &v2pb.TriggerRun{
389+
ObjectMeta: metav1.ObjectMeta{Name: "tr-running", Namespace: "test-namespace"},
390+
Spec: v2pb.TriggerRunSpec{
391+
Pipeline: &apipb.ResourceIdentifier{Name: "test-pipeline", Namespace: "test-namespace"},
392+
},
393+
Status: v2pb.TriggerRunStatus{State: v2pb.TRIGGER_RUN_STATE_RUNNING},
394+
}
395+
reconciler := setUpReconciler(t, []client.Object{pipeline, tr}, env.Context{})
396+
397+
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{
398+
NamespacedName: types.NamespacedName{Name: "test-pipeline", Namespace: "test-namespace"},
399+
})
400+
require.NoError(t, err)
401+
require.Equal(t, ctrl.Result{}, result)
402+
403+
untouched := &v2pb.TriggerRun{}
404+
require.NoError(t, reconciler.Get(context.Background(), "test-namespace", "tr-running", &metav1.GetOptions{}, untouched))
405+
require.NotEqual(t, v2pb.TRIGGER_RUN_ACTION_KILL, untouched.Spec.Action)
406+
require.False(t, untouched.Spec.Kill)
407+
}
408+
409+
func TestCascadeDelete_ListTriggerRunsError(t *testing.T) {
410+
now := metav1.Now()
411+
pipeline := &v2pb.Pipeline{
412+
ObjectMeta: metav1.ObjectMeta{
413+
Name: "test-pipeline",
414+
Namespace: "test-namespace",
415+
Finalizers: []string{api.PipelineFinalizer},
416+
DeletionTimestamp: &now,
417+
},
418+
Spec: v2pb.PipelineSpec{
419+
Commit: &v2pb.CommitInfo{GitRef: "abc123", Branch: "main"},
420+
},
421+
}
422+
listErr := errors.New("list tr boom")
423+
reconciler := setUpReconcilerWithErrors(t, []client.Object{pipeline}, nil, listErr, "*v2.TriggerRunList")
424+
425+
_, err := reconciler.Reconcile(context.Background(), ctrl.Request{
426+
NamespacedName: types.NamespacedName{Name: "test-pipeline", Namespace: "test-namespace"},
427+
})
428+
require.Error(t, err)
429+
require.ErrorIs(t, err, listErr)
430+
require.Contains(t, err.Error(), "list trigger runs for pipeline test-namespace/test-pipeline")
431+
}
432+
433+
func TestCascadeDelete_ListPipelineRunsError(t *testing.T) {
434+
now := metav1.Now()
435+
pipeline := &v2pb.Pipeline{
436+
ObjectMeta: metav1.ObjectMeta{
437+
Name: "test-pipeline",
438+
Namespace: "test-namespace",
439+
Finalizers: []string{api.PipelineFinalizer},
440+
DeletionTimestamp: &now,
441+
},
442+
Spec: v2pb.PipelineSpec{
443+
Commit: &v2pb.CommitInfo{GitRef: "abc123", Branch: "main"},
444+
},
445+
}
446+
listErr := errors.New("list pr boom")
447+
reconciler := setUpReconcilerWithErrors(t, []client.Object{pipeline}, nil, listErr, "*v2.PipelineRunList")
448+
449+
_, err := reconciler.Reconcile(context.Background(), ctrl.Request{
450+
NamespacedName: types.NamespacedName{Name: "test-pipeline", Namespace: "test-namespace"},
451+
})
452+
require.Error(t, err)
453+
require.ErrorIs(t, err, listErr)
454+
require.Contains(t, err.Error(), "list pipeline runs for pipeline test-namespace/test-pipeline")
455+
}
456+
457+
func TestCascadeDelete_RemoveFinalizer_UpdateError(t *testing.T) {
317458
now := metav1.Now()
318459
pipeline := &v2pb.Pipeline{
319460
ObjectMeta: metav1.ObjectMeta{
@@ -327,24 +468,68 @@ func TestReconcile_RemoveFinalizer_UpdateError(t *testing.T) {
327468
},
328469
}
329470
updateErr := errors.New("update boom")
330-
reconciler := setUpReconcilerWithUpdateErr(t, []client.Object{pipeline}, updateErr)
471+
reconciler := setUpReconcilerWithErrors(t, []client.Object{pipeline}, updateErr, nil, "")
331472

332473
_, err := reconciler.Reconcile(context.Background(), ctrl.Request{
333474
NamespacedName: types.NamespacedName{Name: "test-pipeline", Namespace: "test-namespace"},
334475
})
335476
require.Error(t, err)
336477
require.ErrorIs(t, err, updateErr)
337-
require.Contains(t, err.Error(), "remove pipeline finalizer")
478+
require.Contains(t, err.Error(), "remove finalizer on pipeline test-namespace/test-pipeline")
479+
}
480+
481+
func TestCascadeDelete_WithChildrenRequeues(t *testing.T) {
482+
// When children exist, handleDeletion does not remove the finalizer; it
483+
// requeues after reconcileInterval so a subsequent PR can perform kill/delete.
484+
now := metav1.Now()
485+
pipeline := &v2pb.Pipeline{
486+
ObjectMeta: metav1.ObjectMeta{
487+
Name: "test-pipeline",
488+
Namespace: "test-namespace",
489+
Finalizers: []string{api.PipelineFinalizer},
490+
DeletionTimestamp: &now,
491+
},
492+
Spec: v2pb.PipelineSpec{
493+
Commit: &v2pb.CommitInfo{GitRef: "abc123", Branch: "main"},
494+
},
495+
}
496+
tr := &v2pb.TriggerRun{
497+
ObjectMeta: metav1.ObjectMeta{Name: "tr-running", Namespace: "test-namespace"},
498+
Spec: v2pb.TriggerRunSpec{
499+
Pipeline: &apipb.ResourceIdentifier{Name: "test-pipeline", Namespace: "test-namespace"},
500+
},
501+
Status: v2pb.TriggerRunStatus{State: v2pb.TRIGGER_RUN_STATE_RUNNING},
502+
}
503+
reconciler := setUpReconciler(t, []client.Object{pipeline, tr}, env.Context{})
504+
505+
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{
506+
NamespacedName: types.NamespacedName{Name: "test-pipeline", Namespace: "test-namespace"},
507+
})
508+
require.NoError(t, err)
509+
require.Equal(t, ctrl.Result{RequeueAfter: reconcileInterval}, result)
510+
511+
// Finalizer should NOT have been removed yet.
512+
updated := &v2pb.Pipeline{}
513+
require.NoError(t, reconciler.Get(context.Background(), "test-namespace", "test-pipeline", &metav1.GetOptions{}, updated))
514+
require.True(t, controllerutil.ContainsFinalizer(updated, api.PipelineFinalizer))
338515
}
339516

340517
func setUpReconciler(t *testing.T, initialObjects []client.Object, env env.Context) *Reconciler {
341518
scheme := runtime.NewScheme()
342519
err := v2pb.AddToScheme(scheme)
343520
require.NoError(t, err)
344-
k8sClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(initialObjects...).WithStatusSubresource(initialObjects...).Build()
521+
k8sClient := fake.NewClientBuilder().
522+
WithScheme(scheme).
523+
WithObjects(initialObjects...).
524+
WithStatusSubresource(initialObjects...).
525+
Build()
526+
logger := zaptest.NewLogger(t)
527+
handler := apiHandler.NewFakeAPIHandler(k8sClient)
345528
reconciler := &Reconciler{
346-
Handler: apiHandler.NewFakeAPIHandler(k8sClient),
347-
logger: zaptest.NewLogger(t),
529+
Handler: handler,
530+
logger: logger,
531+
triggerRunManager: triggerrun.NewManager(handler, logger),
532+
pipelineRunManager: pipelinerun.NewManager(handler, logger),
348533
}
349534
return reconciler
350535
}

0 commit comments

Comments
 (0)