Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
}

if uint32(len(pod.Status.ContainerStatuses)) <= index {
logger.Errorf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase)
// a Pending pod has no containerStatuses yet; log links are built on a later evaluation
if pod.Status.Phase == v1.PodPending {
logger.Debugf(ctx, "containerStatus not yet available, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase)
} else {
logger.Warnf(ctx, "containerStatus IndexOutOfBound, requested [%d], but total containerStatuses [%d] in pod phase [%v]", index, len(pod.Status.ContainerStatuses), pod.Status.Phase)
}
return nil, nil
} else {
containerID = pod.Status.ContainerStatuses[index].ContainerID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.
SanitizeGPUResourceRequirements(overrideResources, extendedResources.GetGpuAccelerator())
}

logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+
logger.Debugf(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+
" Resources [%v] with mode [%v]", overrideResources, platformResources, container.Resources, mode)

switch mode {
Expand All @@ -353,6 +353,6 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset)
}

logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources)
logger.Debugf(ctx, "Adjusted container resources [%v]", container.Resources)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
}

if cacheItem.Phase != newPluginPhase {
logger.Infof(ctx, "Moving Phase for from %s to %s", cacheItem.Phase, newPluginPhase)
logger.Debugf(ctx, "Moving phase for %s from %s to %s", cacheItemID, cacheItem.Phase, newPluginPhase)
}

cacheItem.Phase = newPluginPhase
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/webapi/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func updateRegistry(
connectorSupportedTaskCategories[supportedCategoryName] = struct{}{}
}
}
logger.Infof(ctx, "ConnectorDeployment [%v] supports the following task types: [%v]", connectorDeployment.Endpoint,
logger.Debugf(ctx, "ConnectorDeployment [%v] supports the following task types: [%v]", connectorDeployment.Endpoint,
strings.Join(maps.Keys(connectorSupportedTaskCategories), ", "))
}
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func getConnectorRegistry(ctx context.Context, cs *ClientSet) Registry {
// Update registry with connector apps
updateRegistry(ctx, cs, newConnectorRegistry, cfg.ConnectorApps, true)

logger.Infof(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(newConnectorRegistry.getSupportedTaskTypes(), ", "))
logger.Debugf(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(newConnectorRegistry.getSupportedTaskTypes(), ", "))
return newConnectorRegistry
}

Expand Down
16 changes: 11 additions & 5 deletions flyteplugins/go/tasks/plugins/webapi/connector/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/gob"
"fmt"
"slices"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -47,10 +48,16 @@ func (p *ConnectorService) ContainTaskType(taskType string) bool {
}

// SetSupportedTaskType set supportTaskType in the connector service.
func (p *ConnectorService) SetSupportedTaskType(taskTypes []string) {
func (p *ConnectorService) SetSupportedTaskType(ctx context.Context, taskTypes []string) {
normalized := slices.Compact(slices.Sorted(slices.Values(taskTypes)))
p.mu.Lock()
Comment on lines +51 to 53

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we apply its suggestion?

defer p.mu.Unlock()
p.supportedTaskTypes = taskTypes
// Log the supported task types only when the set changes, to avoid
// re-logging the identical registry on every poll interval.
if !slices.Equal(normalized, p.supportedTaskTypes) {
logger.Infof(ctx, "ConnectorDeployments support the following task types: [%v]", strings.Join(normalized, ", "))
}
p.supportedTaskTypes = normalized
}

type RegistryKey struct {
Expand Down Expand Up @@ -389,7 +396,7 @@ func (p *Plugin) watchConnectors(ctx context.Context, connectorService *Connecto
clientSet := getConnectorClientSets(childCtx)
connectorRegistry := getConnectorRegistry(childCtx, clientSet)
p.setRegistry(connectorRegistry)
connectorService.SetSupportedTaskType(connectorRegistry.getSupportedTaskTypes())
connectorService.SetSupportedTaskType(childCtx, connectorRegistry.getSupportedTaskTypes())
}, p.cfg.PollInterval.Duration, ctx.Done())
}

Expand Down Expand Up @@ -479,8 +486,7 @@ func newConnectorPlugin(connectorService *ConnectorService) webapi.PluginEntry {
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
clientSet := getConnectorClientSets(ctx)
connectorRegistry := getConnectorRegistry(ctx, clientSet)
supportedTaskTypes := connectorRegistry.getSupportedTaskTypes()
connectorService.SetSupportedTaskType(supportedTaskTypes)
connectorService.SetSupportedTaskType(ctx, connectorRegistry.getSupportedTaskTypes())
plugin := &Plugin{
metricScope: promutils.NewScope("connector_plugin"),
cfg: cfg,
Expand Down
22 changes: 22 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,25 @@ func TestResourceWrapper_IsTerminal(t *testing.T) {
})
}
}

func TestSetSupportedTaskType(t *testing.T) {
ctx := context.TODO()
svc := &ConnectorService{}

svc.SetSupportedTaskType(ctx, []string{"snowflake", "bigquery", "snowflake"})
assert.True(t, svc.ContainTaskType("snowflake"))
assert.True(t, svc.ContainTaskType("bigquery"))
assert.False(t, svc.ContainTaskType("databricks"))
assert.Equal(t, []string{"bigquery", "snowflake"}, svc.supportedTaskTypes)

// same set in a different order with duplicates normalizes to the same value
svc.SetSupportedTaskType(ctx, []string{"bigquery", "snowflake", "bigquery"})
assert.Equal(t, []string{"bigquery", "snowflake"}, svc.supportedTaskTypes)

svc.SetSupportedTaskType(ctx, []string{"bigquery"})
assert.False(t, svc.ContainTaskType("snowflake"))
assert.True(t, svc.ContainTaskType("bigquery"))

svc.SetSupportedTaskType(ctx, nil)
assert.False(t, svc.ContainTaskType("bigquery"))
}
6 changes: 3 additions & 3 deletions flytestdlib/resolver/k8s_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (k *kResolver) Close() {
k.cancel()
k.wg.Wait()
logger.Infof(k.ctx, "k8s resolver: closed")
logger.Debugf(k.ctx, "k8s resolver: closed")
}

func (k *kResolver) resolve(e *v1.Endpoints) {
Expand All @@ -156,7 +156,7 @@ func (k *kResolver) run() {
k.wg.Add(1)
defer k.wg.Done()

logger.Infof(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName)
logger.Debugf(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName)

watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName})
if err != nil {
Expand All @@ -179,7 +179,7 @@ func (k *kResolver) run() {
case <-k.ctx.Done():
return
case event, ok := <-watcher.ResultChan():
logger.Info(k.ctx, "k8s resolver watcher event response: [%v]", event)
logger.Debugf(k.ctx, "k8s resolver watcher event: [%s]", event.Type)
if !ok {
logger.Debugf(k.ctx, "k8s resolver: watcher closed")
return
Comment on lines 181 to 185
Comment on lines 181 to 185
Expand Down
2 changes: 1 addition & 1 deletion flytestdlib/storage/stow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata
} else {
contentMD5, ok := metadata[strings.ToLower(FlyteContentMD5)].(string)
if !ok {
logger.Infof(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5)
logger.Debugf(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5)
Comment on lines 244 to +246
}
Comment on lines 244 to 247
Comment on lines 244 to 247
return StowMetadata{
exists: true,
Expand Down
Loading