From 7f5448be0b27280c3622c772ca84c6bef1d1aafe Mon Sep 17 00:00:00 2001 From: Dejan Zele Pejchev Date: Thu, 18 Jun 2026 23:54:58 +0200 Subject: [PATCH 1/3] Wire the connector (webapi) backend plugin into the v2 executor The OSS Flyte 2 executor never calls RegisterConnectorPlugin, so task types backed by external connectors (agents) have no plugin and fall through to the pod plugin. Register it during executor setup, and clear ResourceQuotas since the v2 executor wires no ResourceRegistrar/ResourceManager. Enables routing tasks (e.g. armada) to a connector service over gRPC. Signed-off-by: Dejan Zele Pejchev --- executor/pkg/plugin/resource_manager.go | 45 ++++++++++++++++++++ executor/pkg/plugin/resource_manager_test.go | 30 +++++++++++++ executor/setup.go | 12 +++++- 3 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 executor/pkg/plugin/resource_manager.go create mode 100644 executor/pkg/plugin/resource_manager_test.go diff --git a/executor/pkg/plugin/resource_manager.go b/executor/pkg/plugin/resource_manager.go new file mode 100644 index 0000000000..4dd18754c2 --- /dev/null +++ b/executor/pkg/plugin/resource_manager.go @@ -0,0 +1,45 @@ +package plugin + +import ( + "context" + + pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core" +) + +// The webapi plugin machinery requires the host to supply a ResourceRegistrar at plugin setup and a +// ResourceManager at task execution, for its allocation-token quota feature. FlytePropeller (v1) +// always supplied them, defaulting to a no-op manager when no quota backend is configured. The v2 +// executor never reimplemented this, so a webapi plugin that declares ResourceQuotas dereferences a +// nil registrar or manager. These no-op types restore the contract and grant every allocation. Swap +// in a real ResourceManager to enforce quotas without touching any plugin. + +// noopResourceRegistrar accepts quota registrations without recording them. +type noopResourceRegistrar struct{} + +var _ pluginsCore.ResourceRegistrar = noopResourceRegistrar{} + +func (noopResourceRegistrar) RegisterResourceQuota(_ context.Context, _ pluginsCore.ResourceNamespace, _ int) error { + return nil +} + +// noopResourceManager grants every allocation request and ignores releases. +type noopResourceManager struct{} + +var _ pluginsCore.ResourceManager = noopResourceManager{} + +func (noopResourceManager) GetID() string { return "executor-noop-resource-manager" } + +func (noopResourceManager) AllocateResource(_ context.Context, _ pluginsCore.ResourceNamespace, _ string, _ pluginsCore.ResourceConstraintsSpec) (pluginsCore.AllocationStatus, error) { + return pluginsCore.AllocationStatusGranted, nil +} + +func (noopResourceManager) ReleaseResource(_ context.Context, _ pluginsCore.ResourceNamespace, _ string) error { + return nil +} + +// NewNoopResourceRegistrar returns a ResourceRegistrar that accepts quota declarations without +// enforcing them. +func NewNoopResourceRegistrar() pluginsCore.ResourceRegistrar { return noopResourceRegistrar{} } + +// NewNoopResourceManager returns a ResourceManager that grants every allocation. +func NewNoopResourceManager() pluginsCore.ResourceManager { return noopResourceManager{} } diff --git a/executor/pkg/plugin/resource_manager_test.go b/executor/pkg/plugin/resource_manager_test.go new file mode 100644 index 0000000000..f599a31228 --- /dev/null +++ b/executor/pkg/plugin/resource_manager_test.go @@ -0,0 +1,30 @@ +package plugin + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core" +) + +func TestNoopResourceRegistrar(t *testing.T) { + r := NewNoopResourceRegistrar() + assert.NotNil(t, r) + // Accepts a quota declaration (the connector ships ResourceQuotas{"default":1000}) without error. + assert.NoError(t, r.RegisterResourceQuota(context.Background(), "default", 1000)) +} + +func TestNoopResourceManager(t *testing.T) { + m := NewNoopResourceManager() + assert.NotNil(t, m) + assert.Equal(t, "executor-noop-resource-manager", m.GetID()) + + // Every allocation is granted, matching v1 propeller with no quota backend configured. + status, err := m.AllocateResource(context.Background(), "default", "token", pluginsCore.ResourceConstraintsSpec{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.AllocationStatusGranted, status) + + assert.NoError(t, m.ReleaseResource(context.Background(), "default", "token")) +} diff --git a/executor/setup.go b/executor/setup.go index ef389c15b7..aede7c3898 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -29,6 +29,7 @@ import ( "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog" cachecatalog "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/catalog/cache_service" webhookConfig "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/secret/config" + connectorplugin "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/plugins/webapi/connector" "github.com/flyteorg/flyte/v2/flytestdlib/app" "github.com/flyteorg/flyte/v2/flytestdlib/otelutils" "github.com/flyteorg/flyte/v2/flytestdlib/promutils" @@ -66,6 +67,11 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { utilruntime.Must(reg.AddToScheme(scheme)) } + // Register the connector (webapi) backend plugin so task types backed by an external connector + // service are routed to it. This must run before plugin.NewRegistry below, which snapshots the + // core plugins once. + connectorplugin.RegisterConnectorPlugin(&connectorplugin.ConnectorService{}) + var tlsOpts []func(*tls.Config) if !cfg.EnableHTTP2 { tlsOpts = append(tlsOpts, func(c *tls.Config) { @@ -129,7 +135,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { } setupCtx := plugin.NewSetupContext( - mgr, nil, nil, nil, nil, + mgr, nil, plugin.NewNoopResourceRegistrar(), nil, nil, "TaskAction", executorScope.NewSubScope("plugin"), ) @@ -177,6 +183,10 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { reconciler.CatalogClient = asyncCatalogClient reconciler.Catalog = cacheClient reconciler.Recorder = mgr.GetEventRecorder("taskaction-controller") + // Supply a ResourceManager for the webapi allocation-token path, used by connector-backed task + // types that declare ResourceQuotas. It grants every allocation by default, matching + // FlytePropeller with no quota backend. Swap in a real one to enforce quotas. + reconciler.ResourceManager = plugin.NewNoopResourceManager() if cfg.MaxSystemFailures < 0 { return fmt.Errorf("executor: maxSystemFailures must be non-negative, got %d", cfg.MaxSystemFailures) } From 0683c345d3252cdd01958f76ec674c334e0de6ca Mon Sep 17 00:00:00 2001 From: Dejan Zele Pejchev Date: Sat, 20 Jun 2026 01:55:22 +0200 Subject: [PATCH 2/3] Provide a no-op SecretManager for connector tasks in the executor Signed-off-by: Dejan Zele Pejchev --- executor/pkg/plugin/secret_manager.go | 25 ++++++++++++++++++++++ executor/pkg/plugin/secret_manager_test.go | 19 ++++++++++++++++ executor/setup.go | 6 +++++- 3 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 executor/pkg/plugin/secret_manager.go create mode 100644 executor/pkg/plugin/secret_manager_test.go diff --git a/executor/pkg/plugin/secret_manager.go b/executor/pkg/plugin/secret_manager.go new file mode 100644 index 0000000000..310262acdd --- /dev/null +++ b/executor/pkg/plugin/secret_manager.go @@ -0,0 +1,25 @@ +package plugin + +import ( + "context" + "fmt" + + pluginsCore "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core" +) + +// The webapi plugin machinery resolves task secrets through a SecretManager supplied by the host, at +// both plugin setup and task execution. The v2 executor never wired one up, so a connector task that +// references a secret dereferences a nil SecretManager and panics. This no-op restores the contract. +// It has no secret backend, so Get fails loudly with an error rather than handing a task a blank +// value. Swap in a real SecretManager to resolve secrets. +type noopSecretManager struct{} + +var _ pluginsCore.SecretManager = noopSecretManager{} + +func (noopSecretManager) Get(_ context.Context, key string) (string, error) { + return "", fmt.Errorf("secrets are not supported by the executor's no-op secret manager, cannot resolve %q", key) +} + +// NewNoopSecretManager returns a SecretManager with no backend that fails every lookup with a clear +// error, so a connector task referencing a secret does not silently receive an empty value. +func NewNoopSecretManager() pluginsCore.SecretManager { return noopSecretManager{} } diff --git a/executor/pkg/plugin/secret_manager_test.go b/executor/pkg/plugin/secret_manager_test.go new file mode 100644 index 0000000000..5e09aac319 --- /dev/null +++ b/executor/pkg/plugin/secret_manager_test.go @@ -0,0 +1,19 @@ +package plugin + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNoopSecretManager(t *testing.T) { + m := NewNoopSecretManager() + assert.NotNil(t, m) + + // No backend: a lookup fails with an error instead of returning an empty secret, so a connector + // task that references a secret fails loudly rather than running with a blank value. + val, err := m.Get(context.Background(), "my-secret") + assert.Error(t, err) + assert.Empty(t, val) +} diff --git a/executor/setup.go b/executor/setup.go index aede7c3898..556810850e 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -135,7 +135,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { } setupCtx := plugin.NewSetupContext( - mgr, nil, plugin.NewNoopResourceRegistrar(), nil, nil, + mgr, plugin.NewNoopSecretManager(), plugin.NewNoopResourceRegistrar(), nil, nil, "TaskAction", executorScope.NewSubScope("plugin"), ) @@ -187,6 +187,10 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { // types that declare ResourceQuotas. It grants every allocation by default, matching // FlytePropeller with no quota backend. Swap in a real one to enforce quotas. reconciler.ResourceManager = plugin.NewNoopResourceManager() + // Supply a SecretManager so connector tasks that reference secrets do not nil-deref at execution + // time. It has no backend and fails lookups with a clear error. Swap in a real one to resolve + // secrets. + reconciler.SecretManager = plugin.NewNoopSecretManager() if cfg.MaxSystemFailures < 0 { return fmt.Errorf("executor: maxSystemFailures must be non-negative, got %d", cfg.MaxSystemFailures) } From 5cc1e1040e4e13a51a9516d24b5f72a527878413 Mon Sep 17 00:00:00 2001 From: Dejan Zele Pejchev Date: Sat, 20 Jun 2026 01:55:58 +0200 Subject: [PATCH 3/3] Drop debug stdout print from connector plugin registration Signed-off-by: Dejan Zele Pejchev --- flyteplugins/go/tasks/plugins/webapi/connector/plugin.go | 1 - 1 file changed, 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go index 543672a304..9114235bb9 100644 --- a/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/connector/plugin.go @@ -494,7 +494,6 @@ func newConnectorPlugin(connectorService *ConnectorService) webapi.PluginEntry { } func RegisterConnectorPlugin(connectorService *ConnectorService) { - fmt.Printf("Registering connector plugin...\n") gob.Register(ResourceMetaWrapper{}) gob.Register(ResourceWrapper{}) pluginmachinery.PluginRegistry().RegisterRemotePlugin(newConnectorPlugin(connectorService))