diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 83952c8bcd..f8fba5adcf 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -39,7 +39,12 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas } if uint32(len(pod.Status.ContainerStatuses)) <= index { - logger.Errorf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) + // a Pending pod has no containerStatuses yet; log links are built on a later evaluation + if pod.Status.Phase == v1.PodPending { + logger.Debugf(ctx, "containerStatus not yet available, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) + } else { + logger.Warnf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase) + } return nil, nil } else { containerID = pod.Status.ContainerStatuses[index].ContainerID diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go index 18b1e2654a..c544ee8488 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go @@ -334,7 +334,7 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template. SanitizeGPUResourceRequirements(overrideResources, extendedResources.GetGpuAccelerator()) } - logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+ + logger.Debugf(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+ " Resources [%v] with mode [%v]", overrideResources, platformResources, container.Resources, mode) switch mode { @@ -353,6 +353,6 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template. container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset) } - logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources) + logger.Debugf(ctx, "Adjusted container resources [%v]", container.Resources) return nil } diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go index cd2c6aa086..901464fec5 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go @@ -54,7 +54,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach } if cacheItem.Phase != newPluginPhase { - logger.Infof(ctx, "Moving Phase for from %s to %s", cacheItem.Phase, newPluginPhase) + logger.Debugf(ctx, "Moving phase for %s from %s to %s", cacheItemID, cacheItem.Phase, newPluginPhase) } cacheItem.Phase = newPluginPhase diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/client.go b/flyteplugins/go/tasks/plugins/webapi/connector/client.go index 9068dca5fd..c43830100d 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/client.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/client.go @@ -137,7 +137,7 @@ func updateRegistry( connectorSupportedTaskCategories[supportedCategoryName] = struct{}{} } } - logger.Infof(ctx, "ConnectorDeployment [%v] supports the following task types: [%v]", connectorDeployment.Endpoint, + logger.Debugf(ctx, "ConnectorDeployment [%v] supports the following task types: [%v]", connectorDeployment.Endpoint, strings.Join(maps.Keys(connectorSupportedTaskCategories), ", ")) } } @@ -188,7 +188,7 @@ func getConnectorRegistry(ctx context.Context, cs *ClientSet) Registry { // Update registry with connector apps updateRegistry(ctx, cs, newConnectorRegistry, cfg.ConnectorApps, true) - logger.Infof(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(newConnectorRegistry.getSupportedTaskTypes(), ", ")) + logger.Debugf(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(newConnectorRegistry.getSupportedTaskTypes(), ", ")) return newConnectorRegistry } diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go index 543672a304..29358c2ef1 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "fmt" "slices" + "strings" "sync" "time" @@ -47,10 +48,16 @@ func (p *ConnectorService) ContainTaskType(taskType string) bool { } // SetSupportedTaskType set supportTaskType in the connector service. -func (p *ConnectorService) SetSupportedTaskType(taskTypes []string) { +func (p *ConnectorService) SetSupportedTaskType(ctx context.Context, taskTypes []string) { + normalized := slices.Compact(slices.Sorted(slices.Values(taskTypes))) p.mu.Lock() defer p.mu.Unlock() - p.supportedTaskTypes = taskTypes + // Log the supported task types only when the set changes, to avoid + // re-logging the identical registry on every poll interval. + if !slices.Equal(normalized, p.supportedTaskTypes) { + logger.Infof(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(normalized, ", ")) + } + p.supportedTaskTypes = normalized } type RegistryKey struct { @@ -389,7 +396,7 @@ func (p *Plugin) watchConnectors(ctx context.Context, connectorService *Connecto clientSet := getConnectorClientSets(childCtx) connectorRegistry := getConnectorRegistry(childCtx, clientSet) p.setRegistry(connectorRegistry) - connectorService.SetSupportedTaskType(connectorRegistry.getSupportedTaskTypes()) + connectorService.SetSupportedTaskType(childCtx, connectorRegistry.getSupportedTaskTypes()) }, p.cfg.PollInterval.Duration, ctx.Done()) } @@ -479,8 +486,7 @@ func newConnectorPlugin(connectorService *ConnectorService) webapi.PluginEntry { PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { clientSet := getConnectorClientSets(ctx) connectorRegistry := getConnectorRegistry(ctx, clientSet) - supportedTaskTypes := connectorRegistry.getSupportedTaskTypes() - connectorService.SetSupportedTaskType(supportedTaskTypes) + connectorService.SetSupportedTaskType(ctx, connectorRegistry.getSupportedTaskTypes()) plugin := &Plugin{ metricScope: promutils.NewScope("connector_plugin"), cfg: cfg, diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go b/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go index e63e612bbd..0db8ddeeeb 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go @@ -397,3 +397,25 @@ func TestResourceWrapper_IsTerminal(t *testing.T) { }) } } + +func TestSetSupportedTaskType(t *testing.T) { + ctx := context.TODO() + svc := &ConnectorService{} + + svc.SetSupportedTaskType(ctx, []string{"snowflake", "bigquery", "snowflake"}) + assert.True(t, svc.ContainTaskType("snowflake")) + assert.True(t, svc.ContainTaskType("bigquery")) + assert.False(t, svc.ContainTaskType("databricks")) + assert.Equal(t, []string{"bigquery", "snowflake"}, svc.supportedTaskTypes) + + // same set in a different order with duplicates normalizes to the same value + svc.SetSupportedTaskType(ctx, []string{"bigquery", "snowflake", "bigquery"}) + assert.Equal(t, []string{"bigquery", "snowflake"}, svc.supportedTaskTypes) + + svc.SetSupportedTaskType(ctx, []string{"bigquery"}) + assert.False(t, svc.ContainTaskType("snowflake")) + assert.True(t, svc.ContainTaskType("bigquery")) + + svc.SetSupportedTaskType(ctx, nil) + assert.False(t, svc.ContainTaskType("bigquery")) +} diff --git a/flytestdlib/resolver/k8s_resolver.go b/flytestdlib/resolver/k8s_resolver.go index c3ccf0f443..c76d7f8d0f 100644 --- a/flytestdlib/resolver/k8s_resolver.go +++ b/flytestdlib/resolver/k8s_resolver.go @@ -130,7 +130,7 @@ func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {} func (k *kResolver) Close() { k.cancel() k.wg.Wait() - logger.Infof(k.ctx, "k8s resolver: closed") + logger.Debugf(k.ctx, "k8s resolver: closed") } func (k *kResolver) resolve(e *v1.Endpoints) { @@ -156,7 +156,7 @@ func (k *kResolver) run() { k.wg.Add(1) defer k.wg.Done() - logger.Infof(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName) + logger.Debugf(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName) watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName}) if err != nil { @@ -179,7 +179,7 @@ func (k *kResolver) run() { case <-k.ctx.Done(): return case event, ok := <-watcher.ResultChan(): - logger.Info(k.ctx, "k8s resolver watcher event response: [%v]", event) + logger.Debugf(k.ctx, "k8s resolver watcher event: [%s]", event.Type) if !ok { logger.Debugf(k.ctx, "k8s resolver: watcher closed") return diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index 08a2b7e1e3..d1cda38398 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -243,7 +243,7 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata } else { contentMD5, ok := metadata[strings.ToLower(FlyteContentMD5)].(string) if !ok { - logger.Infof(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5) + logger.Debugf(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5) } return StowMetadata{ exists: true,