From 663e33dc5e98ab2af1c533a439a883440d8512a5 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Tue, 26 May 2026 17:08:03 +0200 Subject: [PATCH 1/5] feat(stack): add stdio MCP bridge --- cmd/stack/mcp/root.go | 396 +++++++++++++++++++++++++++++++++++++ cmd/stack/mcp/root_test.go | 123 ++++++++++++ cmd/stack/root.go | 2 + pkg/authentication.go | 15 +- pkg/clients.go | 39 +++- 5 files changed, 571 insertions(+), 4 deletions(-) create mode 100644 cmd/stack/mcp/root.go create mode 100644 cmd/stack/mcp/root_test.go diff --git a/cmd/stack/mcp/root.go b/cmd/stack/mcp/root.go new file mode 100644 index 00000000..313de905 --- /dev/null +++ b/cmd/stack/mcp/root.go @@ -0,0 +1,396 @@ +package mcp + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "mime" + "net/http" + "net/url" + "os" + "strconv" + "strings" + + "github.com/spf13/cobra" + "golang.org/x/oauth2" + + fctl "github.com/formancehq/fctl/v3/pkg" +) + +const transportFlag = "transport" + +var mcpAPIScopes = []string{ + "openid", + "email", + "ledger:read", + "payments:read", + "reconciliation:read", +} + +func NewCommand() *cobra.Command { + return fctl.NewStackCommand("mcp", + fctl.WithShortDescription("Run stack MCP integrations"), + fctl.WithChildCommands(NewServeCommand()), + ) +} + +func NewServeCommand() *cobra.Command { + return fctl.NewStackCommand("serve", + fctl.WithShortDescription("Start a stack MCP server"), + fctl.WithStringFlag(transportFlag, "stdio", "MCP transport to use (stdio)"), + fctl.WithArgs(cobra.NoArgs), + fctl.WithRunE(runServe), + ) +} + +func runServe(cmd *cobra.Command, _ []string) error { + transport := fctl.GetString(cmd, transportFlag) + if transport != "stdio" { + return fmt.Errorf("unsupported MCP transport %q: only stdio is currently supported", transport) + } + + _, profile, profileName, relyingParty, err := fctl.LoadAndAuthenticateCurrentProfile(cmd) + if err != nil { + return err + } + + organizationID, stackID, err := fctl.ResolveStackID(cmd, *profile) + if err != nil { + return err + } + + stackToken, stackAccess, err := fctl.EnsureStackAccess(cmd, relyingParty, stderrDialog{w: cmd.ErrOrStderr()}, profileName, *profile, organizationID, stackID) + if err != nil { + return err + } + + mcpResource := stackResourceURL(stackAccess.URI, "/api/mcp") + tokenSource := fctl.NewStackTokenSourceWithAPIScopesAndResource( + *stackToken, + stackAccess, + relyingParty, + func(newToken fctl.AccessToken) error { + return fctl.WriteStackToken(cmd, profileName, stackID, newToken) + }, + cmd, + profileName, + organizationID, + stackID, + mcpAPIScopes, + mcpResource, + ) + httpClient := oauth2.NewClient(cmd.Context(), tokenSource) + + server := &stdioServer{ + in: os.Stdin, + out: os.Stdout, + err: cmd.ErrOrStderr(), + httpClient: httpClient, + stackURI: stackAccess.URI, + } + return server.Serve(cmd.Context()) +} + +func stackResourceURL(stackURI, path string) string { + base, err := url.Parse(stackURI) + if err != nil { + return strings.TrimRight(stackURI, "/") + path + } + return base.ResolveReference(&url.URL{Path: path}).String() +} + +type stderrDialog struct { + w io.Writer +} + +func (d stderrDialog) Info(msg string, args ...any) { + _, _ = fmt.Fprintf(d.w, msg+"\n", args...) +} + +type stdioServer struct { + in io.Reader + out io.Writer + err io.Writer + httpClient *http.Client + stackURI string + remote *remoteMCPClient +} + +type rpcMessage struct { + JSONRPC string `json:"jsonrpc,omitempty"` + ID json.RawMessage `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` +} + +type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Result any `json:"result,omitempty"` + Error *rpcError `json:"error,omitempty"` +} + +type rpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (s *stdioServer) Serve(ctx context.Context) error { + s.remote = newRemoteMCPClient(s.httpClient, s.stackURI) + reader := bufio.NewReader(s.in) + for { + data, err := readMCPMessage(reader) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + if len(bytes.TrimSpace(data)) == 0 { + continue + } + + var msg rpcMessage + if err := json.Unmarshal(data, &msg); err != nil { + _ = s.writeResponse(rpcResponse{ + JSONRPC: "2.0", + ID: json.RawMessage("null"), + Error: &rpcError{Code: -32700, Message: "parse error"}, + }) + continue + } + + if len(msg.ID) == 0 { + s.handleNotification(msg) + continue + } + + result, rpcErr := s.handleRequest(ctx, msg) + resp := rpcResponse{ + JSONRPC: "2.0", + ID: msg.ID, + Result: result, + Error: rpcErr, + } + if err := s.writeResponse(resp); err != nil { + return err + } + } +} + +func (s *stdioServer) handleNotification(msg rpcMessage) { + switch msg.Method { + case "notifications/cancelled": + _, _ = fmt.Fprintf(s.err, "MCP request cancelled\n") + case "notifications/initialized": + if err := s.remote.Notify(context.Background(), msg); err != nil { + _, _ = fmt.Fprintf(s.err, "forwarding MCP notification %q failed: %v\n", msg.Method, err) + } + } +} + +func (s *stdioServer) handleRequest(ctx context.Context, msg rpcMessage) (any, *rpcError) { + switch msg.Method { + case "ping": + return map[string]any{}, nil + case "initialize", "tools/list", "tools/call": + result, err := s.remote.Request(ctx, msg) + if err != nil { + return nil, &rpcError{Code: -32000, Message: err.Error()} + } + return result, nil + default: + result, err := s.remote.Request(ctx, msg) + if err != nil { + return nil, &rpcError{Code: -32601, Message: "method not found"} + } + return result, nil + } +} + +type remoteMCPClient struct { + httpClient *http.Client + endpoint string + sessionID string + protocolVersion string +} + +func newRemoteMCPClient(httpClient *http.Client, stackURI string) *remoteMCPClient { + if httpClient == nil { + httpClient = http.DefaultClient + } + base, err := url.Parse(stackURI) + if err != nil { + return &remoteMCPClient{httpClient: httpClient, endpoint: stackURI, protocolVersion: "2024-11-05"} + } + endpoint := base.ResolveReference(&url.URL{Path: "/api/mcp"}).String() + return &remoteMCPClient{httpClient: httpClient, endpoint: endpoint, protocolVersion: "2024-11-05"} +} + +func (c *remoteMCPClient) Request(ctx context.Context, msg rpcMessage) (any, error) { + resp, err := c.send(ctx, msg) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, fmt.Errorf("remote MCP error %d: %s", resp.Error.Code, resp.Error.Message) + } + return resp.Result, nil +} + +func (c *remoteMCPClient) Notify(ctx context.Context, msg rpcMessage) error { + _, err := c.send(ctx, msg) + return err +} + +func (c *remoteMCPClient) send(ctx context.Context, msg rpcMessage) (*rpcResponse, error) { + if msg.Method == "initialize" { + c.captureProtocolVersion(msg.Params) + } + data, err := json.Marshal(msg) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint, bytes.NewReader(data)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json, text/event-stream") + req.Header.Set("MCP-Protocol-Version", c.protocolVersion) + if c.sessionID != "" { + req.Header.Set("Mcp-Session-Id", c.sessionID) + } + + httpResp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer httpResp.Body.Close() + + if sessionID := httpResp.Header.Get("Mcp-Session-Id"); sessionID != "" { + c.sessionID = sessionID + } + + if httpResp.StatusCode == http.StatusAccepted && len(msg.ID) == 0 { + return &rpcResponse{JSONRPC: "2.0"}, nil + } + + payload, err := io.ReadAll(httpResp.Body) + if err != nil { + return nil, err + } + if httpResp.StatusCode >= 300 { + return nil, fmt.Errorf("remote MCP HTTP %d: %s", httpResp.StatusCode, strings.TrimSpace(string(payload))) + } + if len(bytes.TrimSpace(payload)) == 0 { + return &rpcResponse{JSONRPC: "2.0"}, nil + } + + return decodeRemoteMCPResponse(httpResp.Header.Get("Content-Type"), payload) +} + +func (c *remoteMCPClient) captureProtocolVersion(params json.RawMessage) { + var initParams struct { + ProtocolVersion string `json:"protocolVersion"` + } + if err := json.Unmarshal(params, &initParams); err == nil && initParams.ProtocolVersion != "" { + c.protocolVersion = initParams.ProtocolVersion + } +} + +func decodeRemoteMCPResponse(contentType string, payload []byte) (*rpcResponse, error) { + mediaType, _, err := mime.ParseMediaType(contentType) + if err != nil { + mediaType = contentType + } + + if mediaType == "text/event-stream" { + return decodeSSEResponse(payload) + } + + var resp rpcResponse + if err := json.Unmarshal(payload, &resp); err != nil { + return nil, fmt.Errorf("decoding remote MCP response: %w", err) + } + return &resp, nil +} + +func decodeSSEResponse(payload []byte) (*rpcResponse, error) { + scanner := bufio.NewScanner(bytes.NewReader(payload)) + var dataLines []string + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "data:") { + dataLines = append(dataLines, strings.TrimSpace(strings.TrimPrefix(line, "data:"))) + } + } + if err := scanner.Err(); err != nil { + return nil, err + } + if len(dataLines) == 0 { + return nil, fmt.Errorf("remote MCP SSE response did not contain data") + } + var resp rpcResponse + if err := json.Unmarshal([]byte(strings.Join(dataLines, "\n")), &resp); err != nil { + return nil, fmt.Errorf("decoding remote MCP SSE response: %w", err) + } + return &resp, nil +} + +func readMCPMessage(reader *bufio.Reader) ([]byte, error) { + for { + first, err := reader.Peek(1) + if err != nil { + return nil, err + } + if first[0] != '\n' && first[0] != '\r' && first[0] != ' ' && first[0] != '\t' { + break + } + _, _ = reader.ReadByte() + } + + headerOrJSON, err := reader.ReadString('\n') + if err != nil { + if errors.Is(err, io.EOF) && strings.TrimSpace(headerOrJSON) != "" { + return []byte(strings.TrimSpace(headerOrJSON)), nil + } + return nil, err + } + if strings.HasPrefix(strings.ToLower(headerOrJSON), "content-length:") { + _, lengthValue, _ := strings.Cut(headerOrJSON, ":") + lengthValue = strings.TrimSpace(lengthValue) + length, err := strconv.Atoi(lengthValue) + if err != nil { + return nil, fmt.Errorf("invalid Content-Length header: %w", err) + } + for { + line, err := reader.ReadString('\n') + if err != nil { + return nil, err + } + if strings.TrimSpace(line) == "" { + break + } + } + payload := make([]byte, length) + if _, err := io.ReadFull(reader, payload); err != nil { + return nil, err + } + return payload, nil + } + return []byte(strings.TrimSpace(headerOrJSON)), nil +} + +func (s *stdioServer) writeResponse(resp rpcResponse) error { + data, err := json.Marshal(resp) + if err != nil { + return err + } + _, err = s.out.Write(append(data, '\n')) + return err +} diff --git a/cmd/stack/mcp/root_test.go b/cmd/stack/mcp/root_test.go new file mode 100644 index 00000000..e30c0559 --- /dev/null +++ b/cmd/stack/mcp/root_test.go @@ -0,0 +1,123 @@ +package mcp + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" +) + +func TestStdioServerForwardsInitialize(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/mcp" { + t.Fatalf("path = %s, want /api/mcp", r.URL.Path) + } + var request rpcMessage + if err := json.NewDecoder(r.Body).Decode(&request); err != nil { + t.Fatalf("decoding remote request: %v", err) + } + if request.Method != "initialize" { + t.Fatalf("method = %s, want initialize", request.Method) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"serverInfo":{"name":"remote"}}}`)) + })) + defer upstream.Close() + + input := `{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05"}}` + "\n" + var output bytes.Buffer + server := &stdioServer{ + in: strings.NewReader(input), + out: &output, + err: &bytes.Buffer{}, + httpClient: upstream.Client(), + stackURI: upstream.URL, + } + + if err := server.Serve(context.Background()); err != nil { + t.Fatalf("Serve() error = %v", err) + } + + var response rpcResponse + if err := json.Unmarshal(bytes.TrimSpace(output.Bytes()), &response); err != nil { + t.Fatalf("invalid response: %v", err) + } + if string(response.ID) != "1" { + t.Fatalf("response id = %s, want 1", response.ID) + } + if response.Error != nil { + t.Fatalf("response error = %#v", response.Error) + } +} + +func TestReadMCPMessageContentLength(t *testing.T) { + payload := []byte(`{"jsonrpc":"2.0","id":"abc","method":"ping"}`) + input := fmt.Sprintf("Content-Length: %d\r\n\r\n%s", len(payload), payload) + + got, err := readMCPMessage(bufioReader(input)) + if err != nil { + t.Fatalf("readMCPMessage() error = %v", err) + } + if string(got) != string(payload) { + t.Fatalf("payload = %q, want %q", got, payload) + } +} + +func TestRemoteMCPClientKeepsSessionID(t *testing.T) { + var count atomic.Int32 + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestNumber := count.Add(1) + if requestNumber == 1 { + w.Header().Set("Mcp-Session-Id", "session-123") + } else if got := r.Header.Get("Mcp-Session-Id"); got != "session-123" { + t.Fatalf("Mcp-Session-Id = %q, want session-123", got) + } + if got := r.Header.Get("MCP-Protocol-Version"); got != "2025-03-26" { + t.Fatalf("MCP-Protocol-Version = %q, want 2025-03-26", got) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":{}}`)) + })) + defer upstream.Close() + + client := newRemoteMCPClient(upstream.Client(), upstream.URL) + _, err := client.Request(context.Background(), rpcMessage{ + JSONRPC: "2.0", + ID: json.RawMessage("1"), + Method: "initialize", + Params: json.RawMessage(`{"protocolVersion":"2025-03-26"}`), + }) + if err != nil { + t.Fatalf("initialize request error = %v", err) + } + + _, err = client.Request(context.Background(), rpcMessage{ + JSONRPC: "2.0", + ID: json.RawMessage("2"), + Method: "tools/list", + }) + if err != nil { + t.Fatalf("tools/list request error = %v", err) + } +} + +func TestDecodeSSEResponse(t *testing.T) { + resp, err := decodeRemoteMCPResponse("text/event-stream", []byte("event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"ok\":true}}\n\n")) + if err != nil { + t.Fatalf("decodeRemoteMCPResponse() error = %v", err) + } + result, ok := resp.Result.(map[string]any) + if !ok || result["ok"] != true { + t.Fatalf("result = %#v, want ok=true", resp.Result) + } +} + +func bufioReader(s string) *bufio.Reader { + return bufio.NewReader(strings.NewReader(s)) +} diff --git a/cmd/stack/root.go b/cmd/stack/root.go index c16c8430..c7018e20 100644 --- a/cmd/stack/root.go +++ b/cmd/stack/root.go @@ -3,6 +3,7 @@ package stack import ( "github.com/spf13/cobra" + "github.com/formancehq/fctl/v3/cmd/stack/mcp" "github.com/formancehq/fctl/v3/cmd/stack/modules" "github.com/formancehq/fctl/v3/cmd/stack/users" fctl "github.com/formancehq/fctl/v3/pkg" @@ -24,6 +25,7 @@ func NewCommand() *cobra.Command { NewUpgradeCommand(), NewHistoryCommand(), NewProxyCommand(), + mcp.NewCommand(), users.NewCommand(), modules.NewCommand(), ), diff --git a/pkg/authentication.go b/pkg/authentication.go index 32deba71..88d92ffe 100644 --- a/pkg/authentication.go +++ b/pkg/authentication.go @@ -249,11 +249,24 @@ func Refresh(ctx context.Context, relyingParty client.RelyingParty, token Access } func FetchStackToken(ctx context.Context, httpClient *http.Client, stackURI, token string) (*oauth2.Token, error) { + return FetchStackTokenWithScopesAndResource(ctx, httpClient, stackURI, token, nil, "") +} + +func FetchStackTokenWithScopes(ctx context.Context, httpClient *http.Client, stackURI, token string, scopes []string) (*oauth2.Token, error) { + return FetchStackTokenWithScopesAndResource(ctx, httpClient, stackURI, token, scopes, "") +} +func FetchStackTokenWithScopesAndResource(ctx context.Context, httpClient *http.Client, stackURI, token string, scopes []string, resource string) (*oauth2.Token, error) { + if len(scopes) == 0 { + scopes = []string{oidc.ScopeOpenID, oidc.ScopeEmail} + } form := url.Values{ "grant_type": []string{"urn:ietf:params:oauth:grant-type:jwt-bearer"}, "assertion": []string{token}, - "scope": []string{strings.Join([]string{oidc.ScopeOpenID, oidc.ScopeEmail}, " ")}, + "scope": []string{strings.Join(scopes, " ")}, + } + if resource != "" { + form.Set("resource", resource) } stackDiscoveryConfiguration, err := client.Discover[oidc.DiscoveryConfiguration](ctx, stackURI+"/api/auth", httpClient) diff --git a/pkg/clients.go b/pkg/clients.go index 58983cfd..0b44f8d3 100644 --- a/pkg/clients.go +++ b/pkg/clients.go @@ -625,6 +625,8 @@ type stackTokenSource struct { stackAccess *StackAccess relyingParty client.RelyingParty onRefresh func(newToken AccessToken) error + apiScopes []string + apiResource string // Cache fields cmd *cobra.Command @@ -639,7 +641,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { if t.accessToken == nil || t.accessToken.Expiry.Before(time.Now()) { // Try to load from disk cache - if t.cmd != nil { + if t.cmd != nil && len(t.apiScopes) == 0 { cached, err := ReadCachedStackAPIToken(t.cmd, t.profileName, t.organizationID, t.stackID) if err == nil && cached != nil && cached.Expiry.After(time.Now().Add(5*time.Second)) { t.accessToken = &oauth2.Token{ @@ -666,7 +668,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { } } - token, err := FetchStackToken(context.Background(), t.relyingParty.HttpClient(), t.stackAccess.URI, t.stackToken.Token) + token, err := FetchStackTokenWithScopesAndResource(context.Background(), t.relyingParty.HttpClient(), t.stackAccess.URI, t.stackToken.Token, t.apiScopes, t.apiResource) if err != nil { return nil, err } @@ -674,7 +676,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { t.accessToken = token // Write to disk cache (best-effort) - if t.cmd != nil { + if t.cmd != nil && len(t.apiScopes) == 0 { _ = WriteCachedStackAPIToken(t.cmd, t.profileName, t.organizationID, t.stackID, CachedStackAPIToken{ AccessToken: token.AccessToken, TokenType: token.TokenType, @@ -697,12 +699,43 @@ func NewStackTokenSource( profileName string, organizationID string, stackID string, +) oauth2.TokenSource { + return NewStackTokenSourceWithAPIScopes(stackToken, stackAccess, relyingParty, onRefresh, cmd, profileName, organizationID, stackID, nil) +} + +func NewStackTokenSourceWithAPIScopes( + stackToken AccessToken, + stackAccess *StackAccess, + relyingParty client.RelyingParty, + onRefresh func(newToken AccessToken) error, + cmd *cobra.Command, + profileName string, + organizationID string, + stackID string, + apiScopes []string, +) oauth2.TokenSource { + return NewStackTokenSourceWithAPIScopesAndResource(stackToken, stackAccess, relyingParty, onRefresh, cmd, profileName, organizationID, stackID, apiScopes, "") +} + +func NewStackTokenSourceWithAPIScopesAndResource( + stackToken AccessToken, + stackAccess *StackAccess, + relyingParty client.RelyingParty, + onRefresh func(newToken AccessToken) error, + cmd *cobra.Command, + profileName string, + organizationID string, + stackID string, + apiScopes []string, + apiResource string, ) oauth2.TokenSource { return &stackTokenSource{ stackToken: stackToken, stackAccess: stackAccess, relyingParty: relyingParty, onRefresh: onRefresh, + apiScopes: apiScopes, + apiResource: apiResource, cmd: cmd, profileName: profileName, organizationID: organizationID, From b0ea97a4374a038c9cf9adc6e838985f30b90c6e Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Tue, 26 May 2026 19:03:43 +0200 Subject: [PATCH 2/5] fix(stack): address mcp bridge review comments --- cmd/stack/mcp/root.go | 8 ++++- cmd/stack/mcp/root_test.go | 74 ++++++++++++++++++++++++++++++++++++++ pkg/clients.go | 4 +-- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/cmd/stack/mcp/root.go b/cmd/stack/mcp/root.go index 313de905..70a52a58 100644 --- a/cmd/stack/mcp/root.go +++ b/cmd/stack/mcp/root.go @@ -186,6 +186,9 @@ func (s *stdioServer) handleNotification(msg rpcMessage) { switch msg.Method { case "notifications/cancelled": _, _ = fmt.Fprintf(s.err, "MCP request cancelled\n") + if err := s.remote.Notify(context.Background(), msg); err != nil { + _, _ = fmt.Fprintf(s.err, "forwarding MCP notification %q failed: %v\n", msg.Method, err) + } case "notifications/initialized": if err := s.remote.Notify(context.Background(), msg); err != nil { _, _ = fmt.Fprintf(s.err, "forwarding MCP notification %q failed: %v\n", msg.Method, err) @@ -206,7 +209,7 @@ func (s *stdioServer) handleRequest(ctx context.Context, msg rpcMessage) (any, * default: result, err := s.remote.Request(ctx, msg) if err != nil { - return nil, &rpcError{Code: -32601, Message: "method not found"} + return nil, &rpcError{Code: -32000, Message: err.Error()} } return result, nil } @@ -368,6 +371,9 @@ func readMCPMessage(reader *bufio.Reader) ([]byte, error) { if err != nil { return nil, fmt.Errorf("invalid Content-Length header: %w", err) } + if length < 0 { + return nil, fmt.Errorf("invalid Content-Length header: must be non-negative") + } for { line, err := reader.ReadString('\n') if err != nil { diff --git a/cmd/stack/mcp/root_test.go b/cmd/stack/mcp/root_test.go index e30c0559..3b30eee8 100644 --- a/cmd/stack/mcp/root_test.go +++ b/cmd/stack/mcp/root_test.go @@ -69,6 +69,80 @@ func TestReadMCPMessageContentLength(t *testing.T) { } } +func TestReadMCPMessageRejectsNegativeContentLength(t *testing.T) { + _, err := readMCPMessage(bufioReader("Content-Length: -1\r\n\r\n")) + if err == nil { + t.Fatalf("readMCPMessage() expected error") + } + if !strings.Contains(err.Error(), "must be non-negative") { + t.Fatalf("readMCPMessage() error = %v, want non-negative framing error", err) + } +} + +func TestStdioServerForwardsCancelledNotification(t *testing.T) { + var gotMethod atomic.Value + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var request rpcMessage + if err := json.NewDecoder(r.Body).Decode(&request); err != nil { + t.Fatalf("decoding remote request: %v", err) + } + gotMethod.Store(request.Method) + w.WriteHeader(http.StatusAccepted) + })) + defer upstream.Close() + + input := `{"jsonrpc":"2.0","method":"notifications/cancelled","params":{"requestId":1}}` + "\n" + server := &stdioServer{ + in: strings.NewReader(input), + out: &bytes.Buffer{}, + err: &bytes.Buffer{}, + httpClient: upstream.Client(), + stackURI: upstream.URL, + } + + if err := server.Serve(context.Background()); err != nil { + t.Fatalf("Serve() error = %v", err) + } + if got := gotMethod.Load(); got != "notifications/cancelled" { + t.Fatalf("forwarded method = %v, want notifications/cancelled", got) + } +} + +func TestStdioServerPreservesForwardedMethodErrors(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "invalid_token", http.StatusUnauthorized) + })) + defer upstream.Close() + + input := `{"jsonrpc":"2.0","id":1,"method":"resources/list"}` + "\n" + var output bytes.Buffer + server := &stdioServer{ + in: strings.NewReader(input), + out: &output, + err: &bytes.Buffer{}, + httpClient: upstream.Client(), + stackURI: upstream.URL, + } + + if err := server.Serve(context.Background()); err != nil { + t.Fatalf("Serve() error = %v", err) + } + + var response rpcResponse + if err := json.Unmarshal(bytes.TrimSpace(output.Bytes()), &response); err != nil { + t.Fatalf("invalid response: %v", err) + } + if response.Error == nil { + t.Fatalf("response error is nil") + } + if response.Error.Code != -32000 { + t.Fatalf("response error code = %d, want -32000", response.Error.Code) + } + if !strings.Contains(response.Error.Message, "remote MCP HTTP 401") { + t.Fatalf("response error message = %q, want remote HTTP error", response.Error.Message) + } +} + func TestRemoteMCPClientKeepsSessionID(t *testing.T) { var count atomic.Int32 upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/clients.go b/pkg/clients.go index 0b44f8d3..1ac81af3 100644 --- a/pkg/clients.go +++ b/pkg/clients.go @@ -641,7 +641,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { if t.accessToken == nil || t.accessToken.Expiry.Before(time.Now()) { // Try to load from disk cache - if t.cmd != nil && len(t.apiScopes) == 0 { + if t.cmd != nil && len(t.apiScopes) == 0 && t.apiResource == "" { cached, err := ReadCachedStackAPIToken(t.cmd, t.profileName, t.organizationID, t.stackID) if err == nil && cached != nil && cached.Expiry.After(time.Now().Add(5*time.Second)) { t.accessToken = &oauth2.Token{ @@ -676,7 +676,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { t.accessToken = token // Write to disk cache (best-effort) - if t.cmd != nil && len(t.apiScopes) == 0 { + if t.cmd != nil && len(t.apiScopes) == 0 && t.apiResource == "" { _ = WriteCachedStackAPIToken(t.cmd, t.profileName, t.organizationID, t.stackID, CachedStackAPIToken{ AccessToken: token.AccessToken, TokenType: token.TokenType, From a5f38f13780683b1aa56afeb9b9cee808250a8b9 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Thu, 28 May 2026 15:00:04 +0200 Subject: [PATCH 3/5] fix(stack): address MCP review comments --- cmd/stack/mcp/root.go | 78 ++++++++++++++++++++++++++++---------- cmd/stack/mcp/root_test.go | 48 +++++++++++++++++++++++ 2 files changed, 107 insertions(+), 19 deletions(-) diff --git a/cmd/stack/mcp/root.go b/cmd/stack/mcp/root.go index 70a52a58..4ea5dd7c 100644 --- a/cmd/stack/mcp/root.go +++ b/cmd/stack/mcp/root.go @@ -21,7 +21,10 @@ import ( fctl "github.com/formancehq/fctl/v3/pkg" ) -const transportFlag = "transport" +const ( + transportFlag = "transport" + maxMCPMessageSize = 10 * 1024 * 1024 +) var mcpAPIScopes = []string{ "openid", @@ -142,8 +145,35 @@ type rpcError struct { func (s *stdioServer) Serve(ctx context.Context) error { s.remote = newRemoteMCPClient(s.httpClient, s.stackURI) reader := bufio.NewReader(s.in) + + type readResult struct { + data []byte + err error + } + reads := make(chan readResult, 1) + go func() { + for { + data, err := readMCPMessage(reader) + select { + case reads <- readResult{data: data, err: err}: + case <-ctx.Done(): + return + } + if err != nil { + return + } + } + }() + for { - data, err := readMCPMessage(reader) + var read readResult + select { + case <-ctx.Done(): + return ctx.Err() + case read = <-reads: + } + + data, err := read.data, read.err if err != nil { if errors.Is(err, io.EOF) { return nil @@ -197,22 +227,14 @@ func (s *stdioServer) handleNotification(msg rpcMessage) { } func (s *stdioServer) handleRequest(ctx context.Context, msg rpcMessage) (any, *rpcError) { - switch msg.Method { - case "ping": + if msg.Method == "ping" { return map[string]any{}, nil - case "initialize", "tools/list", "tools/call": - result, err := s.remote.Request(ctx, msg) - if err != nil { - return nil, &rpcError{Code: -32000, Message: err.Error()} - } - return result, nil - default: - result, err := s.remote.Request(ctx, msg) - if err != nil { - return nil, &rpcError{Code: -32000, Message: err.Error()} - } - return result, nil } + result, err := s.remote.Request(ctx, msg) + if err != nil { + return nil, &rpcError{Code: -32000, Message: err.Error()} + } + return result, nil } type remoteMCPClient struct { @@ -325,9 +347,17 @@ func decodeRemoteMCPResponse(contentType string, payload []byte) (*rpcResponse, func decodeSSEResponse(payload []byte) (*rpcResponse, error) { scanner := bufio.NewScanner(bytes.NewReader(payload)) + var events [][]string var dataLines []string for scanner.Scan() { - line := scanner.Text() + line := strings.TrimRight(scanner.Text(), "\r") + if line == "" { + if len(dataLines) > 0 { + events = append(events, dataLines) + dataLines = nil + } + continue + } if strings.HasPrefix(line, "data:") { dataLines = append(dataLines, strings.TrimSpace(strings.TrimPrefix(line, "data:"))) } @@ -335,11 +365,18 @@ func decodeSSEResponse(payload []byte) (*rpcResponse, error) { if err := scanner.Err(); err != nil { return nil, err } - if len(dataLines) == 0 { + if len(dataLines) > 0 { + events = append(events, dataLines) + } + if len(events) == 0 { return nil, fmt.Errorf("remote MCP SSE response did not contain data") } + if len(events) > 1 { + // The stdio bridge writes one JSON-RPC response per request; streaming SSE is not supported yet. + return nil, fmt.Errorf("remote MCP SSE response contained multiple events") + } var resp rpcResponse - if err := json.Unmarshal([]byte(strings.Join(dataLines, "\n")), &resp); err != nil { + if err := json.Unmarshal([]byte(strings.Join(events[0], "\n")), &resp); err != nil { return nil, fmt.Errorf("decoding remote MCP SSE response: %w", err) } return &resp, nil @@ -374,6 +411,9 @@ func readMCPMessage(reader *bufio.Reader) ([]byte, error) { if length < 0 { return nil, fmt.Errorf("invalid Content-Length header: must be non-negative") } + if length > maxMCPMessageSize { + return nil, fmt.Errorf("invalid Content-Length header: exceeds maximum size %d", maxMCPMessageSize) + } for { line, err := reader.ReadString('\n') if err != nil { diff --git a/cmd/stack/mcp/root_test.go b/cmd/stack/mcp/root_test.go index 3b30eee8..9793f830 100644 --- a/cmd/stack/mcp/root_test.go +++ b/cmd/stack/mcp/root_test.go @@ -5,7 +5,9 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" + "io" "net/http" "net/http/httptest" "strings" @@ -79,6 +81,39 @@ func TestReadMCPMessageRejectsNegativeContentLength(t *testing.T) { } } +func TestReadMCPMessageRejectsOversizedContentLength(t *testing.T) { + input := fmt.Sprintf("Content-Length: %d\r\n\r\n", maxMCPMessageSize+1) + + _, err := readMCPMessage(bufioReader(input)) + if err == nil { + t.Fatalf("readMCPMessage() expected error") + } + if !strings.Contains(err.Error(), "exceeds maximum size") { + t.Fatalf("readMCPMessage() error = %v, want maximum size framing error", err) + } +} + +func TestStdioServerStopsWhenContextIsCancelled(t *testing.T) { + reader, writer := io.Pipe() + defer reader.Close() + defer writer.Close() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + server := &stdioServer{ + in: reader, + out: &bytes.Buffer{}, + err: &bytes.Buffer{}, + httpClient: http.DefaultClient, + stackURI: "http://127.0.0.1", + } + + if err := server.Serve(ctx); !errors.Is(err, context.Canceled) { + t.Fatalf("Serve() error = %v, want context.Canceled", err) + } +} + func TestStdioServerForwardsCancelledNotification(t *testing.T) { var gotMethod atomic.Value upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -192,6 +227,19 @@ func TestDecodeSSEResponse(t *testing.T) { } } +func TestDecodeSSEResponseRejectsMultipleEvents(t *testing.T) { + payload := []byte("event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"first\":true}}\n\n" + + "event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"second\":true}}\n\n") + + _, err := decodeRemoteMCPResponse("text/event-stream", payload) + if err == nil { + t.Fatalf("decodeRemoteMCPResponse() expected error") + } + if !strings.Contains(err.Error(), "multiple events") { + t.Fatalf("decodeRemoteMCPResponse() error = %v, want multiple events error", err) + } +} + func bufioReader(s string) *bufio.Reader { return bufio.NewReader(strings.NewReader(s)) } From 6f9f4fb635bb3cb96a58ca5e4408209740447710 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Thu, 28 May 2026 15:10:57 +0200 Subject: [PATCH 4/5] refactor(stack): use standard token source for MCP --- cmd/stack/mcp/root.go | 21 +-------------------- pkg/authentication.go | 16 +--------------- pkg/clients.go | 39 +++------------------------------------ 3 files changed, 5 insertions(+), 71 deletions(-) diff --git a/cmd/stack/mcp/root.go b/cmd/stack/mcp/root.go index 4ea5dd7c..d4ea5fd2 100644 --- a/cmd/stack/mcp/root.go +++ b/cmd/stack/mcp/root.go @@ -26,14 +26,6 @@ const ( maxMCPMessageSize = 10 * 1024 * 1024 ) -var mcpAPIScopes = []string{ - "openid", - "email", - "ledger:read", - "payments:read", - "reconciliation:read", -} - func NewCommand() *cobra.Command { return fctl.NewStackCommand("mcp", fctl.WithShortDescription("Run stack MCP integrations"), @@ -71,8 +63,7 @@ func runServe(cmd *cobra.Command, _ []string) error { return err } - mcpResource := stackResourceURL(stackAccess.URI, "/api/mcp") - tokenSource := fctl.NewStackTokenSourceWithAPIScopesAndResource( + tokenSource := fctl.NewStackTokenSource( *stackToken, stackAccess, relyingParty, @@ -83,8 +74,6 @@ func runServe(cmd *cobra.Command, _ []string) error { profileName, organizationID, stackID, - mcpAPIScopes, - mcpResource, ) httpClient := oauth2.NewClient(cmd.Context(), tokenSource) @@ -98,14 +87,6 @@ func runServe(cmd *cobra.Command, _ []string) error { return server.Serve(cmd.Context()) } -func stackResourceURL(stackURI, path string) string { - base, err := url.Parse(stackURI) - if err != nil { - return strings.TrimRight(stackURI, "/") + path - } - return base.ResolveReference(&url.URL{Path: path}).String() -} - type stderrDialog struct { w io.Writer } diff --git a/pkg/authentication.go b/pkg/authentication.go index 88d92ffe..a9932582 100644 --- a/pkg/authentication.go +++ b/pkg/authentication.go @@ -249,24 +249,10 @@ func Refresh(ctx context.Context, relyingParty client.RelyingParty, token Access } func FetchStackToken(ctx context.Context, httpClient *http.Client, stackURI, token string) (*oauth2.Token, error) { - return FetchStackTokenWithScopesAndResource(ctx, httpClient, stackURI, token, nil, "") -} - -func FetchStackTokenWithScopes(ctx context.Context, httpClient *http.Client, stackURI, token string, scopes []string) (*oauth2.Token, error) { - return FetchStackTokenWithScopesAndResource(ctx, httpClient, stackURI, token, scopes, "") -} - -func FetchStackTokenWithScopesAndResource(ctx context.Context, httpClient *http.Client, stackURI, token string, scopes []string, resource string) (*oauth2.Token, error) { - if len(scopes) == 0 { - scopes = []string{oidc.ScopeOpenID, oidc.ScopeEmail} - } form := url.Values{ "grant_type": []string{"urn:ietf:params:oauth:grant-type:jwt-bearer"}, "assertion": []string{token}, - "scope": []string{strings.Join(scopes, " ")}, - } - if resource != "" { - form.Set("resource", resource) + "scope": []string{strings.Join([]string{oidc.ScopeOpenID, oidc.ScopeEmail}, " ")}, } stackDiscoveryConfiguration, err := client.Discover[oidc.DiscoveryConfiguration](ctx, stackURI+"/api/auth", httpClient) diff --git a/pkg/clients.go b/pkg/clients.go index 1ac81af3..58983cfd 100644 --- a/pkg/clients.go +++ b/pkg/clients.go @@ -625,8 +625,6 @@ type stackTokenSource struct { stackAccess *StackAccess relyingParty client.RelyingParty onRefresh func(newToken AccessToken) error - apiScopes []string - apiResource string // Cache fields cmd *cobra.Command @@ -641,7 +639,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { if t.accessToken == nil || t.accessToken.Expiry.Before(time.Now()) { // Try to load from disk cache - if t.cmd != nil && len(t.apiScopes) == 0 && t.apiResource == "" { + if t.cmd != nil { cached, err := ReadCachedStackAPIToken(t.cmd, t.profileName, t.organizationID, t.stackID) if err == nil && cached != nil && cached.Expiry.After(time.Now().Add(5*time.Second)) { t.accessToken = &oauth2.Token{ @@ -668,7 +666,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { } } - token, err := FetchStackTokenWithScopesAndResource(context.Background(), t.relyingParty.HttpClient(), t.stackAccess.URI, t.stackToken.Token, t.apiScopes, t.apiResource) + token, err := FetchStackToken(context.Background(), t.relyingParty.HttpClient(), t.stackAccess.URI, t.stackToken.Token) if err != nil { return nil, err } @@ -676,7 +674,7 @@ func (t *stackTokenSource) Token() (*oauth2.Token, error) { t.accessToken = token // Write to disk cache (best-effort) - if t.cmd != nil && len(t.apiScopes) == 0 && t.apiResource == "" { + if t.cmd != nil { _ = WriteCachedStackAPIToken(t.cmd, t.profileName, t.organizationID, t.stackID, CachedStackAPIToken{ AccessToken: token.AccessToken, TokenType: token.TokenType, @@ -699,43 +697,12 @@ func NewStackTokenSource( profileName string, organizationID string, stackID string, -) oauth2.TokenSource { - return NewStackTokenSourceWithAPIScopes(stackToken, stackAccess, relyingParty, onRefresh, cmd, profileName, organizationID, stackID, nil) -} - -func NewStackTokenSourceWithAPIScopes( - stackToken AccessToken, - stackAccess *StackAccess, - relyingParty client.RelyingParty, - onRefresh func(newToken AccessToken) error, - cmd *cobra.Command, - profileName string, - organizationID string, - stackID string, - apiScopes []string, -) oauth2.TokenSource { - return NewStackTokenSourceWithAPIScopesAndResource(stackToken, stackAccess, relyingParty, onRefresh, cmd, profileName, organizationID, stackID, apiScopes, "") -} - -func NewStackTokenSourceWithAPIScopesAndResource( - stackToken AccessToken, - stackAccess *StackAccess, - relyingParty client.RelyingParty, - onRefresh func(newToken AccessToken) error, - cmd *cobra.Command, - profileName string, - organizationID string, - stackID string, - apiScopes []string, - apiResource string, ) oauth2.TokenSource { return &stackTokenSource{ stackToken: stackToken, stackAccess: stackAccess, relyingParty: relyingParty, onRefresh: onRefresh, - apiScopes: apiScopes, - apiResource: apiResource, cmd: cmd, profileName: profileName, organizationID: organizationID, From ca7e5052b73a090f8ad915b87515a87dcf493444 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Thu, 28 May 2026 19:08:05 +0200 Subject: [PATCH 5/5] test(stack): satisfy errcheck in MCP tests --- cmd/stack/mcp/root_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/stack/mcp/root_test.go b/cmd/stack/mcp/root_test.go index 9793f830..a4c19f0d 100644 --- a/cmd/stack/mcp/root_test.go +++ b/cmd/stack/mcp/root_test.go @@ -95,8 +95,8 @@ func TestReadMCPMessageRejectsOversizedContentLength(t *testing.T) { func TestStdioServerStopsWhenContextIsCancelled(t *testing.T) { reader, writer := io.Pipe() - defer reader.Close() - defer writer.Close() + defer func() { _ = reader.Close() }() + defer func() { _ = writer.Close() }() ctx, cancel := context.WithCancel(context.Background()) cancel()