Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions forge/comms/aclManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ module.exports = function (app) {
}
const [commandAgent, commandName] = commandParts
switch (commandAgent) {
// case 'forge':
// if (['mcp-get-features', 'mcp-call-tool'].indexOf(commandName) === -1) {
// throw ValidationError('invalid platform command for platform api')
// }
// break
case 'automation':
if (['mcp-get-features', 'mcp-call-tool'].indexOf(commandName) === -1) {
throw ValidationError('invalid platform command for platform api')
}
break
case 'insights':
if (['mcp-call-tool', 'mcp-read-resource'].includes(commandName) === false) {
throw ValidationError('invalid platform command for insights')
Expand Down
92 changes: 65 additions & 27 deletions forge/comms/commsClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,33 @@ class CommsClient extends EventEmitter {
const channelCommand = topicParts[6] // dynamic value, e.g. mcp:call-tool or mcp:read-resource
const direction = topicParts[7] // request or response (only for inflight channels)

// these are common for both insights and platform automations
let payload
try {
payload = JSON.parse(message.toString())
} catch (err) {
this.app.log.warn(`Ignoring malformed expert payload on ${topic}: ${err.message}`)
return
}
const correlationData = packet.properties?.correlationData
const userProperties = packet.properties?.userProperties
const mqttOptions = { properties: { correlationData, userProperties } }

if (!correlationData || !userProperties) {
this.app.log.warn(`'Tool call request missing correlationData or userProperties: ${message.toString()}`)
return // do not respond, the agent will timeout and handle it
}
// end common bits

const supportedInsightsCommands = {
'insights:mcp-call-tool': 'mcp:call-tool',
'insights:mcp-read-resource': 'mcp:read-resource'
}
const supportedPlatformAutomationCommands = {
'automation:mcp-get-features': 'mcp-get-features',
'automation:mcp-call-tool': 'mcp-call-tool'
}

if (supportedInsightsCommands[channelCommand] && direction === 'request') {
const isInsightsToolCall = channel === 'platform' && channelCommand === 'insights:mcp-call-tool' && direction === 'request'
const isInsightsResourceCall = channel === 'platform' && channelCommand === 'insights:mcp-read-resource' && direction === 'request'
Expand All @@ -74,37 +97,11 @@ class CommsClient extends EventEmitter {
// has permission to access this topic. Now, we verify the payload contains the required fields to process the request.
// If OK, emit the message to the appropriate instance/device handler (handled in ./instances.js or ./devices.js)

const payload = JSON.parse(message.toString())
const command = supportedInsightsCommands[channelCommand]
const data = payload.data || {}
const { kind, mcpServer, toolDefinition, resourceDefinition, resourceTemplateDefinition } = payload.meta || {}
const correlationData = packet.properties?.correlationData
const userProperties = packet.properties?.userProperties
if (!correlationData || !userProperties) {
console.warn('Expert Insight tool call request missing correlationData or userProperties', payload)
return // do not respond, the agent will timeout and handle it
}

const mqttOptions = { properties: { correlationData, userProperties } }
const responseTopic = `ff/v1/expert/${userId}/${sessionId}/${channel}/${channelCommand}/response`

/** Callback for failed MCP request. Publishes a structured error back to the agent. */
const onError = (content, code, error) => {
const data = {
code: code || error?.code || 'MCP_ERROR',
content: `Error: ${content}`,
isError: true
}
if (error) {
data.type = error?.name || error?.constructor?.name || 'Error'
data.message = error?.message || error?.toString()
}
this.client.publish(responseTopic, JSON.stringify(data), mqttOptions)
}
/** Callback for successful MCP request. Publishes the result back to the agent. */
const onSuccess = (result) => {
this.client.publish(responseTopic, JSON.stringify(result), mqttOptions)
}
const { onSuccess, onError } = this.createMqttCallbacks(responseTopic, mqttOptions)

// check that the mcpServer contains the required fields to process the request
if (!mcpServer || !['instance', 'device'].includes(mcpServer.instanceType) || !mcpServer.instance || !mcpServer.mcpServer) {
Expand Down Expand Up @@ -141,6 +138,24 @@ class CommsClient extends EventEmitter {
onSuccess, // success callback
onError // failure callback
)
} else if (supportedPlatformAutomationCommands[channelCommand] && direction === 'request') {
// channel command is either 'mcp-get-features' or 'mcp-call-tool'
const responseTopic = `ff/v1/expert/${userId}/${sessionId}/platform/${channelCommand}/response`
const command = supportedPlatformAutomationCommands[channelCommand]
const data = payload.data || {}
const { onSuccess, onError } = this.createMqttCallbacks(responseTopic, mqttOptions)

this.emit(
'request/platform-automation:forge', // event name
{
userId, // ID of user making the request
command, // command,
data, // payload data
meta: payload.meta
},
onSuccess, // success callback
onError // failure callback
)
}
} else if (ownerType === 'p') {
this.emit('status/project', {
Expand Down Expand Up @@ -223,6 +238,29 @@ class CommsClient extends EventEmitter {
}
}

/**
* Creates onSuccess/onError callbacks that publish results back to the agent
* over the given MQTT response topic.
*/
createMqttCallbacks (responseTopic, mqttOptions) {
const onError = (content, code, error) => {
const data = {
code: code || error?.code || 'MCP_ERROR',
content: `Error: ${content}`,
isError: true
}
if (error) {
data.type = error?.name || error?.constructor?.name || 'Error'
data.message = error?.message || error?.toString()
}
this.client.publish(responseTopic, JSON.stringify(data), mqttOptions)
}
const onSuccess = (result) => {
this.client.publish(responseTopic, JSON.stringify(result), mqttOptions)
}
return { onSuccess, onError }
}

/**
* Publish to a topic
* @param {string} topic Topic to publish to
Expand Down
3 changes: 3 additions & 0 deletions forge/comms/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const ACLManager = require('./aclManager')
const { CommsClient } = require('./commsClient')
const { DeviceCommsHandler } = require('./devices')
const { InstanceCommsHandler } = require('./instances')
const { PlatformAutomationHandler } = require('./platformAutomation.js')

/**
* This module represents the real-time comms component of the platform.
Expand Down Expand Up @@ -32,6 +33,7 @@ module.exports = fp(async function (app, _opts) {
// Create the handler for any device-related messages
const deviceCommsHandler = DeviceCommsHandler(app, client)
const instanceCommsHandler = InstanceCommsHandler(app, client)
const platformAutomationHandler = PlatformAutomationHandler(app, client)

// Not in the current release, but when we handle Launcher status
// via MQTT, it will arrive here. Compare to the status/device handler in `devices.js`
Expand All @@ -44,6 +46,7 @@ module.exports = fp(async function (app, _opts) {
devices: deviceCommsHandler,
instances: instanceCommsHandler,
aclManager: ACLManager(app),
platformAutomation: platformAutomationHandler,
platform: {
settings: {
sync: function (key) {
Expand Down
133 changes: 133 additions & 0 deletions forge/comms/platformAutomation.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// /**
// * This module provides the handler for platform automation events
// */

const { default: z } = require('zod')

/**
* PlatformAutomationHandler
* @class PlatformAutomationHandler
* @memberof forge.comms
*/
class PlatformAutomationHandler {
/**
* @param {import('../forge').ForgeApplication} app Fastify app
* @param {import('./commsClient').CommsClient} client Comms Client
*/
constructor (app, client) {
this.app = app
this.client = client

/** Tool definitions without the handler functions - for sending across the wire to the agent for tool discovery */
this._wireToolDefinitions = null
this._fullToolDefinitions = null

this.setupEventHandler()
}

/**
* Lazily loads and caches the full tool definitions (with handlers)
* from the EE MCP module.
*/
loadTools () {
if (!this._fullToolDefinitions) {
const { loadToolDefinitions } = require('../ee/lib/mcp/toolLoader')
this._fullToolDefinitions = loadToolDefinitions()
this._wireToolDefinitions = this._fullToolDefinitions.map(({ name, description, inputSchema, annotations }) => ({
name,
description,
inputSchema: inputSchema && z.toJSONSchema(z.object(inputSchema)),
annotations
}))
}
}

/**
* Returns wire-safe tool definitions (no handler functions).
*/
getToolDefinitions () {
this.loadTools()
return this._wireToolDefinitions
}

/**
* Finds a tool definition by name (including its handler).
*/
findTool (toolName) {
this.loadTools()
return this._fullToolDefinitions.find(t => t.name === toolName)
}

setupEventHandler () {
this.client.on('request/platform-automation:forge', this.eventHandler)
}

eventHandler = async ({ userId, command, data, meta } = {}, onSuccess, onError) => {
try {
let result = {}

switch (command) {
case 'mcp-get-features':
result = { tools: this.getToolDefinitions() }
break
case 'mcp-call-tool': {
const toolName = data?.name
const args = data?.input || {}

// TODO: Probably sensible to verify that toolDefinition matches the tool to ensure no tampering has occurred
const { toolDefinition } = meta || {}

const { annotations } = toolDefinition
const tool = this.findTool(toolName)

// Verify tool annotations haven't been tampered with
if (JSON.stringify({ annotations }) !== JSON.stringify({ annotations: tool.annotations })) {
return onError(
'Tool definition mismatch',
'MCP_PLATFORM_TOOL_TAMPERED'
)
}

if (!tool) {
return onError(
`Unknown platform tool: ${toolName}`,
'MCP_PLATFORM_TOOL_NOT_FOUND'
)
}

const user = await this.app.db.models.User.byId(userId)
if (user) {
const { token } = await this.app.expert.mcp.getOrCreatePlatformToken(user)
const inject = (opts) => this.app.inject({
...opts,
headers: {
...opts.headers,
authorization: `Bearer ${token}`,
'x-ff-automation-source': 'expert'
}
})

const { formatResponse } = require('../ee/lib/mcp/toolLoader')
const response = await tool.handler(args, { inject })
result = formatResponse(response)
}
break
}
default:
// unrecognized command
}

onSuccess(result)
} catch (err) {
return onError(
`An error occurred performing a platform automation request: ${err.message}`,
'MCP_PLATFORM_AUTOMATION_REQUEST_ERROR',
err
)
}
}
}

module.exports = {
PlatformAutomationHandler: (app, client) => new PlatformAutomationHandler(app, client)
}
4 changes: 2 additions & 2 deletions forge/db/controllers/AccessToken.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ module.exports = {
/**
* Create an AccessToken for the editor.
*/
createTokenForUser: async function (app, user, expiresAt, scope = [], includeRefresh) {
createTokenForUser: async function (app, user, expiresAt, scope = [], includeRefresh, ownerType = 'user') {
const userId = typeof user === 'number' ? user : user.id
const token = generateToken(32, 'ffu')
const refreshToken = includeRefresh ? generateToken(32, 'ffu') : null
Expand All @@ -162,7 +162,7 @@ module.exports = {
expiresAt,
scope,
ownerId: '' + userId,
ownerType: 'user'
ownerType
})
return { token, expiresAt, refreshToken }
},
Expand Down
41 changes: 39 additions & 2 deletions forge/ee/lib/expert/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ const fp = require('fastify-plugin')

const TOKEN_CACHE_NAME = 'ExpertMCPAccessTokenCache'

const EXPERT_MCP_SCOPE = 'ff-expert:mcp'
const EXPERT_MCP_PLATFORM_SCOPE = 'ff-expert:platform'
// Dedicated owner type so platform-automation tokens are not treated as general user tokens
const EXPERT_MCP_PLATFORM_OWNER_TYPE = 'user:expert-mcp'

const EXPERT_MCP_SCOPES = [
EXPERT_MCP_SCOPE,
EXPERT_MCP_PLATFORM_SCOPE
]

module.exports = fp(async function (app, _opts) {
// Get the assistant service configuration
const serviceEnabled = app.config.expert?.enabled === true
Expand Down Expand Up @@ -60,7 +70,7 @@ module.exports = fp(async function (app, _opts) {
httpNodeAuth = deviceSettings?.httpNodeAuth
}
const tokenName = 'FlowFuse Expert MCP Access Token'
const scope = ['ff-expert:mcp', instanceType]
const scope = [EXPERT_MCP_SCOPE, instanceType]
if (httpNodeAuth?.type === 'flowforge-user' && teamHttpSecurityFeatureEnabled) {
// FlowFuse auth is enabled for this instance
const expiresAt = new Date(Date.now() + (TOKEN_TTL))
Expand Down Expand Up @@ -97,6 +107,30 @@ module.exports = fp(async function (app, _opts) {
return readCachedMcpAccessToken(instanceId)
}

async function getOrCreateMcpPlatformToken (user) {
const cacheKey = `platform:${user.hashid}`
const cached = await readCachedMcpAccessToken(cacheKey)
if (cached) {
return cached
}

const expiresAt = new Date(Date.now() + TOKEN_TTL)
const { token } = await app.db.controllers.AccessToken.createTokenForUser(
user,
expiresAt,
[EXPERT_MCP_PLATFORM_SCOPE],
undefined,
EXPERT_MCP_PLATFORM_OWNER_TYPE
)

const entry = { token }
await tokenCache().set(cacheKey, {
value: entry,
expiresAt: Date.now() + TOKEN_TTL
})
return entry
}

app.decorate('expert', {
serviceEnabled,
expertUrl,
Expand All @@ -105,7 +139,10 @@ module.exports = fp(async function (app, _opts) {
mcp: {
clearTokenCache: clearMcpAccessTokenCache,
getCachedToken: getCachedMcpAccessToken,
getOrCreateToken: getOrCreateMcpAccessToken
getOrCreateToken: getOrCreateMcpAccessToken,
getOrCreatePlatformToken: getOrCreateMcpPlatformToken
}
})
}, { name: 'app.expert' })

module.exports.EXPERT_MCP_SCOPES = EXPERT_MCP_SCOPES
Loading
Loading