From 4d255250550df7afb149e91602f7011ebf9754bc Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Tue, 9 Jun 2026 22:24:19 -0700 Subject: [PATCH 1/4] Demote steady-state discovery and per-task logs that spam at INFO/ERROR Connector discovery rebuilds its registry every pollInterval (10s) and logged the full cycle at INFO each time; the k8s resolver watcher also dumped entire Endpoints objects via an unformatted %v. The aggregate supported-task-types line now logs only when the set changes; the rest moves to Debug. Also demotes per-task resource-override dumps, the contentMD5 cast miss, and the containerStatus IndexOutOfBound error that fires for every Pending pod, and attaches the task ID to the webapi phase-transition log. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Paul Dittamo --- flyteplugins/go/tasks/logs/logging_utils.go | 2 +- .../pluginmachinery/flytek8s/container_helper.go | 4 ++-- .../pluginmachinery/internal/webapi/monitor.go | 2 +- .../go/tasks/plugins/webapi/connector/client.go | 4 ++-- .../go/tasks/plugins/webapi/connector/plugin.go | 13 ++++++++++++- flytestdlib/resolver/k8s_resolver.go | 6 +++--- flytestdlib/storage/stow_store.go | 2 +- 7 files changed, 22 insertions(+), 11 deletions(-) diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 83952c8bcd..0f0fd259dd 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -39,7 +39,7 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas } if uint32(len(pod.Status.ContainerStatuses)) <= index { - logger.Errorf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) + logger.Warnf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) return nil, nil } else { containerID = pod.Status.ContainerStatuses[index].ContainerID diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go index 18b1e2654a..c544ee8488 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go @@ -334,7 +334,7 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template. SanitizeGPUResourceRequirements(overrideResources, extendedResources.GetGpuAccelerator()) } - logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+ + logger.Debugf(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+ " Resources [%v] with mode [%v]", overrideResources, platformResources, container.Resources, mode) switch mode { @@ -353,6 +353,6 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template. container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset) } - logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources) + logger.Debugf(ctx, "Adjusted container resources [%v]", container.Resources) return nil } diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go index cd2c6aa086..901464fec5 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go @@ -54,7 +54,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach } if cacheItem.Phase != newPluginPhase { - logger.Infof(ctx, "Moving Phase for from %s to %s", cacheItem.Phase, newPluginPhase) + logger.Debugf(ctx, "Moving phase for %s from %s to %s", cacheItemID, cacheItem.Phase, newPluginPhase) } cacheItem.Phase = newPluginPhase diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/client.go b/flyteplugins/go/tasks/plugins/webapi/connector/client.go index 9068dca5fd..c43830100d 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/client.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/client.go @@ -137,7 +137,7 @@ func updateRegistry( connectorSupportedTaskCategories[supportedCategoryName] = struct{}{} } } - logger.Infof(ctx, "ConnectorDeployment [%v] supports the following task types: [%v]", connectorDeployment.Endpoint, + logger.Debugf(ctx, "ConnectorDeployment [%v] supports the following task types: [%v]", connectorDeployment.Endpoint, strings.Join(maps.Keys(connectorSupportedTaskCategories), ", ")) } } @@ -188,7 +188,7 @@ func getConnectorRegistry(ctx context.Context, cs *ClientSet) Registry { // Update registry with connector apps updateRegistry(ctx, cs, newConnectorRegistry, cfg.ConnectorApps, true) - logger.Infof(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(newConnectorRegistry.getSupportedTaskTypes(), ", ")) + logger.Debugf(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(newConnectorRegistry.getSupportedTaskTypes(), ", ")) return newConnectorRegistry } diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go index 543672a304..5f59bb687d 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "fmt" "slices" + "strings" "sync" "time" @@ -383,13 +384,23 @@ func (p *Plugin) getAsyncConnectorClient(ctx context.Context, connector *Deploym } func (p *Plugin) watchConnectors(ctx context.Context, connectorService *ConnectorService) { + var lastSupported []string go wait.Until(func() { childCtx, cancel := context.WithCancel(ctx) defer cancel() clientSet := getConnectorClientSets(childCtx) connectorRegistry := getConnectorRegistry(childCtx, clientSet) p.setRegistry(connectorRegistry) - connectorService.SetSupportedTaskType(connectorRegistry.getSupportedTaskTypes()) + supported := connectorRegistry.getSupportedTaskTypes() + connectorService.SetSupportedTaskType(supported) + + // Log the supported task types only when the set changes, to avoid + // re-logging the identical registry on every poll interval. + normalized := slices.Compact(slices.Sorted(slices.Values(supported))) + if !slices.Equal(normalized, lastSupported) { + logger.Infof(childCtx, "ConnectorDeployments support the following task types: [%v]", strings.Join(normalized, ", ")) + lastSupported = normalized + } }, p.cfg.PollInterval.Duration, ctx.Done()) } diff --git a/flytestdlib/resolver/k8s_resolver.go b/flytestdlib/resolver/k8s_resolver.go index c3ccf0f443..c76d7f8d0f 100644 --- a/flytestdlib/resolver/k8s_resolver.go +++ b/flytestdlib/resolver/k8s_resolver.go @@ -130,7 +130,7 @@ func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {} func (k *kResolver) Close() { k.cancel() k.wg.Wait() - logger.Infof(k.ctx, "k8s resolver: closed") + logger.Debugf(k.ctx, "k8s resolver: closed") } func (k *kResolver) resolve(e *v1.Endpoints) { @@ -156,7 +156,7 @@ func (k *kResolver) run() { k.wg.Add(1) defer k.wg.Done() - logger.Infof(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName) + logger.Debugf(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName) watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName}) if err != nil { @@ -179,7 +179,7 @@ func (k *kResolver) run() { case <-k.ctx.Done(): return case event, ok := <-watcher.ResultChan(): - logger.Info(k.ctx, "k8s resolver watcher event response: [%v]", event) + logger.Debugf(k.ctx, "k8s resolver watcher event: [%s]", event.Type) if !ok { logger.Debugf(k.ctx, "k8s resolver: watcher closed") return diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index 08a2b7e1e3..d1cda38398 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -243,7 +243,7 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata } else { contentMD5, ok := metadata[strings.ToLower(FlyteContentMD5)].(string) if !ok { - logger.Infof(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5) + logger.Debugf(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5) } return StowMetadata{ exists: true, From 201cf90086617275cc281b3ad19ddd165f7791e0 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Tue, 9 Jun 2026 22:33:18 -0700 Subject: [PATCH 2/4] Silence containerStatus log for Pending pods containerStatuses is empty by design until the kubelet starts the containers, so every Pending pod hit this branch on every evaluation and the message read as an error. Pending now logs at Debug; any other phase keeps the Warn since a short containerStatuses there is genuinely unexpected. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Paul Dittamo --- flyteplugins/go/tasks/logs/logging_utils.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 0f0fd259dd..f8fba5adcf 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -39,7 +39,12 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas } if uint32(len(pod.Status.ContainerStatuses)) <= index { - logger.Warnf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) + // a Pending pod has no containerStatuses yet; log links are built on a later evaluation + if pod.Status.Phase == v1.PodPending { + logger.Debugf(ctx, "containerStatus not yet available, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) + } else { + logger.Warnf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) + } return nil, nil } else { containerID = pod.Status.ContainerStatuses[index].ContainerID From fdfa5f2e4ab74a2f4e6fbee27b44c4246821ce33 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Tue, 9 Jun 2026 22:57:20 -0700 Subject: [PATCH 3/4] Move supported-task-types change detection into SetSupportedTaskType The setter owns the field and the mutex, so comparing there covers every caller uniformly instead of only the watchConnectors poll loop, and drops the closure-side lastSupported bookkeeping. The stored slice is now normalized (sorted, deduped), which ContainTaskType's slices.Contains lookup is indifferent to. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Paul Dittamo --- .../tasks/plugins/webapi/connector/plugin.go | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go index 5f59bb687d..29358c2ef1 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go @@ -48,10 +48,16 @@ func (p *ConnectorService) ContainTaskType(taskType string) bool { } // SetSupportedTaskType set supportTaskType in the connector service. -func (p *ConnectorService) SetSupportedTaskType(taskTypes []string) { +func (p *ConnectorService) SetSupportedTaskType(ctx context.Context, taskTypes []string) { + normalized := slices.Compact(slices.Sorted(slices.Values(taskTypes))) p.mu.Lock() defer p.mu.Unlock() - p.supportedTaskTypes = taskTypes + // Log the supported task types only when the set changes, to avoid + // re-logging the identical registry on every poll interval. + if !slices.Equal(normalized, p.supportedTaskTypes) { + logger.Infof(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(normalized, ", ")) + } + p.supportedTaskTypes = normalized } type RegistryKey struct { @@ -384,23 +390,13 @@ func (p *Plugin) getAsyncConnectorClient(ctx context.Context, connector *Deploym } func (p *Plugin) watchConnectors(ctx context.Context, connectorService *ConnectorService) { - var lastSupported []string go wait.Until(func() { childCtx, cancel := context.WithCancel(ctx) defer cancel() clientSet := getConnectorClientSets(childCtx) connectorRegistry := getConnectorRegistry(childCtx, clientSet) p.setRegistry(connectorRegistry) - supported := connectorRegistry.getSupportedTaskTypes() - connectorService.SetSupportedTaskType(supported) - - // Log the supported task types only when the set changes, to avoid - // re-logging the identical registry on every poll interval. - normalized := slices.Compact(slices.Sorted(slices.Values(supported))) - if !slices.Equal(normalized, lastSupported) { - logger.Infof(childCtx, "ConnectorDeployments support the following task types: [%v]", strings.Join(normalized, ", ")) - lastSupported = normalized - } + connectorService.SetSupportedTaskType(childCtx, connectorRegistry.getSupportedTaskTypes()) }, p.cfg.PollInterval.Duration, ctx.Done()) } @@ -490,8 +486,7 @@ func newConnectorPlugin(connectorService *ConnectorService) webapi.PluginEntry { PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { clientSet := getConnectorClientSets(ctx) connectorRegistry := getConnectorRegistry(ctx, clientSet) - supportedTaskTypes := connectorRegistry.getSupportedTaskTypes() - connectorService.SetSupportedTaskType(supportedTaskTypes) + connectorService.SetSupportedTaskType(ctx, connectorRegistry.getSupportedTaskTypes()) plugin := &Plugin{ metricScope: promutils.NewScope("connector_plugin"), cfg: cfg, From 47692dc60f932e3e7f52de3c7aa52fea2e6fead9 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Tue, 9 Jun 2026 23:15:06 -0700 Subject: [PATCH 4/4] Add unit test for SetSupportedTaskType normalization Covers the behavior the change-detection depends on: stored set is sorted and deduped, order/duplicate variations of the same set normalize identically, and ContainTaskType tracks adds and removals. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Paul Dittamo --- .../plugins/webapi/connector/plugin_test.go | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go b/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go index e63e612bbd..0db8ddeeeb 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go @@ -397,3 +397,25 @@ func TestResourceWrapper_IsTerminal(t *testing.T) { }) } } + +func TestSetSupportedTaskType(t *testing.T) { + ctx := context.TODO() + svc := &ConnectorService{} + + svc.SetSupportedTaskType(ctx, []string{"snowflake", "bigquery", "snowflake"}) + assert.True(t, svc.ContainTaskType("snowflake")) + assert.True(t, svc.ContainTaskType("bigquery")) + assert.False(t, svc.ContainTaskType("databricks")) + assert.Equal(t, []string{"bigquery", "snowflake"}, svc.supportedTaskTypes) + + // same set in a different order with duplicates normalizes to the same value + svc.SetSupportedTaskType(ctx, []string{"bigquery", "snowflake", "bigquery"}) + assert.Equal(t, []string{"bigquery", "snowflake"}, svc.supportedTaskTypes) + + svc.SetSupportedTaskType(ctx, []string{"bigquery"}) + assert.False(t, svc.ContainTaskType("snowflake")) + assert.True(t, svc.ContainTaskType("bigquery")) + + svc.SetSupportedTaskType(ctx, nil) + assert.False(t, svc.ContainTaskType("bigquery")) +}