Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions executor/pkg/plugin/resource_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package plugin

import (
"context"

pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core"
)

// The webapi plugin machinery requires the host to supply a ResourceRegistrar at plugin setup and a
// ResourceManager at task execution, for its allocation-token quota feature. FlytePropeller (v1)
// always supplied them, defaulting to a no-op manager when no quota backend is configured. The v2
// executor never reimplemented this, so a webapi plugin that declares ResourceQuotas dereferences a
// nil registrar or manager. These no-op types restore the contract and grant every allocation. Swap
// in a real ResourceManager to enforce quotas without touching any plugin.

// noopResourceRegistrar accepts quota registrations without recording them.
type noopResourceRegistrar struct{}

var _ pluginsCore.ResourceRegistrar = noopResourceRegistrar{}

func (noopResourceRegistrar) RegisterResourceQuota(_ context.Context, _ pluginsCore.ResourceNamespace, _ int) error {
return nil
}

// noopResourceManager grants every allocation request and ignores releases.
type noopResourceManager struct{}

var _ pluginsCore.ResourceManager = noopResourceManager{}

func (noopResourceManager) GetID() string { return "executor-noop-resource-manager" }

func (noopResourceManager) AllocateResource(_ context.Context, _ pluginsCore.ResourceNamespace, _ string, _ pluginsCore.ResourceConstraintsSpec) (pluginsCore.AllocationStatus, error) {
return pluginsCore.AllocationStatusGranted, nil
}

func (noopResourceManager) ReleaseResource(_ context.Context, _ pluginsCore.ResourceNamespace, _ string) error {
return nil
}

// NewNoopResourceRegistrar returns a ResourceRegistrar that accepts quota declarations without
// enforcing them.
func NewNoopResourceRegistrar() pluginsCore.ResourceRegistrar { return noopResourceRegistrar{} }

// NewNoopResourceManager returns a ResourceManager that grants every allocation.
func NewNoopResourceManager() pluginsCore.ResourceManager { return noopResourceManager{} }
30 changes: 30 additions & 0 deletions executor/pkg/plugin/resource_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package plugin

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core"
)

func TestNoopResourceRegistrar(t *testing.T) {
r := NewNoopResourceRegistrar()
assert.NotNil(t, r)
// Accepts a quota declaration (the connector ships ResourceQuotas{"default":1000}) without error.
assert.NoError(t, r.RegisterResourceQuota(context.Background(), "default", 1000))
}

func TestNoopResourceManager(t *testing.T) {
m := NewNoopResourceManager()
assert.NotNil(t, m)
assert.Equal(t, "executor-noop-resource-manager", m.GetID())

// Every allocation is granted, matching v1 propeller with no quota backend configured.
status, err := m.AllocateResource(context.Background(), "default", "token", pluginsCore.ResourceConstraintsSpec{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.AllocationStatusGranted, status)

assert.NoError(t, m.ReleaseResource(context.Background(), "default", "token"))
}
25 changes: 25 additions & 0 deletions executor/pkg/plugin/secret_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package plugin

import (
"context"
"fmt"

pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core"
)

// The webapi plugin machinery resolves task secrets through a SecretManager supplied by the host, at
// both plugin setup and task execution. The v2 executor never wired one up, so a connector task that
// references a secret dereferences a nil SecretManager and panics. This no-op restores the contract.
// It has no secret backend, so Get fails loudly with an error rather than handing a task a blank
// value. Swap in a real SecretManager to resolve secrets.
type noopSecretManager struct{}

var _ pluginsCore.SecretManager = noopSecretManager{}

func (noopSecretManager) Get(_ context.Context, key string) (string, error) {
return "", fmt.Errorf("secrets are not supported by the executor's no-op secret manager, cannot resolve %q", key)
}

// NewNoopSecretManager returns a SecretManager with no backend that fails every lookup with a clear
// error, so a connector task referencing a secret does not silently receive an empty value.
func NewNoopSecretManager() pluginsCore.SecretManager { return noopSecretManager{} }
19 changes: 19 additions & 0 deletions executor/pkg/plugin/secret_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package plugin

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNoopSecretManager(t *testing.T) {
m := NewNoopSecretManager()
assert.NotNil(t, m)

// No backend: a lookup fails with an error instead of returning an empty secret, so a connector
// task that references a secret fails loudly rather than running with a blank value.
val, err := m.Get(context.Background(), "my-secret")
assert.Error(t, err)
assert.Empty(t, val)
}
16 changes: 15 additions & 1 deletion executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog"
cachecatalog "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog/cache_service"
webhookConfig "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/secret/config"
connectorplugin "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/plugins/webapi/connector"
"github.com/flyteorg/flyte/v2/flytestdlib/app"
"github.com/flyteorg/flyte/v2/flytestdlib/otelutils"
"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
Expand Down Expand Up @@ -66,6 +67,11 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
utilruntime.Must(reg.AddToScheme(scheme))
}

// Register the connector (webapi) backend plugin so task types backed by an external connector
// service are routed to it. This must run before plugin.NewRegistry below, which snapshots the
// core plugins once.
connectorplugin.RegisterConnectorPlugin(&connectorplugin.ConnectorService{})

Comment thread
dejanzele marked this conversation as resolved.
var tlsOpts []func(*tls.Config)
if !cfg.EnableHTTP2 {
tlsOpts = append(tlsOpts, func(c *tls.Config) {
Expand Down Expand Up @@ -129,7 +135,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
}

setupCtx := plugin.NewSetupContext(
mgr, nil, nil, nil, nil,
mgr, plugin.NewNoopSecretManager(), plugin.NewNoopResourceRegistrar(), nil, nil,
"TaskAction",
executorScope.NewSubScope("plugin"),
)
Comment thread
dejanzele marked this conversation as resolved.
Expand Down Expand Up @@ -177,6 +183,14 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
reconciler.CatalogClient = asyncCatalogClient
reconciler.Catalog = cacheClient
reconciler.Recorder = mgr.GetEventRecorder("taskaction-controller")
// Supply a ResourceManager for the webapi allocation-token path, used by connector-backed task
// types that declare ResourceQuotas. It grants every allocation by default, matching
// FlytePropeller with no quota backend. Swap in a real one to enforce quotas.
reconciler.ResourceManager = plugin.NewNoopResourceManager()
Comment thread
dejanzele marked this conversation as resolved.
// Supply a SecretManager so connector tasks that reference secrets do not nil-deref at execution
// time. It has no backend and fails lookups with a clear error. Swap in a real one to resolve
// secrets.
reconciler.SecretManager = plugin.NewNoopSecretManager()
if cfg.MaxSystemFailures < 0 {
return fmt.Errorf("executor: maxSystemFailures must be non-negative, got %d", cfg.MaxSystemFailures)
}
Expand Down
1 change: 0 additions & 1 deletion flyteplugins/go/tasks/plugins/webapi/connector/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ func newConnectorPlugin(connectorService *ConnectorService) webapi.PluginEntry {
}

func RegisterConnectorPlugin(connectorService *ConnectorService) {
fmt.Printf("Registering connector plugin...\n")
gob.Register(ResourceMetaWrapper{})
gob.Register(ResourceWrapper{})
pluginmachinery.PluginRegistry().RegisterRemotePlugin(newConnectorPlugin(connectorService))
Expand Down
Loading