From 26030c31fbc264101e3c9ab56aba5abbd19c1da4 Mon Sep 17 00:00:00 2001 From: davidlin20dev Date: Wed, 17 Jun 2026 17:09:57 -0700 Subject: [PATCH] feat(executor): add garbage collector Prometheus metrics Emit objects_deleted, deletion_errors, and sweep_duration under a dedicated executor:gc: sub-scope, created once in the constructor and updated during each collect() sweep. Unit tests assert the counters move and the sweep is timed. Part of #7455. Signed-off-by: davidlin20dev --- executor/pkg/controller/garbage_collector.go | 30 +++++- .../garbage_collector_metrics_test.go | 97 +++++++++++++++++++ .../pkg/controller/garbage_collector_test.go | 9 +- executor/setup.go | 2 +- go.mod | 2 +- 5 files changed, 133 insertions(+), 7 deletions(-) create mode 100644 executor/pkg/controller/garbage_collector_metrics_test.go diff --git a/executor/pkg/controller/garbage_collector.go b/executor/pkg/controller/garbage_collector.go index 8429fb471b..3f709f0412 100644 --- a/executor/pkg/controller/garbage_collector.go +++ b/executor/pkg/controller/garbage_collector.go @@ -4,26 +4,48 @@ import ( "context" "time" + "github.com/prometheus/client_golang/prometheus" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" + "github.com/flyteorg/flyte/v2/flytestdlib/promutils" ) +// gcMetrics holds the Prometheus instruments for the garbage collector. They are +// created once (in newGCMetrics) and only updated thereafter, never re-registered. +type gcMetrics struct { + deleted prometheus.Counter + errors prometheus.Counter + sweepTime promutils.StopWatch +} + +// newGCMetrics builds the garbage collector instruments under the given scope. +// Call exactly once per scope to avoid duplicate registration panics. +func newGCMetrics(scope promutils.Scope) gcMetrics { + return gcMetrics{ + deleted: scope.MustNewCounter("objects_deleted", "Total TaskActions deleted by the garbage collector"), + errors: scope.MustNewCounter("deletion_errors", "Total errors encountered while deleting expired TaskActions"), + sweepTime: scope.MustNewStopWatch("sweep_duration", "Duration of a full garbage collection sweep", time.Millisecond), + } +} + // GarbageCollector periodically deletes terminal TaskActions that have exceeded their TTL. // It implements the controller-runtime manager.Runnable interface. type GarbageCollector struct { client client.Client interval time.Duration maxTTL time.Duration + metrics gcMetrics } // NewGarbageCollector creates a new GarbageCollector. -func NewGarbageCollector(c client.Client, interval, maxTTL time.Duration) *GarbageCollector { +func NewGarbageCollector(c client.Client, interval, maxTTL time.Duration, scope promutils.Scope) *GarbageCollector { return &GarbageCollector{ client: c, interval: interval, maxTTL: maxTTL, + metrics: newGCMetrics(scope), } } @@ -55,6 +77,10 @@ const gcPageSize = 500 func (gc *GarbageCollector) collect(ctx context.Context) error { logger := log.FromContext(ctx).WithName("gc") + // Time the full sweep, defer guarantees it records on every return path. + timer := gc.metrics.sweepTime.Start() + defer timer.Stop() + cutoff := time.Now().UTC().Add(-gc.maxTTL).Format(labelTimeFormat) deleted := 0 total := 0 @@ -87,11 +113,13 @@ func (gc *GarbageCollector) collect(ctx context.Context) error { // The minute-precision format is lexicographically ordered, so string comparison works. if completedTime < cutoff { if err := gc.client.Delete(ctx, ta); err != nil { + gc.metrics.errors.Inc() logger.Error(err, "failed to delete expired TaskAction", "name", ta.Name, "namespace", ta.Namespace, "completedTime", completedTime) continue } deleted++ + gc.metrics.deleted.Inc() } } diff --git a/executor/pkg/controller/garbage_collector_metrics_test.go b/executor/pkg/controller/garbage_collector_metrics_test.go new file mode 100644 index 0000000000..e8bc8b39a2 --- /dev/null +++ b/executor/pkg/controller/garbage_collector_metrics_test.go @@ -0,0 +1,97 @@ +package controller + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" + "github.com/flyteorg/flyte/v2/flytestdlib/promutils" +) + +// newExpiredTaskAction builds a terminated TaskAction whose completed time is well past +// any sane maxTTL, so the GC will treat it as expired and delete it. +func newExpiredTaskAction() *flyteorgv1.TaskAction { + expired := time.Now().UTC().Add(-2 * time.Hour).Format(labelTimeFormat) + return &flyteorgv1.TaskAction{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gc-expired", + Namespace: "default", + Labels: map[string]string{ + LabelTerminationStatus: LabelValueTerminated, + LabelCompletedTime: expired, + }, + }, + } +} + +// newGCTestClient returns an in-memory fake client seeded with one expired TaskAction. +// If deleteErr is non-nil, every Delete fails with it, so we can exercise the error path. +func newGCTestClient(t *testing.T, deleteErr error) client.Client { + scheme := runtime.NewScheme() + require.NoError(t, flyteorgv1.AddToScheme(scheme)) + builder := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newExpiredTaskAction()) + if deleteErr != nil { + builder = builder.WithInterceptorFuncs(interceptor.Funcs{ + Delete: func(context.Context, client.WithWatch, client.Object, ...client.DeleteOption) error { + return deleteErr + }, + }) + } + return builder.Build() +} + +// sweepObservations reports how many times the sweep_duration stopwatch has recorded. +// sweep_duration is a Summary, so testutil.ToFloat64 can not read it. +// Instead, we collect the metric and read its sample count directly. +func sweepObservations(t *testing.T, gc *GarbageCollector) uint64 { + t.Helper() + ch := make(chan prometheus.Metric, 1) + gc.metrics.sweepTime.Observer.(prometheus.Collector).Collect(ch) + close(ch) + var m dto.Metric + require.NoError(t, (<-ch).Write(&m)) + return m.GetSummary().GetSampleCount() +} + +// TestGarbageCollectorDeletedMetric: a successful delete moves objects_deleted +// from 0 to 1, records one sweep, and leaves deletion_errors at 0. +func TestGarbageCollectorDeletedMetric(t *testing.T) { + gc := NewGarbageCollector(newGCTestClient(t, nil), time.Minute, time.Hour, promutils.NewTestScope()) + + require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.deleted)) + require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.errors)) + + require.NoError(t, gc.collect(context.Background())) + + require.Equal(t, 1.0, testutil.ToFloat64(gc.metrics.deleted)) + require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.errors)) + require.GreaterOrEqual(t, sweepObservations(t, gc), uint64(1)) +} + +// TestGarbageCollectorDeletionErrorMetric: when a delete fails, deletion_errors moves +// 0 to 1 and objects_deleted stays at 0. +func TestGarbageCollectorDeletionErrorMetric(t *testing.T) { + gc := NewGarbageCollector( + newGCTestClient(t, errors.New("simulated delete failure")), + time.Minute, time.Hour, promutils.NewTestScope(), + ) + + require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.errors)) + + require.NoError(t, gc.collect(context.Background())) + + require.Equal(t, 1.0, testutil.ToFloat64(gc.metrics.errors)) + require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.deleted)) +} diff --git a/executor/pkg/controller/garbage_collector_test.go b/executor/pkg/controller/garbage_collector_test.go index c2a967726e..ec6333539d 100644 --- a/executor/pkg/controller/garbage_collector_test.go +++ b/executor/pkg/controller/garbage_collector_test.go @@ -11,6 +11,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" + "github.com/flyteorg/flyte/v2/flytestdlib/promutils" ) func createTaskAction(ctx context.Context, name string, labels map[string]string) *flyteorgv1.TaskAction { @@ -62,7 +63,7 @@ var _ = Describe("GarbageCollector", func() { LabelCompletedTime: expiredTime, }) - gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour) + gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope()) Expect(gc.collect(ctx)).To(Succeed()) ta := &flyteorgv1.TaskAction{} @@ -78,7 +79,7 @@ var _ = Describe("GarbageCollector", func() { LabelCompletedTime: recentTime, }) - gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour) + gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope()) Expect(gc.collect(ctx)).To(Succeed()) ta := &flyteorgv1.TaskAction{} @@ -89,7 +90,7 @@ var _ = Describe("GarbageCollector", func() { It("should retain non-terminated TaskActions", func() { createTaskAction(ctx, "gc-active", nil) - gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour) + gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope()) Expect(gc.collect(ctx)).To(Succeed()) ta := &flyteorgv1.TaskAction{} @@ -98,7 +99,7 @@ var _ = Describe("GarbageCollector", func() { }) It("should handle empty list gracefully", func() { - gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour) + gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope()) Expect(gc.collect(ctx)).To(Succeed()) }) }) diff --git a/executor/setup.go b/executor/setup.go index ef389c15b7..b80d8c7b34 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -189,7 +189,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { if cfg.GC.MaxTTL.Duration <= 0 { return fmt.Errorf("executor: gc.maxTTL must be positive when gc is enabled, got %v", cfg.GC.MaxTTL.Duration) } - gc := controller.NewGarbageCollector(mgr.GetClient(), cfg.GC.Interval.Duration, cfg.GC.MaxTTL.Duration) + gc := controller.NewGarbageCollector(mgr.GetClient(), cfg.GC.Interval.Duration, cfg.GC.MaxTTL.Duration, executorScope.NewSubScope("gc")) if err := mgr.Add(gc); err != nil { return fmt.Errorf("executor: failed to add garbage collector: %w", err) } diff --git a/go.mod b/go.mod index bec4e2a71a..b7020cd694 100644 --- a/go.mod +++ b/go.mod @@ -96,6 +96,7 @@ require ( require ( github.com/Masterminds/semver/v3 v3.5.0 github.com/alicebob/miniredis/v2 v2.38.0 + github.com/prometheus/client_model v0.6.2 ) require ( @@ -189,7 +190,6 @@ require ( github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/spf13/afero v1.15.0 // indirect github.com/spf13/cast v1.7.1 // indirect