From 65c9b5594837733d3f884558d6aa71a588ecfa6d Mon Sep 17 00:00:00 2001 From: Kyle Felter Date: Wed, 10 Jun 2026 03:39:54 -0500 Subject: [PATCH] feat: Add NICo Core gRPC passthrough for admin operations Signed-off-by: Kyle Felter --- .../pkg/api/handler/nicocorepassthrough.go | 279 ++++++++++++++++++ .../api/handler/nicocorepassthrough_test.go | 31 ++ .../api/pkg/api/model/nicocorepassthrough.go | 45 +++ rest-api/api/pkg/api/routes.go | 14 + .../pkg/nicopassthrough/nicopassthrough.go | 123 ++++++++ .../nicopassthrough/nicopassthrough_test.go | 66 +++++ rest-api/flow/cmd/serve.go | 9 + rest-api/flow/internal/nicoapi/passthrough.go | 187 ++++++++++++ .../flow/internal/nicoapi/passthrough_test.go | 123 ++++++++ .../activity/activity_test.go | 8 +- .../temporalworkflow/activity/passthrough.go | 62 ++++ .../temporalworkflow/activity/registry.go | 4 + .../activity/registry_test.go | 9 +- .../temporalworkflow/manager/manager.go | 6 +- .../temporalworkflow/workflow/passthrough.go | 51 ++++ rest-api/openapi/spec.yaml | 138 +++++++++ 16 files changed, 1146 insertions(+), 9 deletions(-) create mode 100644 rest-api/api/pkg/api/handler/nicocorepassthrough.go create mode 100644 rest-api/api/pkg/api/handler/nicocorepassthrough_test.go create mode 100644 rest-api/api/pkg/api/model/nicocorepassthrough.go create mode 100644 rest-api/common/pkg/nicopassthrough/nicopassthrough.go create mode 100644 rest-api/common/pkg/nicopassthrough/nicopassthrough_test.go create mode 100644 rest-api/flow/internal/nicoapi/passthrough.go create mode 100644 rest-api/flow/internal/nicoapi/passthrough_test.go create mode 100644 rest-api/flow/internal/task/executor/temporalworkflow/activity/passthrough.go create mode 100644 rest-api/flow/internal/task/executor/temporalworkflow/workflow/passthrough.go diff --git a/rest-api/api/pkg/api/handler/nicocorepassthrough.go b/rest-api/api/pkg/api/handler/nicocorepassthrough.go new file mode 100644 index 0000000000..8674b1db8d --- /dev/null +++ b/rest-api/api/pkg/api/handler/nicocorepassthrough.go @@ -0,0 +1,279 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package handler + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/google/uuid" + "github.com/labstack/echo/v4" + "github.com/rs/zerolog" + temporalEnums "go.temporal.io/api/enums/v1" + tClient "go.temporal.io/sdk/client" + tp "go.temporal.io/sdk/temporal" + + "github.com/NVIDIA/infra-controller/rest-api/api/internal/config" + "github.com/NVIDIA/infra-controller/rest-api/api/pkg/api/handler/util/common" + "github.com/NVIDIA/infra-controller/rest-api/api/pkg/api/model" + sc "github.com/NVIDIA/infra-controller/rest-api/api/pkg/client/site" + auth "github.com/NVIDIA/infra-controller/rest-api/auth/pkg/authorization" + npt "github.com/NVIDIA/infra-controller/rest-api/common/pkg/nicopassthrough" + cutil "github.com/NVIDIA/infra-controller/rest-api/common/pkg/util" + cdb "github.com/NVIDIA/infra-controller/rest-api/db/pkg/db" + cdbm "github.com/NVIDIA/infra-controller/rest-api/db/pkg/db/model" +) + +// nicoCorePassthroughBase carries the shared dependencies and authorization +// logic for the NICo Core gRPC passthrough handlers. +// +// The passthrough exposes the full NICo Core (forge.Forge) gRPC surface to +// Provider Admins through a single mechanism instead of a hand-written REST +// endpoint per operation. The request is gated here, then run on the site's +// Flow worker — which holds the direct mutual-TLS gRPC connection to Core — via +// a Temporal workflow. +type nicoCorePassthroughBase struct { + dbSession *cdb.Session + scp *sc.ClientPool + cfg *config.Config + tracerSpan *cutil.TracerSpan +} + +// authorize validates the caller is a Provider Admin for org, resolves the site +// from the required ?siteId query parameter, confirms it belongs to the org's +// Infrastructure Provider and has NICo Flow enabled, and returns the per-site +// Temporal client. A non-nil error is the echo response the caller must return. +func (b nicoCorePassthroughBase) authorize( + ctx context.Context, + c echo.Context, + logger zerolog.Logger, + org string, + dbUser *cdbm.User, +) (tClient.Client, error) { + if dbUser == nil { + logger.Error().Msg("invalid User object found in request context") + return nil, cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to retrieve current user", nil) + } + + ok, err := auth.ValidateOrgMembership(dbUser, org) + if !ok { + if err != nil { + logger.Error().Err(err).Msg("error validating org membership for User in request") + } else { + logger.Warn().Msg("could not validate org membership for user, access denied") + } + return nil, cutil.NewAPIErrorResponse(c, http.StatusForbidden, fmt.Sprintf("Failed to validate membership for org: %s", org), nil) + } + + // The passthrough is an internal/break-glass surface: Provider Admin only. + if ok := auth.ValidateUserRoles(dbUser, org, nil, auth.ProviderAdminRole); !ok { + logger.Warn().Msg("user does not have Provider Admin role, access denied") + return nil, cutil.NewAPIErrorResponse(c, http.StatusForbidden, "User does not have Provider Admin role with org", nil) + } + + provider, err := common.GetInfrastructureProviderForOrg(ctx, nil, b.dbSession, org) + if err != nil { + logger.Warn().Err(err).Msg("error getting infrastructure provider for org") + return nil, cutil.NewAPIErrorResponse(c, http.StatusBadRequest, "Failed to retrieve Infrastructure Provider for org", nil) + } + + siteStrID := c.QueryParam("siteId") + if siteStrID == "" { + return nil, cutil.NewAPIErrorResponse(c, http.StatusBadRequest, "siteId query parameter is required", nil) + } + + site, err := common.GetSiteFromIDString(ctx, nil, siteStrID, b.dbSession) + if err != nil { + if errors.Is(err, cdb.ErrDoesNotExist) { + return nil, cutil.NewAPIErrorResponse(c, http.StatusBadRequest, "Site specified in request does not exist", nil) + } + logger.Error().Err(err).Msg("error retrieving Site from DB") + return nil, cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to retrieve Site due to DB error", nil) + } + + if site.InfrastructureProviderID != provider.ID { + return nil, cutil.NewAPIErrorResponse(c, http.StatusForbidden, "Site specified in request doesn't belong to current org's Provider", nil) + } + + siteConfig := &cdbm.SiteConfig{} + if site.Config != nil { + siteConfig = site.Config + } + if !siteConfig.Flow { + logger.Warn().Msg("site does not have NICo Flow enabled") + return nil, cutil.NewAPIErrorResponse(c, http.StatusPreconditionFailed, "Site does not have NICo Flow enabled", nil) + } + + stc, err := b.scp.GetClientByID(site.ID) + if err != nil { + logger.Error().Err(err).Msg("failed to retrieve Temporal client for Site") + return nil, cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to retrieve client for Site", nil) + } + return stc, nil +} + +// runWorkflow starts the passthrough workflow on the site's Flow task queue and +// returns its result. A non-nil error is the echo response to return. +func (b nicoCorePassthroughBase) runWorkflow( + ctx context.Context, + c echo.Context, + logger zerolog.Logger, + stc tClient.Client, + req npt.Request, +) (npt.Response, error) { + workflowID := fmt.Sprintf("nico-core-passthrough-%s-%s", npt.MethodName(req.Method), uuid.NewString()) + if req.List { + workflowID = fmt.Sprintf("nico-core-passthrough-methods-%s", uuid.NewString()) + } + + workflowOptions := tClient.StartWorkflowOptions{ + ID: workflowID, + WorkflowExecutionTimeout: cutil.WorkflowExecutionTimeout, + TaskQueue: npt.TaskQueue, + WorkflowIDReusePolicy: temporalEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + } + + wfCtx, cancel := context.WithTimeout(ctx, cutil.WorkflowContextTimeout) + defer cancel() + + we, err := stc.ExecuteWorkflow(wfCtx, workflowOptions, npt.WorkflowName, req) + if err != nil { + logger.Error().Err(err).Msg("failed to execute NICo Core passthrough workflow") + return npt.Response{}, cutil.NewAPIErrorResponse(c, http.StatusInternalServerError, "Failed to execute NICo Core passthrough", nil) + } + + var resp npt.Response + if err := we.Get(wfCtx, &resp); err != nil { + var timeoutErr *tp.TimeoutError + if errors.As(err, &timeoutErr) || errors.Is(err, context.DeadlineExceeded) || wfCtx.Err() != nil { + return npt.Response{}, common.TerminateWorkflowOnTimeOut(c, logger, stc, workflowID, err, "NICoCorePassthrough", npt.WorkflowName) + } + code, werr := common.UnwrapWorkflowError(err) + logger.Error().Err(werr).Msg("NICo Core passthrough workflow failed") + return npt.Response{}, cutil.NewAPIErrorResponse(c, code, fmt.Sprintf("NICo Core passthrough failed: %s", werr), nil) + } + return resp, nil +} + +// ~~~~~ Invoke handler ~~~~~ // + +// InvokeNICoCorePassthroughHandler invokes a single NICo Core gRPC method. +type InvokeNICoCorePassthroughHandler struct { + nicoCorePassthroughBase +} + +// NewInvokeNICoCorePassthroughHandler returns a handler for invoking NICo Core +// methods through the passthrough. +func NewInvokeNICoCorePassthroughHandler(dbSession *cdb.Session, scp *sc.ClientPool, cfg *config.Config) InvokeNICoCorePassthroughHandler { + return InvokeNICoCorePassthroughHandler{ + nicoCorePassthroughBase{dbSession: dbSession, scp: scp, cfg: cfg, tracerSpan: cutil.NewTracerSpan()}, + } +} + +// Handle godoc +// @Summary Invoke a NICo Core gRPC method (Provider Admin) +// @Description Invoke any NICo Core (forge.Forge) gRPC method by name with a protojson request. Read methods require Provider Admin; mutations additionally require allowMutation=true. +// @Tags nico-core-passthrough +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param org path string true "Name of NGC organization" +// @Param siteId query string true "ID of the Site" +// @Param request body model.APINICoCorePassthroughRequest true "Method and protojson request" +// @Success 200 {object} model.APINICoCorePassthroughResponse +// @Router /v2/org/{org}/nico/core/passthrough [post] +func (h InvokeNICoCorePassthroughHandler) Handle(c echo.Context) error { + org, dbUser, ctx, logger, handlerSpan := common.SetupHandler("NICoCorePassthrough", "Invoke", c, h.tracerSpan) + if handlerSpan != nil { + defer handlerSpan.End() + } + + var apiReq model.APINICoCorePassthroughRequest + if err := c.Bind(&apiReq); err != nil { + return cutil.NewAPIErrorResponse(c, http.StatusBadRequest, "Invalid request body", nil) + } + if apiReq.Method == "" { + return cutil.NewAPIErrorResponse(c, http.StatusBadRequest, "method is required", nil) + } + + stc, errResp := h.authorize(ctx, c, logger, org, dbUser) + if errResp != nil { + return errResp + } + + // Gate write/destructive methods behind an explicit opt-in, keeping + // break-glass mutations separate from read parity (epic #1927). + if npt.IsMutation(apiReq.Method) && !apiReq.AllowMutation { + return cutil.NewAPIErrorResponse( + c, http.StatusForbidden, + fmt.Sprintf("Method %q is a mutation; set allowMutation=true to permit it", npt.MethodName(apiReq.Method)), + nil, + ) + } + + resp, errResp := h.runWorkflow(ctx, c, logger, stc, npt.Request{ + Method: apiReq.Method, + RequestJSON: apiReq.Request, + AllowMutation: apiReq.AllowMutation, + }) + if errResp != nil { + return errResp + } + + return c.JSON(http.StatusOK, model.APINICoCorePassthroughResponse{ + Method: npt.MethodName(apiReq.Method), + Response: resp.ResponseJSON, + }) +} + +// ~~~~~ List methods handler ~~~~~ // + +// ListNICoCorePassthroughMethodsHandler returns the catalog of invocable NICo +// Core methods. +type ListNICoCorePassthroughMethodsHandler struct { + nicoCorePassthroughBase +} + +// NewListNICoCorePassthroughMethodsHandler returns a handler for listing the +// NICo Core method catalog. +func NewListNICoCorePassthroughMethodsHandler(dbSession *cdb.Session, scp *sc.ClientPool, cfg *config.Config) ListNICoCorePassthroughMethodsHandler { + return ListNICoCorePassthroughMethodsHandler{ + nicoCorePassthroughBase{dbSession: dbSession, scp: scp, cfg: cfg, tracerSpan: cutil.NewTracerSpan()}, + } +} + +// Handle godoc +// @Summary List invocable NICo Core gRPC methods (Provider Admin) +// @Description Return the catalog of NICo Core (forge.Forge) unary methods, each with its request/response message types and read/mutation classification. +// @Tags nico-core-passthrough +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param org path string true "Name of NGC organization" +// @Param siteId query string true "ID of the Site" +// @Success 200 {object} model.APINICoCoreMethodsResponse +// @Router /v2/org/{org}/nico/core/methods [get] +func (h ListNICoCorePassthroughMethodsHandler) Handle(c echo.Context) error { + org, dbUser, ctx, logger, handlerSpan := common.SetupHandler("NICoCorePassthrough", "ListMethods", c, h.tracerSpan) + if handlerSpan != nil { + defer handlerSpan.End() + } + + stc, errResp := h.authorize(ctx, c, logger, org, dbUser) + if errResp != nil { + return errResp + } + + resp, errResp := h.runWorkflow(ctx, c, logger, stc, npt.Request{List: true}) + if errResp != nil { + return errResp + } + + return c.JSON(http.StatusOK, model.APINICoCoreMethodsResponse{ + Service: npt.ServiceName, + Methods: resp.Methods, + }) +} diff --git a/rest-api/api/pkg/api/handler/nicocorepassthrough_test.go b/rest-api/api/pkg/api/handler/nicocorepassthrough_test.go new file mode 100644 index 0000000000..1ad0a894e4 --- /dev/null +++ b/rest-api/api/pkg/api/handler/nicocorepassthrough_test.go @@ -0,0 +1,31 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package handler + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/labstack/echo/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestInvokeNICoCorePassthrough_RequiresMethod verifies the handler rejects a +// request with no method before doing any authorization or Temporal work. +func TestInvokeNICoCorePassthrough_RequiresMethod(t *testing.T) { + e := echo.New() + req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(`{}`)) + req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + + h := NewInvokeNICoCorePassthroughHandler(nil, nil, nil) + err := h.Handle(c) + + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, rec.Code) +} diff --git a/rest-api/api/pkg/api/model/nicocorepassthrough.go b/rest-api/api/pkg/api/model/nicocorepassthrough.go new file mode 100644 index 0000000000..a36e0a6285 --- /dev/null +++ b/rest-api/api/pkg/api/model/nicocorepassthrough.go @@ -0,0 +1,45 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package model + +import ( + "encoding/json" + + "github.com/NVIDIA/infra-controller/rest-api/common/pkg/nicopassthrough" +) + +// APINICoCorePassthroughRequest is the body for an admin NICo Core gRPC +// passthrough invocation. +type APINICoCorePassthroughRequest struct { + // Method is the NICo Core gRPC method to invoke, either bare + // ("FindMachineIds") or fully qualified ("/forge.Forge/FindMachineIds"). + Method string `json:"method"` + + // Request is the protojson-encoded request message for Method. Omit or send + // {} for methods with an empty request. + Request json.RawMessage `json:"request,omitempty"` + + // AllowMutation must be true to invoke a method classified as a + // write/destructive operation. Read methods ignore this field. + AllowMutation bool `json:"allowMutation,omitempty"` +} + +// APINICoCorePassthroughResponse is the result of a passthrough invocation. +type APINICoCorePassthroughResponse struct { + // Method is the bare method that was invoked. + Method string `json:"method"` + + // Response is the protojson-encoded response message from NICo Core. + Response json.RawMessage `json:"response,omitempty"` +} + +// APINICoCoreMethodsResponse is the catalog of invocable NICo Core methods. +type APINICoCoreMethodsResponse struct { + // Service is the fully qualified gRPC service name. + Service string `json:"service"` + + // Methods is the list of invocable unary methods with their request and + // response types and read/mutation classification. + Methods []nicopassthrough.MethodInfo `json:"methods"` +} diff --git a/rest-api/api/pkg/api/routes.go b/rest-api/api/pkg/api/routes.go index 324c081f54..019fa102d9 100644 --- a/rest-api/api/pkg/api/routes.go +++ b/rest-api/api/pkg/api/routes.go @@ -29,6 +29,20 @@ func NewAPIRoutes(dbSession *cdb.Session, tc tClient.Client, tnc tClient.Namespa Method: http.MethodGet, Handler: apiHandler.NewMetadataHandler(), }, + // NICo Core gRPC passthrough endpoints (Provider Admin only). + // Proxy any forge.Forge gRPC method to the site's Flow worker, which + // holds the direct connection to Core. Reads are open to Provider + // Admins; mutations require allowMutation=true. + { + Path: apiPathPrefix + "/core/passthrough", + Method: http.MethodPost, + Handler: apiHandler.NewInvokeNICoCorePassthroughHandler(dbSession, scp, cfg), + }, + { + Path: apiPathPrefix + "/core/methods", + Method: http.MethodGet, + Handler: apiHandler.NewListNICoCorePassthroughMethodsHandler(dbSession, scp, cfg), + }, // User endpoint { Path: apiPathPrefix + "/user/current", diff --git a/rest-api/common/pkg/nicopassthrough/nicopassthrough.go b/rest-api/common/pkg/nicopassthrough/nicopassthrough.go new file mode 100644 index 0000000000..d6cd1aa843 --- /dev/null +++ b/rest-api/common/pkg/nicopassthrough/nicopassthrough.go @@ -0,0 +1,123 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Package nicopassthrough holds the contract shared between the REST API and +// the on-site Flow worker for the NICo Core gRPC passthrough: the Temporal +// workflow name and task queue, the request/response envelope, the method +// catalog entry, and the read/mutation classifier. +// +// The passthrough lets a Provider Admin invoke any NICo Core (forge.Forge) +// gRPC method by name with a JSON-encoded request, without a hand-written REST +// endpoint per operation. The REST API gates the call and starts the Temporal +// workflow on the site; the Flow worker (which holds the direct mutual-TLS gRPC +// connection to Core) performs the JSON<->protobuf transcoding and the actual +// gRPC invocation. +package nicopassthrough + +import ( + "encoding/json" + "strings" +) + +const ( + // ServiceName is the fully qualified gRPC service exposed by NICo Core. + ServiceName = "forge.Forge" + + // WorkflowName is the Temporal workflow registered by the Flow worker and + // started by the REST API to drive a single passthrough invocation. + WorkflowName = "InvokeNICoCorePassthrough" + + // TaskQueue is the Temporal task queue the Flow worker polls. It must match + // flow/internal/task/executor/temporalworkflow/manager.WorkflowQueue. + TaskQueue = "flow-tasks" +) + +// Request is the Temporal workflow input describing one passthrough call. +type Request struct { + // Method is the Core gRPC method, either bare ("FindMachineIds") or fully + // qualified ("/forge.Forge/FindMachineIds"). + Method string `json:"method,omitempty"` + + // RequestJSON is the protojson-encoded request message for Method. An empty + // value is treated as the zero-valued request message. + RequestJSON json.RawMessage `json:"requestJson,omitempty"` + + // AllowMutation must be set to true to run a method classified as a + // mutation. Read methods ignore this field. + AllowMutation bool `json:"allowMutation,omitempty"` + + // List, when true, returns the Core method catalog instead of invoking a + // method. Method and RequestJSON are ignored. + List bool `json:"list,omitempty"` +} + +// Response is the Temporal workflow output for a passthrough call. +type Response struct { + // ResponseJSON is the protojson-encoded response message, set for invoke + // calls. + ResponseJSON json.RawMessage `json:"responseJson,omitempty"` + + // Methods is the Core method catalog, set for List calls. + Methods []MethodInfo `json:"methods,omitempty"` +} + +// MethodInfo describes a single invocable Core gRPC method. +type MethodInfo struct { + // Method is the bare method name (e.g. "FindMachineIds"). + Method string `json:"method"` + // FullMethod is the fully qualified gRPC path (e.g. "/forge.Forge/FindMachineIds"). + FullMethod string `json:"fullMethod"` + // InputType is the fully qualified protobuf message name of the request. + InputType string `json:"inputType"` + // OutputType is the fully qualified protobuf message name of the response. + OutputType string `json:"outputType"` + // Mutation reports whether this method is classified as a write/destructive + // operation that requires AllowMutation. + Mutation bool `json:"mutation"` + // Deprecated reports whether the proto marks this method deprecated. + Deprecated bool `json:"deprecated,omitempty"` +} + +// readPrefixes are the method-name prefixes treated as read-only. Anything that +// does not match one of these is classified as a mutation, so newly added Core +// methods default to the safer (mutation, opt-in) side of the gate. +var readPrefixes = []string{ + "Find", + "Get", + "List", + "Search", + "Lookup", + "Identify", + "Determine", + "Version", + "Echo", +} + +// MethodName returns the bare method name for a bare or fully qualified method. +func MethodName(method string) string { + method = strings.TrimPrefix(method, "/") + if i := strings.LastIndex(method, "/"); i >= 0 { + return method[i+1:] + } + return method +} + +// FullMethod returns the canonical "/forge.Forge/" gRPC path for a bare +// or already-qualified method name. +func FullMethod(method string) string { + return "/" + ServiceName + "/" + MethodName(method) +} + +// IsMutation reports whether method is classified as a write/destructive +// operation. The classifier is intentionally default-deny: only well-known +// read prefixes are treated as read-only; every other method (including ones +// not yet known to this code) is treated as a mutation. +func IsMutation(method string) bool { + name := MethodName(method) + for _, p := range readPrefixes { + if strings.HasPrefix(name, p) { + return false + } + } + return true +} diff --git a/rest-api/common/pkg/nicopassthrough/nicopassthrough_test.go b/rest-api/common/pkg/nicopassthrough/nicopassthrough_test.go new file mode 100644 index 0000000000..d481141fba --- /dev/null +++ b/rest-api/common/pkg/nicopassthrough/nicopassthrough_test.go @@ -0,0 +1,66 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package nicopassthrough + +import "testing" + +func TestMethodName(t *testing.T) { + cases := map[string]string{ + "FindMachineIds": "FindMachineIds", + "/forge.Forge/FindMachineIds": "FindMachineIds", + "forge.Forge/CreateVpc": "CreateVpc", + "": "", + } + for in, want := range cases { + if got := MethodName(in); got != want { + t.Errorf("MethodName(%q) = %q, want %q", in, got, want) + } + } +} + +func TestFullMethod(t *testing.T) { + cases := map[string]string{ + "FindMachineIds": "/forge.Forge/FindMachineIds", + "/forge.Forge/FindMachineIds": "/forge.Forge/FindMachineIds", + "CreateVpc": "/forge.Forge/CreateVpc", + } + for in, want := range cases { + if got := FullMethod(in); got != want { + t.Errorf("FullMethod(%q) = %q, want %q", in, got, want) + } + } +} + +func TestIsMutation(t *testing.T) { + reads := []string{ + "Version", "Echo", + "FindMachineIds", "FindVpcsByIds", + "GetPowerOptions", "GetComponentInventory", + "ListMachineHealthReports", "SearchVpcPrefixes", + "LookupRecord", "IdentifyUuid", "DetermineMachineIngestionState", + "/forge.Forge/FindMachineIds", + } + for _, m := range reads { + if IsMutation(m) { + t.Errorf("IsMutation(%q) = true, want false (read method)", m) + } + } + + mutations := []string{ + "CreateVpc", "UpdateVpc", "DeleteVpc", + "AddExpectedMachine", "RemoveStaticAddress", + "SetPowerShelfMaintenance", "InsertMachineHealthReport", + "AllocateInstance", "ReleaseInstance", + "AdminForceDeleteSwitch", "InvokeInstancePower", + "AssignStaticAddress", + // Default-deny: an unknown / newly added method is treated as a mutation. + "SomeBrandNewUnclassifiedMethod", + "/forge.Forge/CreateVpc", + } + for _, m := range mutations { + if !IsMutation(m) { + t.Errorf("IsMutation(%q) = false, want true (mutation/default-deny)", m) + } + } +} diff --git a/rest-api/flow/cmd/serve.go b/rest-api/flow/cmd/serve.go index 04ab3204b7..8b4d0d240f 100644 --- a/rest-api/flow/cmd/serve.go +++ b/rest-api/flow/cmd/serve.go @@ -233,6 +233,15 @@ func doServe() { ComponentManagerRegistry: cmRegistry, } + // Wire the optional NICo Core gRPC passthrough. It dials Core with the same + // URL and certificates as the typed client; when those are absent the + // passthrough activity is still registered but returns an error if invoked. + if passthrough, err := nicoapi.NewPassthrough(time.Minute); err != nil { + log.Warn().Err(err).Msg("NICo Core gRPC passthrough disabled") + } else { + temporalManagerConf.CoreInvoker = passthrough + } + if os.Getenv("REPORT_NICO_API_VERSION") != "" { // Do some basic nico-api requests, mainly for early testing; this code can be removed when we're doing actual communication go func() { diff --git a/rest-api/flow/internal/nicoapi/passthrough.go b/rest-api/flow/internal/nicoapi/passthrough.go new file mode 100644 index 0000000000..f967825757 --- /dev/null +++ b/rest-api/flow/internal/nicoapi/passthrough.go @@ -0,0 +1,187 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package nicoapi + +import ( + "context" + "errors" + "fmt" + "os" + "time" + + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/descriptorpb" + "google.golang.org/protobuf/types/dynamicpb" + + "github.com/NVIDIA/infra-controller/rest-api/common/pkg/nicopassthrough" + "github.com/NVIDIA/infra-controller/rest-api/flow/internal/certs" + "github.com/NVIDIA/infra-controller/rest-api/flow/internal/common/grpclog" + + // Blank import guarantees the forge.Forge file descriptors are registered + // in protoregistry.GlobalFiles even if grpc.go stops importing gen. + _ "github.com/NVIDIA/infra-controller/rest-api/flow/internal/nicoapi/gen" +) + +// ErrUnknownMethod is returned when a passthrough method name does not resolve +// to a unary RPC on the NICo Core (forge.Forge) service. +var ErrUnknownMethod = errors.New("unknown NICo Core method") + +// Passthrough is a thin client that invokes arbitrary NICo Core (forge.Forge) +// unary gRPC methods by name, transcoding between protojson and protobuf using +// the descriptors compiled into this binary. It holds its own connection to +// Core, independent of the typed Client above, so it does not widen the Client +// interface or its mock. +type Passthrough struct { + conn *grpc.ClientConn + grpcTimeout time.Duration +} + +// NewPassthrough dials NICo Core using the same URL and mutual-TLS material as +// NewClient and returns a passthrough client. It returns an error when the Core +// URL or certificates are absent so the caller can run without the passthrough. +func NewPassthrough(grpcTimeout time.Duration) (*Passthrough, error) { + nicoURL := os.Getenv("NICO_CORE_API_URL") + if nicoURL == "" { + return nil, errors.New("NICO_CORE_API_URL not set, cannot make connections to NICo Core") + } + + tlsConfig, _, err := certs.TLSConfig() + if err != nil { + if errors.Is(err, certs.ErrNotPresent) { + return nil, errors.New("Certificates not present, unable to authenticate with nico-core-api") + } + return nil, err + } + + conn, err := grpc.NewClient( + nicoURL, + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), + grpc.WithChainUnaryInterceptor(grpclog.UnaryClientInterceptor("nico-core-api-passthrough")), + ) + if err != nil { + return nil, fmt.Errorf("Unable to connect to nico-core-api: %w", err) + } + + return &Passthrough{conn: conn, grpcTimeout: grpcTimeout}, nil +} + +// Invoke transcodes reqJSON into the request message for method, calls Core, and +// returns the protojson-encoded response. An empty reqJSON is treated as the +// zero-valued request message. +func (p *Passthrough) Invoke(ctx context.Context, method string, reqJSON []byte) ([]byte, error) { + ctx, cancel := context.WithTimeout(ctx, p.grpcTimeout) + defer cancel() + return invokeJSON(ctx, p.conn, method, reqJSON) +} + +// ListMethods returns the catalog of invocable NICo Core methods. +func (p *Passthrough) ListMethods() ([]nicopassthrough.MethodInfo, error) { + return listMethods() +} + +// Close releases the underlying connection. +func (p *Passthrough) Close() { + if p.conn != nil { + if err := p.conn.Close(); err != nil { + log.Warn().Err(err).Msg("error closing NICo Core passthrough connection") + } + } +} + +// coreService resolves the forge.Forge service descriptor from the global +// registry populated by the generated code. +func coreService() (protoreflect.ServiceDescriptor, error) { + desc, err := protoregistry.GlobalFiles.FindDescriptorByName(protoreflect.FullName(nicopassthrough.ServiceName)) + if err != nil { + return nil, fmt.Errorf("resolve service %q: %w", nicopassthrough.ServiceName, err) + } + svc, ok := desc.(protoreflect.ServiceDescriptor) + if !ok { + return nil, fmt.Errorf("%q is not a gRPC service", nicopassthrough.ServiceName) + } + return svc, nil +} + +// resolveMethod returns the unary method descriptor for the given bare or +// fully qualified method name. +func resolveMethod(method string) (protoreflect.MethodDescriptor, error) { + svc, err := coreService() + if err != nil { + return nil, err + } + + md := svc.Methods().ByName(protoreflect.Name(nicopassthrough.MethodName(method))) + if md == nil { + return nil, fmt.Errorf("%w: %q", ErrUnknownMethod, nicopassthrough.MethodName(method)) + } + if md.IsStreamingClient() || md.IsStreamingServer() { + return nil, fmt.Errorf("method %q is streaming and not supported by the passthrough", md.Name()) + } + return md, nil +} + +func invokeJSON(ctx context.Context, conn grpc.ClientConnInterface, method string, reqJSON []byte) ([]byte, error) { + md, err := resolveMethod(method) + if err != nil { + return nil, err + } + + in := dynamicpb.NewMessage(md.Input()) + if len(reqJSON) > 0 { + if err := (protojson.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(reqJSON, in); err != nil { + return nil, fmt.Errorf("decode request for %q: %w", md.Name(), err) + } + } + + out := dynamicpb.NewMessage(md.Output()) + if err := conn.Invoke(ctx, nicopassthrough.FullMethod(method), in, out); err != nil { + return nil, err + } + + respJSON, err := protojson.Marshal(out) + if err != nil { + return nil, fmt.Errorf("encode response for %q: %w", md.Name(), err) + } + return respJSON, nil +} + +func listMethods() ([]nicopassthrough.MethodInfo, error) { + svc, err := coreService() + if err != nil { + return nil, err + } + + methods := svc.Methods() + infos := make([]nicopassthrough.MethodInfo, 0, methods.Len()) + for i := 0; i < methods.Len(); i++ { + md := methods.Get(i) + // The passthrough only supports unary RPCs. + if md.IsStreamingClient() || md.IsStreamingServer() { + continue + } + name := string(md.Name()) + infos = append(infos, nicopassthrough.MethodInfo{ + Method: name, + FullMethod: nicopassthrough.FullMethod(name), + InputType: string(md.Input().FullName()), + OutputType: string(md.Output().FullName()), + Mutation: nicopassthrough.IsMutation(name), + Deprecated: methodDeprecated(md), + }) + } + return infos, nil +} + +func methodDeprecated(md protoreflect.MethodDescriptor) bool { + opts, ok := md.Options().(*descriptorpb.MethodOptions) + if !ok || opts == nil { + return false + } + return opts.GetDeprecated() +} diff --git a/rest-api/flow/internal/nicoapi/passthrough_test.go b/rest-api/flow/internal/nicoapi/passthrough_test.go new file mode 100644 index 0000000000..a92e4ca603 --- /dev/null +++ b/rest-api/flow/internal/nicoapi/passthrough_test.go @@ -0,0 +1,123 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package nicoapi + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + + "github.com/NVIDIA/infra-controller/rest-api/common/pkg/nicopassthrough" +) + +// fakeConn implements grpc.ClientConnInterface for transcoder tests. It records +// the invoked method and populates the first scalar string field of the reply +// message so the response-encoding path is exercised without a real server. +type fakeConn struct { + lastMethod string + setValue string + invokeErr error +} + +func (f *fakeConn) Invoke(_ context.Context, method string, _, reply any, _ ...grpc.CallOption) error { + f.lastMethod = method + if f.invokeErr != nil { + return f.invokeErr + } + msg, ok := reply.(proto.Message) + if !ok { + return errors.New("reply is not a proto.Message") + } + pm := msg.ProtoReflect() + fields := pm.Descriptor().Fields() + for i := 0; i < fields.Len(); i++ { + fd := fields.Get(i) + if fd.Kind() == protoreflect.StringKind && fd.Cardinality() != protoreflect.Repeated { + pm.Set(fd, protoreflect.ValueOfString(f.setValue)) + break + } + } + return nil +} + +func (f *fakeConn) NewStream(context.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, errors.New("streaming not supported") +} + +func TestResolveMethod(t *testing.T) { + t.Run("bare name", func(t *testing.T) { + md, err := resolveMethod("Version") + require.NoError(t, err) + assert.Equal(t, "Version", string(md.Name())) + }) + + t.Run("fully qualified name", func(t *testing.T) { + md, err := resolveMethod("/forge.Forge/CreateVpc") + require.NoError(t, err) + assert.Equal(t, "CreateVpc", string(md.Name())) + }) + + t.Run("unknown method", func(t *testing.T) { + _, err := resolveMethod("DefinitelyNotARealMethod") + require.Error(t, err) + assert.True(t, errors.Is(err, ErrUnknownMethod)) + }) +} + +func TestInvokeJSON(t *testing.T) { + t.Run("round trips request and response", func(t *testing.T) { + conn := &fakeConn{setValue: "passthrough-test-value"} + respJSON, err := invokeJSON(context.Background(), conn, "Version", nil) + require.NoError(t, err) + assert.Equal(t, "/forge.Forge/Version", conn.lastMethod) + assert.Contains(t, string(respJSON), "passthrough-test-value") + }) + + t.Run("rejects unknown method before dialing", func(t *testing.T) { + conn := &fakeConn{} + _, err := invokeJSON(context.Background(), conn, "NopeNotReal", nil) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrUnknownMethod)) + assert.Empty(t, conn.lastMethod, "transport must not be invoked for an unknown method") + }) + + t.Run("rejects malformed request json", func(t *testing.T) { + conn := &fakeConn{} + _, err := invokeJSON(context.Background(), conn, "FindMachinesByIds", []byte("{not valid json")) + require.Error(t, err) + assert.Empty(t, conn.lastMethod) + }) +} + +func TestListMethods(t *testing.T) { + methods, err := listMethods() + require.NoError(t, err) + require.NotEmpty(t, methods) + + byName := make(map[string]nicopassthrough.MethodInfo, len(methods)) + for _, m := range methods { + byName[m.Method] = m + assert.True(t, strings.HasPrefix(m.FullMethod, "/forge.Forge/"), "full method should be qualified: %s", m.FullMethod) + assert.NotEmpty(t, m.InputType) + assert.NotEmpty(t, m.OutputType) + } + + // Read methods are not mutations; create/delete style methods are. + if v, ok := byName["FindMachineIds"]; ok { + assert.False(t, v.Mutation, "FindMachineIds should be classified read-only") + } + if v, ok := byName["Version"]; ok { + assert.False(t, v.Mutation, "Version should be classified read-only") + } + if v, ok := byName["CreateVpc"]; ok { + assert.True(t, v.Mutation, "CreateVpc should be classified as a mutation") + } +} diff --git a/rest-api/flow/internal/task/executor/temporalworkflow/activity/activity_test.go b/rest-api/flow/internal/task/executor/temporalworkflow/activity/activity_test.go index 19d1d4357d..559947dce7 100644 --- a/rest-api/flow/internal/task/executor/temporalworkflow/activity/activity_test.go +++ b/rest-api/flow/internal/task/executor/temporalworkflow/activity/activity_test.go @@ -21,7 +21,7 @@ import ( ) func TestActivitiesReturnErrorWhenComponentManagerRegistryIsMissing(t *testing.T) { - acts := New(nil, nil, nil) + acts := New(nil, nil, nil, nil) for name, call := range activityCallsForMissingManagerTest(t, acts) { t.Run(name, func(t *testing.T) { @@ -40,7 +40,7 @@ func TestActivitiesReturnErrorWhenComponentManagerIsMissing(t *testing.T) { ) require.NoError(t, err) - acts := New(nil, nil, registry) + acts := New(nil, nil, registry, nil) for name, call := range activityCallsForMissingManagerTest(t, acts) { t.Run(name, func(t *testing.T) { @@ -301,7 +301,7 @@ func newCapabilityTestActivities( ) require.NoError(t, err) - return New(nil, nil, registry), manager + return New(nil, nil, registry, nil), manager } type descriptorOnlyManager struct { @@ -345,7 +345,7 @@ func newDescriptorOnlyActivities( ) require.NoError(t, err) - return New(nil, nil, registry) + return New(nil, nil, registry, nil) } func newActivityTestTarget() common.Target { diff --git a/rest-api/flow/internal/task/executor/temporalworkflow/activity/passthrough.go b/rest-api/flow/internal/task/executor/temporalworkflow/activity/passthrough.go new file mode 100644 index 0000000000..a22b3953d4 --- /dev/null +++ b/rest-api/flow/internal/task/executor/temporalworkflow/activity/passthrough.go @@ -0,0 +1,62 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package activity + +import ( + "context" + "errors" + "fmt" + + "github.com/NVIDIA/infra-controller/rest-api/common/pkg/nicopassthrough" +) + +// NameInvokeCorePassthrough is the Temporal activity name for the NICo Core +// gRPC passthrough. +const NameInvokeCorePassthrough = "InvokeNICoCorePassthrough" + +// CoreInvoker invokes NICo Core (forge.Forge) gRPC methods by name and lists +// the available method catalog. It is satisfied by nicoapi.Passthrough. +type CoreInvoker interface { + Invoke(ctx context.Context, method string, reqJSON []byte) ([]byte, error) + ListMethods() ([]nicopassthrough.MethodInfo, error) +} + +// InvokeCorePassthrough runs a single passthrough request against NICo Core. +// When req.List is set it returns the method catalog; otherwise it transcodes +// the request and invokes the named method. Mutation gating is enforced here +// as defense in depth — the REST API gates the same request before dispatch. +func (a *Activities) InvokeCorePassthrough( + ctx context.Context, + req nicopassthrough.Request, +) (nicopassthrough.Response, error) { + if a.coreInvoker == nil { + return nicopassthrough.Response{}, errors.New( + "NICo Core passthrough is not configured on this Flow worker", + ) + } + + if req.List { + methods, err := a.coreInvoker.ListMethods() + if err != nil { + return nicopassthrough.Response{}, fmt.Errorf("list NICo Core methods: %w", err) + } + return nicopassthrough.Response{Methods: methods}, nil + } + + if req.Method == "" { + return nicopassthrough.Response{}, errors.New("method is required") + } + + if nicopassthrough.IsMutation(req.Method) && !req.AllowMutation { + return nicopassthrough.Response{}, fmt.Errorf( + "method %q is a mutation and requires allowMutation", nicopassthrough.MethodName(req.Method), + ) + } + + respJSON, err := a.coreInvoker.Invoke(ctx, req.Method, req.RequestJSON) + if err != nil { + return nicopassthrough.Response{}, err + } + return nicopassthrough.Response{ResponseJSON: respJSON}, nil +} diff --git a/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry.go b/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry.go index 318303abd7..551fdbafee 100644 --- a/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry.go +++ b/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry.go @@ -17,6 +17,7 @@ type Activities struct { updater task.TaskStatusUpdater reportUpdater task.TaskReportUpdater registry *componentmanager.Registry + coreInvoker CoreInvoker } // New creates an Activities instance. Any argument may be nil; activity @@ -25,11 +26,13 @@ func New( updater task.TaskStatusUpdater, reportUpdater task.TaskReportUpdater, registry *componentmanager.Registry, + coreInvoker CoreInvoker, ) *Activities { return &Activities{ updater: updater, reportUpdater: reportUpdater, registry: registry, + coreInvoker: coreInvoker, } } @@ -49,5 +52,6 @@ func (a *Activities) All() map[string]any { NameBringUpControl: a.BringUpControl, NameGetBringUpStatus: a.GetBringUpStatus, NameVerifyFirmwareConsistency: a.VerifyFirmwareConsistency, + NameInvokeCorePassthrough: a.InvokeCorePassthrough, } } diff --git a/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry_test.go b/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry_test.go index f2a7b33286..c7a81cca91 100644 --- a/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry_test.go +++ b/rest-api/flow/internal/task/executor/temporalworkflow/activity/registry_test.go @@ -13,7 +13,7 @@ import ( // TestActivities_All_ContainsAllActivities verifies that All() returns every // expected activity name with a non-nil function value. func TestActivities_All_ContainsAllActivities(t *testing.T) { - acts := New(nil, nil, nil) + acts := New(nil, nil, nil, nil) all := acts.All() expectedNames := []string{ @@ -27,6 +27,7 @@ func TestActivities_All_ContainsAllActivities(t *testing.T) { NameBringUpControl, NameGetBringUpStatus, NameVerifyFirmwareConsistency, + NameInvokeCorePassthrough, } require.Len(t, all, len(expectedNames), "unexpected number of activities") @@ -39,7 +40,7 @@ func TestActivities_All_ContainsAllActivities(t *testing.T) { // TestActivities_All_ReturnsCopy verifies that mutating the returned map does // not affect subsequent calls — each call produces an independent map. func TestActivities_All_ReturnsCopy(t *testing.T) { - acts := New(nil, nil, nil) + acts := New(nil, nil, nil, nil) first := acts.All() firstLen := len(first) @@ -53,8 +54,8 @@ func TestActivities_All_ReturnsCopy(t *testing.T) { // TestActivities_Isolation verifies that two Activities instances do not share // state: mutations to one instance's map must not affect the other. func TestActivities_Isolation(t *testing.T) { - a1 := New(nil, nil, nil) - a2 := New(nil, nil, nil) + a1 := New(nil, nil, nil, nil) + a2 := New(nil, nil, nil, nil) m1 := a1.All() m1["isolation-sentinel"] = func() {} diff --git a/rest-api/flow/internal/task/executor/temporalworkflow/manager/manager.go b/rest-api/flow/internal/task/executor/temporalworkflow/manager/manager.go index b9d268c10c..6f1f3b5eab 100644 --- a/rest-api/flow/internal/task/executor/temporalworkflow/manager/manager.go +++ b/rest-api/flow/internal/task/executor/temporalworkflow/manager/manager.go @@ -39,6 +39,10 @@ type Config struct { // ComponentManagerRegistry is the registry containing initialized component managers. ComponentManagerRegistry *componentmanager.Registry + + // CoreInvoker backs the NICo Core gRPC passthrough activity. May be nil, in + // which case passthrough activity calls return an error at invocation time. + CoreInvoker activity.CoreInvoker } // Validate checks that the configuration is complete and consistent. @@ -100,7 +104,7 @@ func (c *Config) Build( // Bind dependencies into an Activities instance so each manager has its // own isolated copy — no shared mutable globals between managers. - acts := activity.New(updater, reportUpdater, c.ComponentManagerRegistry) + acts := activity.New(updater, reportUpdater, c.ComponentManagerRegistry, c.CoreInvoker) publisherClient, err := temporal.New(c.ClientConf) if err != nil { diff --git a/rest-api/flow/internal/task/executor/temporalworkflow/workflow/passthrough.go b/rest-api/flow/internal/task/executor/temporalworkflow/workflow/passthrough.go new file mode 100644 index 0000000000..8904c7fe6d --- /dev/null +++ b/rest-api/flow/internal/task/executor/temporalworkflow/workflow/passthrough.go @@ -0,0 +1,51 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package workflow + +import ( + "time" + + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + + "github.com/NVIDIA/infra-controller/rest-api/common/pkg/nicopassthrough" + "github.com/NVIDIA/infra-controller/rest-api/flow/internal/task/executor/temporalworkflow/activity" +) + +// init registers the NICo Core passthrough as an internal workflow (no +// TaskType): it is started directly by name from the REST API, not dispatched +// through the component-task Execute() path. +func init() { + register(WorkflowDescriptor{ + WorkflowName: nicopassthrough.WorkflowName, + WorkflowFunc: invokeCorePassthrough, + }) +} + +// corePassthroughActivityOptions intentionally disables automatic retries: a +// passthrough request may be a non-idempotent mutation, so the activity runs +// exactly once and the caller decides whether to retry. +var corePassthroughActivityOptions = workflow.ActivityOptions{ + StartToCloseTimeout: 2 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 1, + }, +} + +// invokeCorePassthrough is a thin workflow that runs the passthrough activity +// on the Flow worker (which holds the direct gRPC connection to NICo Core) and +// returns its result to the REST API. +func invokeCorePassthrough( + ctx workflow.Context, + req nicopassthrough.Request, +) (nicopassthrough.Response, error) { + ctx = workflow.WithActivityOptions(ctx, corePassthroughActivityOptions) + + var resp nicopassthrough.Response + err := workflow.ExecuteActivity(ctx, activity.NameInvokeCorePassthrough, req).Get(ctx, &resp) + if err != nil { + return nicopassthrough.Response{}, err + } + return resp, nil +} diff --git a/rest-api/openapi/spec.yaml b/rest-api/openapi/spec.yaml index 3f843ca691..80d194a6b4 100644 --- a/rest-api/openapi/spec.yaml +++ b/rest-api/openapi/spec.yaml @@ -313,6 +313,85 @@ tags: - `rackLevelAdministration` capability attribute was deprecated in favor of `flow` and was removed on May 13th, 2026 0:00 UTC. Please use `flow` instead. - `isRackLevelAdministrationEnabled` query parameter was deprecated in favor of `isFlowEnabled` and was removed on May 13th, 2026 0:00 UTC. Please use `isFlowEnabled` instead. paths: + '/v2/org/{org}/nico/core/passthrough': + parameters: + - schema: + type: string + name: org + in: path + required: true + description: Name of the Org + post: + summary: Invoke a NICo Core gRPC method (Provider Admin) + tags: + - NICo Core Passthrough + parameters: + - schema: + type: string + format: uuid + name: siteId + in: query + required: true + description: ID of the Site whose Flow worker proxies to NICo Core + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/NICoCorePassthroughRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/NICoCorePassthroughResponse' + '403': + $ref: '#/components/responses/ForbiddenError' + operationId: invoke-nico-core-passthrough + description: |- + Invoke any NICo Core (forge.Forge) unary gRPC method by name with a + protojson-encoded request, proxied through the Site's Flow worker. + + User must have authorization role with `PROVIDER_ADMIN` suffix. Methods + classified as mutations (anything other than Find/Get/List/Search/Lookup/Identify/Determine/Version/Echo) + additionally require `allowMutation: true`. + '/v2/org/{org}/nico/core/methods': + parameters: + - schema: + type: string + name: org + in: path + required: true + description: Name of the Org + get: + summary: List invocable NICo Core gRPC methods (Provider Admin) + tags: + - NICo Core Passthrough + parameters: + - schema: + type: string + format: uuid + name: siteId + in: query + required: true + description: ID of the Site whose Flow worker proxies to NICo Core + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/NICoCoreMethodsResponse' + '403': + $ref: '#/components/responses/ForbiddenError' + operationId: list-nico-core-methods + description: |- + Return the catalog of invocable NICo Core (forge.Forge) unary methods, + each with its request/response protobuf message types and read/mutation + classification. + + User must have authorization role with `PROVIDER_ADMIN` suffix. '/v2/org/{org}/nico/service-account/current': parameters: - schema: @@ -12744,6 +12823,65 @@ paths: $ref: '#/components/responses/GenericHttpError' components: schemas: + NICoCorePassthroughRequest: + type: object + title: NICoCorePassthroughRequest + description: A NICo Core gRPC passthrough invocation. + required: + - method + properties: + method: + type: string + description: 'NICo Core gRPC method, bare ("FindMachineIds") or fully qualified ("/forge.Forge/FindMachineIds").' + request: + type: object + additionalProperties: true + description: protojson-encoded request message for the method. Omit for methods with an empty request. + allowMutation: + type: boolean + default: false + description: Must be true to invoke a method classified as a write/destructive operation. + NICoCorePassthroughResponse: + type: object + title: NICoCorePassthroughResponse + description: Result of a NICo Core gRPC passthrough invocation. + properties: + method: + type: string + description: The bare method that was invoked. + response: + type: object + additionalProperties: true + description: protojson-encoded response message from NICo Core. + NICoCoreMethodsResponse: + type: object + title: NICoCoreMethodsResponse + description: Catalog of invocable NICo Core gRPC methods. + properties: + service: + type: string + description: Fully qualified gRPC service name. + methods: + type: array + items: + $ref: '#/components/schemas/NICoCoreMethod' + NICoCoreMethod: + type: object + title: NICoCoreMethod + description: A single invocable NICo Core gRPC method. + properties: + method: + type: string + fullMethod: + type: string + inputType: + type: string + outputType: + type: string + mutation: + type: boolean + deprecated: + type: boolean InfrastructureProvider: description: Infrastructure providers own and manage datacenters type: object