diff --git a/executor/pkg/plugin/resource_manager.go b/executor/pkg/plugin/resource_manager.go new file mode 100644 index 0000000000..4dd18754c2 --- /dev/null +++ b/executor/pkg/plugin/resource_manager.go @@ -0,0 +1,45 @@ +package plugin + +import ( + "context" + + pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core" +) + +// The webapi plugin machinery requires the host to supply a ResourceRegistrar at plugin setup and a +// ResourceManager at task execution, for its allocation-token quota feature. FlytePropeller (v1) +// always supplied them, defaulting to a no-op manager when no quota backend is configured. The v2 +// executor never reimplemented this, so a webapi plugin that declares ResourceQuotas dereferences a +// nil registrar or manager. These no-op types restore the contract and grant every allocation. Swap +// in a real ResourceManager to enforce quotas without touching any plugin. + +// noopResourceRegistrar accepts quota registrations without recording them. +type noopResourceRegistrar struct{} + +var _ pluginsCore.ResourceRegistrar = noopResourceRegistrar{} + +func (noopResourceRegistrar) RegisterResourceQuota(_ context.Context, _ pluginsCore.ResourceNamespace, _ int) error { + return nil +} + +// noopResourceManager grants every allocation request and ignores releases. +type noopResourceManager struct{} + +var _ pluginsCore.ResourceManager = noopResourceManager{} + +func (noopResourceManager) GetID() string { return "executor-noop-resource-manager" } + +func (noopResourceManager) AllocateResource(_ context.Context, _ pluginsCore.ResourceNamespace, _ string, _ pluginsCore.ResourceConstraintsSpec) (pluginsCore.AllocationStatus, error) { + return pluginsCore.AllocationStatusGranted, nil +} + +func (noopResourceManager) ReleaseResource(_ context.Context, _ pluginsCore.ResourceNamespace, _ string) error { + return nil +} + +// NewNoopResourceRegistrar returns a ResourceRegistrar that accepts quota declarations without +// enforcing them. +func NewNoopResourceRegistrar() pluginsCore.ResourceRegistrar { return noopResourceRegistrar{} } + +// NewNoopResourceManager returns a ResourceManager that grants every allocation. +func NewNoopResourceManager() pluginsCore.ResourceManager { return noopResourceManager{} } diff --git a/executor/pkg/plugin/resource_manager_test.go b/executor/pkg/plugin/resource_manager_test.go new file mode 100644 index 0000000000..f599a31228 --- /dev/null +++ b/executor/pkg/plugin/resource_manager_test.go @@ -0,0 +1,30 @@ +package plugin + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core" +) + +func TestNoopResourceRegistrar(t *testing.T) { + r := NewNoopResourceRegistrar() + assert.NotNil(t, r) + // Accepts a quota declaration (the connector ships ResourceQuotas{"default":1000}) without error. + assert.NoError(t, r.RegisterResourceQuota(context.Background(), "default", 1000)) +} + +func TestNoopResourceManager(t *testing.T) { + m := NewNoopResourceManager() + assert.NotNil(t, m) + assert.Equal(t, "executor-noop-resource-manager", m.GetID()) + + // Every allocation is granted, matching v1 propeller with no quota backend configured. + status, err := m.AllocateResource(context.Background(), "default", "token", pluginsCore.ResourceConstraintsSpec{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.AllocationStatusGranted, status) + + assert.NoError(t, m.ReleaseResource(context.Background(), "default", "token")) +} diff --git a/executor/pkg/plugin/secret_manager.go b/executor/pkg/plugin/secret_manager.go new file mode 100644 index 0000000000..310262acdd --- /dev/null +++ b/executor/pkg/plugin/secret_manager.go @@ -0,0 +1,25 @@ +package plugin + +import ( + "context" + "fmt" + + pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core" +) + +// The webapi plugin machinery resolves task secrets through a SecretManager supplied by the host, at +// both plugin setup and task execution. The v2 executor never wired one up, so a connector task that +// references a secret dereferences a nil SecretManager and panics. This no-op restores the contract. +// It has no secret backend, so Get fails loudly with an error rather than handing a task a blank +// value. Swap in a real SecretManager to resolve secrets. +type noopSecretManager struct{} + +var _ pluginsCore.SecretManager = noopSecretManager{} + +func (noopSecretManager) Get(_ context.Context, key string) (string, error) { + return "", fmt.Errorf("secrets are not supported by the executor's no-op secret manager, cannot resolve %q", key) +} + +// NewNoopSecretManager returns a SecretManager with no backend that fails every lookup with a clear +// error, so a connector task referencing a secret does not silently receive an empty value. +func NewNoopSecretManager() pluginsCore.SecretManager { return noopSecretManager{} } diff --git a/executor/pkg/plugin/secret_manager_test.go b/executor/pkg/plugin/secret_manager_test.go new file mode 100644 index 0000000000..5e09aac319 --- /dev/null +++ b/executor/pkg/plugin/secret_manager_test.go @@ -0,0 +1,19 @@ +package plugin + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNoopSecretManager(t *testing.T) { + m := NewNoopSecretManager() + assert.NotNil(t, m) + + // No backend: a lookup fails with an error instead of returning an empty secret, so a connector + // task that references a secret fails loudly rather than running with a blank value. + val, err := m.Get(context.Background(), "my-secret") + assert.Error(t, err) + assert.Empty(t, val) +} diff --git a/executor/setup.go b/executor/setup.go index ef389c15b7..556810850e 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -29,6 +29,7 @@ import ( "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog" cachecatalog "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog/cache_service" webhookConfig "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/secret/config" + connectorplugin "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/plugins/webapi/connector" "github.com/flyteorg/flyte/v2/flytestdlib/app" "github.com/flyteorg/flyte/v2/flytestdlib/otelutils" "github.com/flyteorg/flyte/v2/flytestdlib/promutils" @@ -66,6 +67,11 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { utilruntime.Must(reg.AddToScheme(scheme)) } + // Register the connector (webapi) backend plugin so task types backed by an external connector + // service are routed to it. This must run before plugin.NewRegistry below, which snapshots the + // core plugins once. + connectorplugin.RegisterConnectorPlugin(&connectorplugin.ConnectorService{}) + var tlsOpts []func(*tls.Config) if !cfg.EnableHTTP2 { tlsOpts = append(tlsOpts, func(c *tls.Config) { @@ -129,7 +135,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { } setupCtx := plugin.NewSetupContext( - mgr, nil, nil, nil, nil, + mgr, plugin.NewNoopSecretManager(), plugin.NewNoopResourceRegistrar(), nil, nil, "TaskAction", executorScope.NewSubScope("plugin"), ) @@ -177,6 +183,14 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { reconciler.CatalogClient = asyncCatalogClient reconciler.Catalog = cacheClient reconciler.Recorder = mgr.GetEventRecorder("taskaction-controller") + // Supply a ResourceManager for the webapi allocation-token path, used by connector-backed task + // types that declare ResourceQuotas. It grants every allocation by default, matching + // FlytePropeller with no quota backend. Swap in a real one to enforce quotas. + reconciler.ResourceManager = plugin.NewNoopResourceManager() + // Supply a SecretManager so connector tasks that reference secrets do not nil-deref at execution + // time. It has no backend and fails lookups with a clear error. Swap in a real one to resolve + // secrets. + reconciler.SecretManager = plugin.NewNoopSecretManager() if cfg.MaxSystemFailures < 0 { return fmt.Errorf("executor: maxSystemFailures must be non-negative, got %d", cfg.MaxSystemFailures) } diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go index 543672a304..9114235bb9 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go @@ -494,7 +494,6 @@ func newConnectorPlugin(connectorService *ConnectorService) webapi.PluginEntry { } func RegisterConnectorPlugin(connectorService *ConnectorService) { - fmt.Printf("Registering connector plugin...\n") gob.Register(ResourceMetaWrapper{}) gob.Register(ResourceWrapper{}) pluginmachinery.PluginRegistry().RegisterRemotePlugin(newConnectorPlugin(connectorService))