Skip to content
Merged
10 changes: 5 additions & 5 deletions forge/comms/aclManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ module.exports = function (app) {
}
const [commandAgent, commandName] = commandParts
switch (commandAgent) {
// case 'forge':
// if (['mcp-get-features', 'mcp-call-tool'].indexOf(commandName) === -1) {
// throw ValidationError('invalid platform command for platform api')
// }
// break
case 'automation':
if (['mcp-get-features', 'mcp-call-tool'].indexOf(commandName) === -1) {
throw ValidationError('invalid platform command for platform api')
}
break
case 'insights':
if (['mcp-call-tool', 'mcp-read-resource'].includes(commandName) === false) {
throw ValidationError('invalid platform command for insights')
Expand Down
92 changes: 65 additions & 27 deletions forge/comms/commsClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,33 @@ class CommsClient extends EventEmitter {
const channelCommand = topicParts[6] // dynamic value, e.g. mcp:call-tool or mcp:read-resource
const direction = topicParts[7] // request or response (only for inflight channels)

// these are common for both insights and platform automations
let payload
try {
payload = JSON.parse(message.toString())
} catch (err) {
this.app.log.warn(`Ignoring malformed expert payload on ${topic}: ${err.message}`)
return
}
const correlationData = packet.properties?.correlationData
const userProperties = packet.properties?.userProperties
const mqttOptions = { properties: { correlationData, userProperties } }

if (!correlationData || !userProperties) {
this.app.log.warn(`'Tool call request missing correlationData or userProperties: ${message.toString()}`)
return // do not respond, the agent will timeout and handle it
}
// end common bits

const supportedInsightsCommands = {
'insights:mcp-call-tool': 'mcp:call-tool',
'insights:mcp-read-resource': 'mcp:read-resource'
}
const supportedPlatformAutomationCommands = {
'automation:mcp-get-features': 'mcp-get-features',
'automation:mcp-call-tool': 'mcp-call-tool'
}

if (supportedInsightsCommands[channelCommand] && direction === 'request') {
const isInsightsToolCall = channel === 'platform' && channelCommand === 'insights:mcp-call-tool' && direction === 'request'
const isInsightsResourceCall = channel === 'platform' && channelCommand === 'insights:mcp-read-resource' && direction === 'request'
Expand All @@ -74,37 +97,11 @@ class CommsClient extends EventEmitter {
// has permission to access this topic. Now, we verify the payload contains the required fields to process the request.
// If OK, emit the message to the appropriate instance/device handler (handled in ./instances.js or ./devices.js)

const payload = JSON.parse(message.toString())
const command = supportedInsightsCommands[channelCommand]
const data = payload.data || {}
const { kind, mcpServer, toolDefinition, resourceDefinition, resourceTemplateDefinition } = payload.meta || {}
const correlationData = packet.properties?.correlationData
const userProperties = packet.properties?.userProperties
if (!correlationData || !userProperties) {
console.warn('Expert Insight tool call request missing correlationData or userProperties', payload)
return // do not respond, the agent will timeout and handle it
}

const mqttOptions = { properties: { correlationData, userProperties } }
const responseTopic = `ff/v1/expert/${userId}/${sessionId}/${channel}/${channelCommand}/response`

/** Callback for failed MCP request. Publishes a structured error back to the agent. */
const onError = (content, code, error) => {
const data = {
code: code || error?.code || 'MCP_ERROR',
content: `Error: ${content}`,
isError: true
}
if (error) {
data.type = error?.name || error?.constructor?.name || 'Error'
data.message = error?.message || error?.toString()
}
this.client.publish(responseTopic, JSON.stringify(data), mqttOptions)
}
/** Callback for successful MCP request. Publishes the result back to the agent. */
const onSuccess = (result) => {
this.client.publish(responseTopic, JSON.stringify(result), mqttOptions)
}
const { onSuccess, onError } = this.createMqttCallbacks(responseTopic, mqttOptions)

// check that the mcpServer contains the required fields to process the request
if (!mcpServer || !['instance', 'device'].includes(mcpServer.instanceType) || !mcpServer.instance || !mcpServer.mcpServer) {
Expand Down Expand Up @@ -141,6 +138,24 @@ class CommsClient extends EventEmitter {
onSuccess, // success callback
onError // failure callback
)
} else if (supportedPlatformAutomationCommands[channelCommand] && direction === 'request') {
// channel command is either 'mcp-get-features' or 'mcp-call-tool'
const responseTopic = `ff/v1/expert/${userId}/${sessionId}/platform/${channelCommand}/response`
const command = supportedPlatformAutomationCommands[channelCommand]
const data = payload.data || {}
const { onSuccess, onError } = this.createMqttCallbacks(responseTopic, mqttOptions)

this.emit(
'request/platform-automation:forge', // event name
{
userId, // ID of user making the request
command, // command,
data, // payload data
meta: payload.meta
},
onSuccess, // success callback
onError // failure callback
)
}
} else if (ownerType === 'p') {
this.emit('status/project', {
Expand Down Expand Up @@ -223,6 +238,29 @@ class CommsClient extends EventEmitter {
}
}

/**
* Creates onSuccess/onError callbacks that publish results back to the agent
* over the given MQTT response topic.
*/
createMqttCallbacks (responseTopic, mqttOptions) {
const onError = (content, code, error) => {
const data = {
code: code || error?.code || 'MCP_ERROR',
content: `Error: ${content}`,
isError: true
}
if (error) {
data.type = error?.name || error?.constructor?.name || 'Error'
data.message = error?.message || error?.toString()
}
this.client.publish(responseTopic, JSON.stringify(data), mqttOptions)
}
const onSuccess = (result) => {
this.client.publish(responseTopic, JSON.stringify(result), mqttOptions)
}
return { onSuccess, onError }
}

/**
* Publish to a topic
* @param {string} topic Topic to publish to
Expand Down
3 changes: 3 additions & 0 deletions forge/comms/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const ACLManager = require('./aclManager')
const { CommsClient } = require('./commsClient')
const { DeviceCommsHandler } = require('./devices')
const { InstanceCommsHandler } = require('./instances')
const { PlatformAutomationHandler } = require('./platformAutomation.js')

/**
* This module represents the real-time comms component of the platform.
Expand Down Expand Up @@ -32,6 +33,7 @@ module.exports = fp(async function (app, _opts) {
// Create the handler for any device-related messages
const deviceCommsHandler = DeviceCommsHandler(app, client)
const instanceCommsHandler = InstanceCommsHandler(app, client)
const platformAutomationHandler = PlatformAutomationHandler(app, client)

// Not in the current release, but when we handle Launcher status
// via MQTT, it will arrive here. Compare to the status/device handler in `devices.js`
Expand All @@ -44,6 +46,7 @@ module.exports = fp(async function (app, _opts) {
devices: deviceCommsHandler,
instances: instanceCommsHandler,
aclManager: ACLManager(app),
platformAutomation: platformAutomationHandler,
platform: {
settings: {
sync: function (key) {
Expand Down
133 changes: 133 additions & 0 deletions forge/comms/platformAutomation.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// /**
// * This module provides the handler for platform automation events
// */

const { default: z } = require('zod')

/**
* PlatformAutomationHandler
* @class PlatformAutomationHandler
* @memberof forge.comms
*/
class PlatformAutomationHandler {
/**
* @param {import('../forge').ForgeApplication} app Fastify app
* @param {import('./commsClient').CommsClient} client Comms Client
*/
constructor (app, client) {
this.app = app
this.client = client

/** Tool definitions without the handler functions - for sending across the wire to the agent for tool discovery */
this._wireToolDefinitions = null
this._fullToolDefinitions = null

this.setupEventHandler()
}

/**
* Lazily loads and caches the full tool definitions (with handlers)
* from the EE MCP module.
*/
loadTools () {
if (!this._fullToolDefinitions) {
const { loadToolDefinitions } = require('../ee/lib/mcp/toolLoader')
this._fullToolDefinitions = loadToolDefinitions()
this._wireToolDefinitions = this._fullToolDefinitions.map(({ name, description, inputSchema, annotations }) => ({
name,
description,
inputSchema: inputSchema && z.toJSONSchema(z.object(inputSchema)),
annotations
}))
}
}

/**
* Returns wire-safe tool definitions (no handler functions).
*/
getToolDefinitions () {
this.loadTools()
return this._wireToolDefinitions
}

/**
* Finds a tool definition by name (including its handler).
*/
findTool (toolName) {
this.loadTools()
return this._fullToolDefinitions.find(t => t.name === toolName)
}

setupEventHandler () {
this.client.on('request/platform-automation:forge', this.eventHandler)
}

eventHandler = async ({ userId, command, data, meta } = {}, onSuccess, onError) => {
try {
let result = {}

switch (command) {
case 'mcp-get-features':
result = { tools: this.getToolDefinitions() }
break
case 'mcp-call-tool': {
const toolName = data?.name
const args = data?.input || {}

// TODO: Probably sensible to verify that toolDefinition matches the tool to ensure no tampering has occurred
const { toolDefinition } = meta || {}

const { annotations } = toolDefinition
const tool = this.findTool(toolName)

// Verify tool annotations haven't been tampered with
if (JSON.stringify({ annotations }) !== JSON.stringify({ annotations: tool.annotations })) {
return onError(
'Tool definition mismatch',
'MCP_PLATFORM_TOOL_TAMPERED'
)
}

if (!tool) {
return onError(
`Unknown platform tool: ${toolName}`,
'MCP_PLATFORM_TOOL_NOT_FOUND'
)
}

const user = await this.app.db.models.User.byId(userId)
if (user) {
const { token } = await this.app.expert.mcp.getOrCreatePlatformToken(user)
const inject = (opts) => this.app.inject({
...opts,
headers: {
...opts.headers,
authorization: `Bearer ${token}`,
'x-ff-automation-source': 'expert'
}
})

const { formatResponse } = require('../ee/lib/mcp/toolLoader')
const response = await tool.handler(args, { inject })
result = formatResponse(response)
}
break
}
default:
// unrecognized command
}

onSuccess(result)
} catch (err) {
return onError(
`An error occurred performing a platform automation request: ${err.message}`,
'MCP_PLATFORM_AUTOMATION_REQUEST_ERROR',
err
)
}
}
}

module.exports = {
PlatformAutomationHandler: (app, client) => new PlatformAutomationHandler(app, client)
}
4 changes: 2 additions & 2 deletions forge/db/controllers/AccessToken.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ module.exports = {
/**
* Create an AccessToken for the editor.
*/
createTokenForUser: async function (app, user, expiresAt, scope = [], includeRefresh) {
createTokenForUser: async function (app, user, expiresAt, scope = [], includeRefresh, ownerType = 'user') {
const userId = typeof user === 'number' ? user : user.id
const token = generateToken(32, 'ffu')
const refreshToken = includeRefresh ? generateToken(32, 'ffu') : null
Expand All @@ -162,7 +162,7 @@ module.exports = {
expiresAt,
scope,
ownerId: '' + userId,
ownerType: 'user'
ownerType
})
return { token, expiresAt, refreshToken }
},
Expand Down
41 changes: 39 additions & 2 deletions forge/ee/lib/expert/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ const fp = require('fastify-plugin')

const TOKEN_CACHE_NAME = 'ExpertMCPAccessTokenCache'

const EXPERT_MCP_SCOPE = 'ff-expert:mcp'
const EXPERT_MCP_PLATFORM_SCOPE = 'ff-expert:platform'
// Dedicated owner type so platform-automation tokens are not treated as general user tokens
const EXPERT_MCP_PLATFORM_OWNER_TYPE = 'user:expert-mcp'

const EXPERT_MCP_SCOPES = [
EXPERT_MCP_SCOPE,
EXPERT_MCP_PLATFORM_SCOPE
]

module.exports = fp(async function (app, _opts) {
// Get the assistant service configuration
const serviceEnabled = app.config.expert?.enabled === true
Expand Down Expand Up @@ -60,7 +70,7 @@ module.exports = fp(async function (app, _opts) {
httpNodeAuth = deviceSettings?.httpNodeAuth
}
const tokenName = 'FlowFuse Expert MCP Access Token'
const scope = ['ff-expert:mcp', instanceType]
const scope = [EXPERT_MCP_SCOPE, instanceType]
if (httpNodeAuth?.type === 'flowforge-user' && teamHttpSecurityFeatureEnabled) {
// FlowFuse auth is enabled for this instance
const expiresAt = new Date(Date.now() + (TOKEN_TTL))
Expand Down Expand Up @@ -97,6 +107,30 @@ module.exports = fp(async function (app, _opts) {
return readCachedMcpAccessToken(instanceId)
}

async function getOrCreateMcpPlatformToken (user) {
const cacheKey = `platform:${user.hashid}`
const cached = await readCachedMcpAccessToken(cacheKey)
if (cached) {
return cached
}

const expiresAt = new Date(Date.now() + TOKEN_TTL)
const { token } = await app.db.controllers.AccessToken.createTokenForUser(
user,
expiresAt,
[EXPERT_MCP_PLATFORM_SCOPE],
undefined,
EXPERT_MCP_PLATFORM_OWNER_TYPE
)

const entry = { token }
await tokenCache().set(cacheKey, {
value: entry,
expiresAt: Date.now() + TOKEN_TTL
})
return entry
}

app.decorate('expert', {
serviceEnabled,
expertUrl,
Expand All @@ -105,7 +139,10 @@ module.exports = fp(async function (app, _opts) {
mcp: {
clearTokenCache: clearMcpAccessTokenCache,
getCachedToken: getCachedMcpAccessToken,
getOrCreateToken: getOrCreateMcpAccessToken
getOrCreateToken: getOrCreateMcpAccessToken,
getOrCreatePlatformToken: getOrCreateMcpPlatformToken
}
})
}, { name: 'app.expert' })

module.exports.EXPERT_MCP_SCOPES = EXPERT_MCP_SCOPES
Loading
Loading