From ac59b8acf08fa645dc8d8c03a93fff18a9cdcf03 Mon Sep 17 00:00:00 2001 From: Patryk Bajer Date: Fri, 19 Jun 2026 19:36:10 +0200 Subject: [PATCH] feat: add OAuth2/XOAUTH2 support to SMTP/IMAP channel - Add optional oauth2 config block to SmtpImapChannelProvider schema - OAuth2 token refresh service polling every 5 minutes - XOAUTH2 SMTP transport (nodemailer) when oauth2 token present - Manual SASL XOAUTH2 for IMAP (imap package) when oauth2 token present - Three OAuth2 API endpoints: /authorize, /callback, /refresh - OAuth2 tokens threaded through SmtpImapChannelHost to connections - OAuth2 secrets (clientSecret, refreshToken, accessToken) stored as secret refs - OAuthTokenRefreshError (502) in error handler - Register SmtpImapOAuth2Controller routes + OpenAPI paths in swagger - Start OAuth2TokenRefreshService on server boot --- .../email/smtp-imap/SmtpImapChannelHost.ts | 5 +- .../email/smtp-imap/SmtpImapConnection.ts | 47 ++- src/errors.ts | 10 + src/http/contracts/smtp-imap-oauth2.ts | 44 +++ .../controllers/SmtpImapOAuth2Controller.ts | 304 ++++++++++++++++++ src/http/middleware/errorHandler.ts | 7 +- src/server.ts | 6 + src/services/ImapInboundService.ts | 38 ++- src/services/OAuth2TokenRefreshService.ts | 182 +++++++++++ .../channel/SmtpImapChannelProvider.ts | 11 + src/services/secrets/SecretRefUtils.ts | 3 + src/swagger.ts | 7 + 12 files changed, 645 insertions(+), 19 deletions(-) create mode 100644 src/http/contracts/smtp-imap-oauth2.ts create mode 100644 src/http/controllers/SmtpImapOAuth2Controller.ts create mode 100644 src/services/OAuth2TokenRefreshService.ts diff --git a/src/channels/email/smtp-imap/SmtpImapChannelHost.ts b/src/channels/email/smtp-imap/SmtpImapChannelHost.ts index 15a2d3a0..fbf64c70 100644 --- a/src/channels/email/smtp-imap/SmtpImapChannelHost.ts +++ b/src/channels/email/smtp-imap/SmtpImapChannelHost.ts @@ -121,7 +121,7 @@ export class SmtpImapChannelHost { res.status(500).json({ error: 'Channel provider config is invalid' }); return; } - const { fromAddress, smtp, threadingStrategy } = configResult.data; + const { fromAddress, smtp, threadingStrategy, oauth2 } = configResult.data; let resolvedStageId = body.stageId ?? queryStageId; if (!resolvedStageId) { @@ -165,6 +165,7 @@ export class SmtpImapChannelHost { smtp.secure, smtp.auth.user, smtp.auth.pass, + oauth2?.accessToken, ); try { @@ -241,6 +242,7 @@ export class SmtpImapChannelHost { references: string | string[] | undefined, stageId: string | undefined, agentId: string | undefined, + oauth2AccessToken: string | undefined, ): Promise { const replyConversationId = extractConversationIdFromMessageId(inReplyTo) ?? extractConversationIdFromReferences(references); logger.info({ projectId, from: senderEmail, inReplyTo, references, replyConversationId }, 'SMTP/IMAP: inbound email threading headers'); @@ -275,6 +277,7 @@ export class SmtpImapChannelHost { smtpSecure, smtpAuthUser, smtpAuthPass, + oauth2AccessToken, ); try { diff --git a/src/channels/email/smtp-imap/SmtpImapConnection.ts b/src/channels/email/smtp-imap/SmtpImapConnection.ts index 3cfb282a..2d235577 100644 --- a/src/channels/email/smtp-imap/SmtpImapConnection.ts +++ b/src/channels/email/smtp-imap/SmtpImapConnection.ts @@ -8,8 +8,11 @@ import { logger } from '../../../utils/logger'; export class SmtpImapConnection extends EmailConnectionBase { readonly connectionType = 'smtp_imap' as const; - private transporter: nodemailer.Transporter; + private transporter: nodemailer.Transporter | null = null; private readonly smtpAuthUser: string; + private readonly smtpHost: string; + private readonly smtpPort: number; + private readonly smtpSecure: boolean; private conversationId: string | undefined; private inboundMessageId: string | undefined; private referencesChain: string[] = []; @@ -26,21 +29,43 @@ export class SmtpImapConnection extends EmailConnectionBase { smtpSecure: boolean, smtpAuthUser: string, smtpAuthPass: string, + private oauth2AccessToken: string | undefined, ) { super(fromAddress, threadingStrategy, sessionManager, 'smtp_imap'); this.smtpAuthUser = smtpAuthUser; + this.smtpHost = smtpHost; + this.smtpPort = smtpPort; + this.smtpSecure = smtpSecure; + this.oauth2AccessToken = oauth2AccessToken || undefined; + this.createTransporter(smtpAuthPass); + } + + setOAuth2AccessToken(token: string): void { + this.oauth2AccessToken = token; + this.createTransporter(); + } + + private createTransporter(smtpAuthPass?: string): void { this.transporter = nodemailer.createTransport({ - host: smtpHost, - port: smtpPort, - secure: smtpSecure, - auth: { - user: smtpAuthUser, - pass: smtpAuthPass, - }, - }); + host: this.smtpHost, + port: this.smtpPort, + secure: this.smtpSecure, + auth: this.oauth2AccessToken + ? { + user: this.smtpAuthUser, + xoauth2: Buffer.from(`user=${this.smtpAuthUser}\x01auth=Bearer ${this.oauth2AccessToken}\x01\x01`, 'utf-8').toString('base64'), + } + : { + user: this.smtpAuthUser, + pass: smtpAuthPass, + }, + } as nodemailer.TransportOptions); } async verifyConnection(): Promise { + if (!this.transporter) { + throw new Error('SMTP transporter not initialized'); + } try { await this.transporter.verify(); logger.info({ to: this.toAddress }, 'SMTP/IMAP: transporter verified successfully'); @@ -127,6 +152,10 @@ export class SmtpImapConnection extends EmailConnectionBase { (mailOptions.headers as Record)['References'] = headers.references; } + if (!this.transporter) { + logger.error({ to }, 'SMTP/IMAP: transporter not initialized'); + return; + } try { const info = await this.transporter.sendMail(mailOptions); logger.info({ to, messageId, sessionId: this.session?.id, messageIdRemote: info.messageId }, 'SMTP/IMAP email sent'); diff --git a/src/errors.ts b/src/errors.ts index a6802226..1fa702be 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -153,6 +153,16 @@ export class ConflictError extends Error { } } +/** + * Error thrown when an OAuth2 token refresh fails for an SMTP/IMAP provider + */ +export class OAuthTokenRefreshError extends Error { + constructor(message: string) { + super(message); + this.name = 'OAuthTokenRefreshError'; + } +} + /** A single field-level validation error detail, compatible with Zod issue format */ export type ValidationErrorDetail = { code: string; diff --git a/src/http/contracts/smtp-imap-oauth2.ts b/src/http/contracts/smtp-imap-oauth2.ts new file mode 100644 index 00000000..67158210 --- /dev/null +++ b/src/http/contracts/smtp-imap-oauth2.ts @@ -0,0 +1,44 @@ +import { z } from 'zod'; +import { extendZodWithOpenApi } from '@asteasolutions/zod-to-openapi'; + +extendZodWithOpenApi(z); + +export const oauth2AuthorizeBodySchema = z.strictObject({ + providerId: z.string().min(1).describe('ID of the SMTP/IMAP channel provider to configure OAuth2 for'), + tokenUrl: z.string().url().describe('OAuth2 token endpoint URL (e.g. https://oauth2.googleapis.com/token for Gmail)'), + authorizationUrl: z.string().url().describe('OAuth2 authorization endpoint URL (e.g. https://accounts.google.com/o/oauth2/v2/auth for Gmail)'), + clientId: z.string().min(1).describe('OAuth2 client ID'), + scope: z.string().min(1).describe('OAuth2 scope string (e.g. https://www.googleapis.com/auth/gmail.modify for Gmail)'), + redirectUrl: z.string().url().describe('Redirect URI registered with the OAuth2 provider (must match the callback endpoint)'), +}); + +export const oauth2AuthorizeResponseSchema = z.strictObject({ + authorizationUrl: z.string().url().describe('Full authorization URL to redirect the user to'), + state: z.string().describe('Random state parameter for CSRF protection'), +}); + +export const oauth2CallbackQuerySchema = z.strictObject({ + code: z.string().min(1).describe('Authorization code from the OAuth2 provider'), + state: z.string().min(1).describe('State parameter that was returned from the authorization URL'), +}); + +export const oauth2CallbackResponseSchema = z.strictObject({ + success: z.boolean().describe('Whether the OAuth2 callback was processed successfully'), + message: z.string().describe('Human-readable result message'), +}); + +export const oauth2RefreshBodySchema = z.strictObject({ + providerId: z.string().min(1).describe('ID of the SMTP/IMAP channel provider to refresh tokens for'), +}); + +export const oauth2RefreshResponseSchema = z.strictObject({ + success: z.boolean().describe('Whether the token refresh was successful'), + accessTokenExpiry: z.number().int().optional().describe('Unix timestamp in milliseconds when the access token expires'), +}); + +export type OAuth2AuthorizeBody = z.infer; +export type OAuth2AuthorizeResponse = z.infer; +export type OAuth2CallbackQuery = z.infer; +export type OAuth2CallbackResponse = z.infer; +export type OAuth2RefreshBody = z.infer; +export type OAuth2RefreshResponse = z.infer; diff --git a/src/http/controllers/SmtpImapOAuth2Controller.ts b/src/http/controllers/SmtpImapOAuth2Controller.ts new file mode 100644 index 00000000..5cf9a8d5 --- /dev/null +++ b/src/http/controllers/SmtpImapOAuth2Controller.ts @@ -0,0 +1,304 @@ +import { inject, singleton } from 'tsyringe'; +import type { Request, Response, Router } from 'express'; +import type { RouteConfig } from '@asteasolutions/zod-to-openapi'; +import { z } from 'zod'; +import { eq } from 'drizzle-orm'; +import { randomBytes } from 'crypto'; +import { db } from '../../db'; +import { providers } from '../../db/schema'; +import { smtpImapChannelProviderConfigSchema } from '../../services/providers/channel/SmtpImapChannelProvider'; +import { SecretRefUtils } from '../../services/secrets/SecretRefUtils'; +import { OAuth2TokenRefreshService } from '../../services/OAuth2TokenRefreshService'; +import { logger } from '../../utils/logger'; +import { asyncHandler } from '../../utils/asyncHandler'; +import { + oauth2AuthorizeBodySchema, + oauth2AuthorizeResponseSchema, + oauth2CallbackQuerySchema, + oauth2CallbackResponseSchema, + oauth2RefreshBodySchema, + oauth2RefreshResponseSchema, + type OAuth2AuthorizeResponse, + type OAuth2CallbackResponse, + type OAuth2RefreshResponse, +} from '../contracts/smtp-imap-oauth2'; + +const STATE_TTL_MS = 10 * 60 * 1000; + +type OAuth2StateEntry = { + providerId: string; + tokenUrl: string; + clientId: string; + clientSecret: string; + redirectUrl: string; + scope: string; + createdAt: number; +}; + +@singleton() +export class SmtpImapOAuth2Controller { + private readonly pendingStates = new Map(); + + constructor( + @inject(SecretRefUtils) private readonly secretRefUtils: SecretRefUtils, + @inject(OAuth2TokenRefreshService) private readonly tokenRefreshService: OAuth2TokenRefreshService, + ) { + setInterval(() => { + const now = Date.now(); + for (const [state, entry] of this.pendingStates.entries()) { + if (now - entry.createdAt > STATE_TTL_MS) { + this.pendingStates.delete(state); + } + } + }, 60_000).unref(); + } + + static getOpenAPIPaths(): RouteConfig[] { + return [ + { + method: 'post', + path: '/api/email/smtp-imap/oauth2/authorize', + tags: ['SMTP/IMAP OAuth2'], + summary: 'Generate OAuth2 authorization URL', + description: 'Generates an authorization URL for the user to grant OAuth2 access to their email account. The user should be redirected to this URL.', + request: { + body: { content: { 'application/json': { schema: oauth2AuthorizeBodySchema } } }, + }, + responses: { + 200: { description: 'Authorization URL generated', content: { 'application/json': { schema: oauth2AuthorizeResponseSchema } } }, + 400: { description: 'Invalid request or provider not found' }, + }, + }, + { + method: 'get', + path: '/api/email/smtp-imap/oauth2/callback', + tags: ['SMTP/IMAP OAuth2'], + summary: 'OAuth2 authorization callback', + description: 'Handles the OAuth2 authorization callback. The OAuth2 provider redirects here with the authorization code.', + request: { + query: oauth2CallbackQuerySchema, + }, + responses: { + 200: { description: 'Callback processed', content: { 'application/json': { schema: oauth2CallbackResponseSchema } } }, + 400: { description: 'Invalid or expired state' }, + }, + }, + { + method: 'post', + path: '/api/email/smtp-imap/oauth2/refresh', + tags: ['SMTP/IMAP OAuth2'], + summary: 'Manually trigger OAuth2 token refresh', + description: 'Immediately refreshes the OAuth2 access token for the given provider.', + request: { + body: { content: { 'application/json': { schema: oauth2RefreshBodySchema } } }, + }, + responses: { + 200: { description: 'Token refresh initiated', content: { 'application/json': { schema: oauth2RefreshResponseSchema } } }, + 400: { description: 'Provider not found or not configured for OAuth2' }, + }, + }, + ]; + } + + registerRoutes(router: Router): void { + router.post('/api/email/smtp-imap/oauth2/authorize', asyncHandler(this.handleAuthorize.bind(this))); + router.get('/api/email/smtp-imap/oauth2/callback', asyncHandler(this.handleCallback.bind(this))); + router.post('/api/email/smtp-imap/oauth2/refresh', asyncHandler(this.handleRefresh.bind(this))); + } + + private async handleAuthorize(req: Request, res: Response): Promise { + const body = oauth2AuthorizeBodySchema.parse(req.body); + const { providerId, tokenUrl, authorizationUrl, clientId, scope, redirectUrl } = body; + + const provider = await db.query.providers.findFirst({ where: eq(providers.id, providerId) }); + if (!provider || provider.apiType !== 'smtp_imap') { + res.status(400).json({ error: 'Provider not found or not an SMTP/IMAP channel' }); + return; + } + + const rawConfig = await this.secretRefUtils.resolveObject(provider.config as Record); + const configResult = smtpImapChannelProviderConfigSchema.safeParse(rawConfig); + if (!configResult.success) { + res.status(400).json({ error: 'Provider config is invalid' }); + return; + } + + const oauth2 = configResult.data.oauth2; + const clientSecret = oauth2?.clientSecret ?? ''; + + const state = randomBytes(16).toString('hex'); + + this.pendingStates.set(state, { + providerId, + tokenUrl, + clientId, + clientSecret, + redirectUrl, + scope, + createdAt: Date.now(), + }); + + const params = new URLSearchParams({ + response_type: 'code', + client_id: clientId, + redirect_uri: redirectUrl, + scope: encodeURIComponent(scope), + access_type: 'offline', + prompt: 'consent', + state, + }); + + const fullAuthUrl = `${authorizationUrl}?${params.toString()}`; + + const response: OAuth2AuthorizeResponse = oauth2AuthorizeResponseSchema.parse({ + authorizationUrl: fullAuthUrl, + state, + }); + + res.status(200).json(response); + } + + private async handleCallback(req: Request, res: Response): Promise { + const query = oauth2CallbackQuerySchema.parse(req.query); + const { code, state } = query; + + const entry = this.pendingStates.get(state); + if (!entry) { + res.status(400).json({ error: 'Invalid or expired state parameter' }); + return; + } + this.pendingStates.delete(state); + + try { + const tokenResponse = await this.exchangeCodeForToken( + entry.tokenUrl, + entry.clientId, + entry.clientSecret, + entry.redirectUrl, + code, + ); + + const provider = await db.query.providers.findFirst({ where: eq(providers.id, entry.providerId) }); + if (!provider) { + res.status(400).json({ error: 'Provider not found' }); + return; + } + + const updatedConfig: Record = { ...provider.config }; + if (typeof updatedConfig.oauth2 !== 'object' || updatedConfig.oauth2 === null) { + updatedConfig.oauth2 = {}; + } + + const oauth2Config = updatedConfig.oauth2 as Record; + oauth2Config.tokenUrl = entry.tokenUrl; + oauth2Config.clientId = entry.clientId; + oauth2Config.accessToken = tokenResponse.access_token; + oauth2Config.refreshToken = tokenResponse.refresh_token; + oauth2Config.accessTokenExpiry = Date.now() + (tokenResponse.expires_in ?? 3600) * 1000; + oauth2Config.scope = entry.scope; + + const secretizedConfig = await this.secretRefUtils.secretizeObject(updatedConfig, new Set()); + + await db.update(providers) + .set({ + config: secretizedConfig, + updatedAt: new Date(), + }) + .where(eq(providers.id, entry.providerId)); + + logger.info({ providerId: entry.providerId }, 'OAuth2 tokens stored successfully'); + + const response: OAuth2CallbackResponse = oauth2CallbackResponseSchema.parse({ + success: true, + message: 'OAuth2 tokens stored successfully', + }); + + res.status(200).json(response); + } catch (error) { + logger.error({ error, providerId: entry.providerId }, 'Failed to exchange OAuth2 code for tokens'); + const response: OAuth2CallbackResponse = oauth2CallbackResponseSchema.parse({ + success: false, + message: `Failed to exchange code for tokens: ${error instanceof Error ? error.message : String(error)}`, + }); + res.status(400).json(response); + } + } + + private async handleRefresh(req: Request, res: Response): Promise { + const body = oauth2RefreshBodySchema.parse(req.body); + const { providerId } = body; + + const provider = await db.query.providers.findFirst({ where: eq(providers.id, providerId) }); + if (!provider || provider.apiType !== 'smtp_imap') { + res.status(400).json({ error: 'Provider not found or not an SMTP/IMAP channel' }); + return; + } + + const rawConfig = await this.secretRefUtils.resolveObject(provider.config as Record); + const configResult = smtpImapChannelProviderConfigSchema.safeParse(rawConfig); + if (!configResult.success || !configResult.data.oauth2) { + res.status(400).json({ error: 'Provider is not configured for OAuth2' }); + return; + } + + try { + await this.tokenRefreshService.refreshProvider(providerId); + + const updatedProvider = await db.query.providers.findFirst({ where: eq(providers.id, providerId) }); + const updatedConfig = await this.secretRefUtils.resolveObject(updatedProvider!.config as Record); + const parsedConfig = smtpImapChannelProviderConfigSchema.parse(updatedConfig); + + const response: OAuth2RefreshResponse = oauth2RefreshResponseSchema.parse({ + success: true, + accessTokenExpiry: parsedConfig.oauth2?.accessTokenExpiry, + }); + + res.status(200).json(response); + } catch (error) { + const response: OAuth2RefreshResponse = oauth2RefreshResponseSchema.parse({ + success: false, + accessTokenExpiry: undefined, + }); + res.status(502).json(response); + } + } + + private async exchangeCodeForToken( + tokenUrl: string, + clientId: string, + clientSecret: string, + redirectUrl: string, + code: string, + ): Promise<{ access_token: string; refresh_token: string; expires_in: number }> { + const response = await fetch(tokenUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + }, + body: new URLSearchParams({ + grant_type: 'authorization_code', + client_id: clientId, + client_secret: clientSecret, + redirect_uri: redirectUrl, + code, + }), + }); + + if (!response.ok) { + const body = await response.text(); + throw new Error(`Token exchange failed: ${response.status} ${body}`); + } + + const json = await response.json(); + + if (!json.access_token || !json.refresh_token) { + throw new Error('Token exchange response missing access_token or refresh_token'); + } + + return { + access_token: json.access_token, + refresh_token: json.refresh_token, + expires_in: json.expires_in ?? 3600, + }; + } +} diff --git a/src/http/middleware/errorHandler.ts b/src/http/middleware/errorHandler.ts index 233aa200..01c90c33 100644 --- a/src/http/middleware/errorHandler.ts +++ b/src/http/middleware/errorHandler.ts @@ -1,6 +1,6 @@ import type { Request, Response, NextFunction } from 'express'; import { z } from 'zod'; -import { OptimisticLockError, NotFoundError, InvalidOperationError, RemoteConnectionError, AccessDeniedError, UnauthorizedError, ForbiddenError, ArchivedProjectError, TooManyRequestsError, ConflictError, ValidationError } from '../../errors'; +import { OptimisticLockError, NotFoundError, InvalidOperationError, RemoteConnectionError, AccessDeniedError, UnauthorizedError, ForbiddenError, ArchivedProjectError, TooManyRequestsError, ConflictError, ValidationError, OAuthTokenRefreshError } from '../../errors'; import logger from '../../utils/logger'; /** @@ -66,6 +66,11 @@ export function errorHandler(err: any, req: Request, res: Response, next: NextFu return; } + if (err instanceof OAuthTokenRefreshError) { + res.status(502).json({ error: err.message }); + return; + } + if (err instanceof AccessDeniedError) { res.status(403).json({ error: err.message }); return; diff --git a/src/server.ts b/src/server.ts index bee81be8..689dc854 100644 --- a/src/server.ts +++ b/src/server.ts @@ -37,6 +37,7 @@ import { ProjectExchangeController } from './http/controllers/ProjectExchangeCon import { ConversationTimeoutService } from './services/ConversationTimeoutService'; import { ScenarioRunExecutorService } from './services/testing/ScenarioRunExecutorService'; import { ImapInboundService } from './services/ImapInboundService'; +import { OAuth2TokenRefreshService } from './services/OAuth2TokenRefreshService'; import { errorHandler } from './http/middleware/errorHandler'; import { optionalAuthMiddleware } from './http/middleware/auth'; import { requestContextMiddleware } from './http/middleware/requestContext'; @@ -52,6 +53,7 @@ import { TelegramChannelHost } from './channels/telegram/TelegramChannelHost'; // import { SendGridChannelHost } from './channels/email/sendgrid/SendGridChannelHost'; // import { SesChannelHost } from './channels/email/ses/SesChannelHost'; import { SmtpImapChannelHost } from './channels/email/smtp-imap/SmtpImapChannelHost'; +import { SmtpImapOAuth2Controller } from './http/controllers/SmtpImapOAuth2Controller'; import logger from './utils/logger'; import { fileURLToPath } from 'url'; import { SecretsManagerRegistry } from './services/secrets/SecretsManagerRegistry'; @@ -284,6 +286,9 @@ export async function createApp(): Promise { // container.resolve(SesChannelHost).registerRoutes(app); container.resolve(SmtpImapChannelHost).registerRoutes(app); + const smtpImapOAuth2Controller = container.resolve(SmtpImapOAuth2Controller); + smtpImapOAuth2Controller.registerRoutes(app); + try { await SpeexResamplerClass.initPromise; const warmup = new SpeexResamplerClass(1, 16000, 8000, 3); @@ -309,6 +314,7 @@ export async function createApp(): Promise { container.resolve(ScenarioRunExecutorService).start(); container.resolve(BenchmarkExecutorService).start(); container.resolve(ImapInboundService).start(); + container.resolve(OAuth2TokenRefreshService).start(); app.use(errorHandler); diff --git a/src/services/ImapInboundService.ts b/src/services/ImapInboundService.ts index cc06f0ec..a99ca0a0 100644 --- a/src/services/ImapInboundService.ts +++ b/src/services/ImapInboundService.ts @@ -44,6 +44,7 @@ class ImapMailboxSession { public readonly smtpAuthUser: string, public readonly smtpAuthPass: string, public readonly keySettings: Record | null, + public readonly oauth2AccessToken: string | undefined, ) {} public async connect(): Promise { @@ -51,9 +52,11 @@ class ImapMailboxSession { this.state = 'connecting'; try { + const useOAuth2 = this.oauth2AccessToken != null && this.oauth2AccessToken.length > 0; + const imap = new ImapConnection({ - user: this.imapUser, - password: this.imapPass, + user: useOAuth2 ? undefined : this.imapUser, + password: useOAuth2 ? undefined : this.imapPass, host: this.imapHost, port: this.imapPort, tls: this.imapSecure, @@ -63,15 +66,31 @@ class ImapMailboxSession { keepalive: true, }); - await new Promise((resolve, reject) => { - imap.once('ready', () => resolve()); - imap.once('error', reject); - imap.connect(); - }); + if (useOAuth2) { + await new Promise((resolve, reject) => { + const xoauth2Token = Buffer.from( + `user=${this.imapUser}\x01auth=Bearer ${this.oauth2AccessToken}\x01\x01`, + 'utf-8', + ).toString('base64'); + + imap.once('connect', () => { + (imap as any).C('AUTHENTICATE XOAUTH2 ' + xoauth2Token); + }); + imap.once('ready', () => resolve()); + imap.once('error', reject); + imap.connect(); + }); + } else { + await new Promise((resolve, reject) => { + imap.once('ready', () => resolve()); + imap.once('error', reject); + imap.connect(); + }); + } this.imap = imap; this.consecutiveErrors = 0; - logger.info({ providerId: this.providerId, host: this.imapHost }, 'IMAP connected'); + logger.info({ providerId: this.providerId, host: this.imapHost, authMethod: useOAuth2 ? 'XOAUTH2' : 'password' }, 'IMAP connected'); await this.openInbox(); } catch (error) { @@ -282,6 +301,7 @@ class ImapMailboxSession { references, undefined, undefined, + this.oauth2AccessToken, ); this.processedUids.add(uid); @@ -440,6 +460,7 @@ export class ImapInboundService { config.smtp.auth.user, config.smtp.auth.pass, apiKeyRecord.keySettings ?? null, + config.oauth2?.accessToken, ); this.sessions.set(provider.id, session); @@ -496,6 +517,7 @@ export class ImapInboundService { config.smtp.auth.user, config.smtp.auth.pass, apiKeyRecord.keySettings ?? null, + config.oauth2?.accessToken, ); this.sessions.set(provider.id, session); diff --git a/src/services/OAuth2TokenRefreshService.ts b/src/services/OAuth2TokenRefreshService.ts new file mode 100644 index 00000000..35f05d41 --- /dev/null +++ b/src/services/OAuth2TokenRefreshService.ts @@ -0,0 +1,182 @@ +import { inject, singleton } from 'tsyringe'; +import { eq, and } from 'drizzle-orm'; +import { db } from '../db'; +import { providers } from '../db/schema'; +import { smtpImapChannelProviderConfigSchema } from './providers/channel/SmtpImapChannelProvider'; +import { SecretRefUtils } from './secrets/SecretRefUtils'; +import { ImapInboundService } from './ImapInboundService'; +import { logger } from '../utils/logger'; + +const REFRESH_INTERVAL_MS = 5 * 60 * 1000; +const EXPIRY_BUFFER_MS = 5 * 60 * 1000; + +type OAuth2TokenResponse = { + access_token: string; + token_type: string; + refresh_token?: string; + expires_in?: number; +}; + +@singleton() +export class OAuth2TokenRefreshService { + private timer: NodeJS.Timeout | null = null; + private isRunning = false; + + constructor( + @inject(SecretRefUtils) private readonly secretRefUtils: SecretRefUtils, + @inject(ImapInboundService) private readonly imapInboundService: ImapInboundService, + ) {} + + start(): void { + if (this.isRunning) { + logger.warn('OAuth2TokenRefreshService already started'); + return; + } + this.isRunning = true; + logger.info('Starting OAuth2TokenRefreshService'); + this.scheduleRefresh(); + } + + stop(): void { + this.isRunning = false; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + logger.info('OAuth2TokenRefreshService stopped'); + } + + async refreshProvider(providerId: string): Promise { + const provider = await db.query.providers.findFirst({ + where: eq(providers.id, providerId), + }); + + if (!provider || provider.apiType !== 'smtp_imap') { + logger.warn({ providerId }, 'Provider not found or not smtp_imap, skipping OAuth2 refresh'); + return; + } + + await this.processProviderRefresh(provider); + } + + private scheduleRefresh(): void { + if (this.timer) { + clearTimeout(this.timer); + } + this.timer = setTimeout(async () => { + if (!this.isRunning) return; + try { + await this.runRefreshCycle(); + } catch (error) { + logger.error({ error }, 'Error during OAuth2 refresh cycle'); + } + if (this.isRunning) { + this.scheduleRefresh(); + } + }, REFRESH_INTERVAL_MS); + if (this.timer) { + this.timer.unref?.(); + } + } + + private async runRefreshCycle(): Promise { + const providerRecords = await db.query.providers.findMany({ + where: and( + eq(providers.providerType, 'channel'), + ), + }); + + const smtpImapProviders = providerRecords.filter((p) => p.apiType === 'smtp_imap'); + + for (const provider of smtpImapProviders) { + try { + await this.processProviderRefresh(provider); + } catch (error) { + logger.error({ error, providerId: provider.id }, 'Failed to process OAuth2 refresh for provider'); + } + } + } + + private async processProviderRefresh(provider: { id: string; config: Record }): Promise { + const rawConfig = await this.secretRefUtils.resolveObject(provider.config); + const configResult = smtpImapChannelProviderConfigSchema.safeParse(rawConfig); + + if (!configResult.success) { + return; + } + + const config = configResult.data; + + if (!config.oauth2) { + return; + } + + const { oauth2 } = config; + const now = Date.now(); + + if (oauth2.accessTokenExpiry - now > EXPIRY_BUFFER_MS) { + return; + } + + logger.info({ providerId: provider.id, expiresIn: oauth2.accessTokenExpiry - now }, 'OAuth2 token expiring, refreshing'); + + const tokenResponse = await this.fetchToken(oauth2); + + const updatedConfig: Record = { ...provider.config }; + if (typeof updatedConfig.oauth2 !== 'object' || updatedConfig.oauth2 === null) { + updatedConfig.oauth2 = {}; + } + + const oauth2Config = updatedConfig.oauth2 as Record; + oauth2Config.accessToken = tokenResponse.access_token; + oauth2Config.accessTokenExpiry = now + (tokenResponse.expires_in ?? 3600) * 1000; + + if (tokenResponse.refresh_token) { + oauth2Config.refreshToken = tokenResponse.refresh_token; + } + + const secretizedConfig = await this.secretRefUtils.secretizeObject(updatedConfig, new Set()); + + await db.update(providers) + .set({ + config: secretizedConfig, + updatedAt: new Date(), + }) + .where(eq(providers.id, provider.id)); + + logger.info({ providerId: provider.id, newExpiry: oauth2Config.accessTokenExpiry }, 'OAuth2 token refreshed successfully'); + + this.imapInboundService.reload(provider.id).catch((error) => { + logger.error({ error, providerId: provider.id }, 'Failed to reload IMAP session after OAuth2 token refresh'); + }); + } + + private async fetchToken(oauth2: NonNullable['oauth2']>): Promise { + const response = await fetch(oauth2.tokenUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + }, + body: new URLSearchParams({ + grant_type: 'refresh_token', + client_id: oauth2.clientId, + client_secret: oauth2.clientSecret, + refresh_token: oauth2.refreshToken, + }), + }); + + if (!response.ok) { + const body = await response.text(); + logger.error({ status: response.status, body, providerTokenUrl: oauth2.tokenUrl }, 'OAuth2 token refresh failed'); + throw new Error(`OAuth2 token refresh failed: ${response.status} ${body}`); + } + + const json = (await response.json()) as OAuth2TokenResponse; + + if (!json.access_token) { + throw new Error('OAuth2 token refresh response missing access_token'); + } + + return json; + } +} diff --git a/src/services/providers/channel/SmtpImapChannelProvider.ts b/src/services/providers/channel/SmtpImapChannelProvider.ts index 466ffb6d..1a7d9aba 100644 --- a/src/services/providers/channel/SmtpImapChannelProvider.ts +++ b/src/services/providers/channel/SmtpImapChannelProvider.ts @@ -28,12 +28,23 @@ const imapConfigSchema = z.strictObject({ pollingIntervalMs: z.number().int().min(1000).default(30000).describe('Fallback polling interval in milliseconds when IDLE is unavailable'), }).openapi('SmtpImapImapConfig'); +const oauth2ConfigSchema = z.strictObject({ + tokenUrl: z.string().url().describe('OAuth2 token endpoint URL (e.g. https://oauth2.googleapis.com/token for Gmail)'), + clientId: z.string().describe('OAuth2 client ID'), + clientSecret: z.string().describe('OAuth2 client secret'), + refreshToken: z.string().describe('OAuth2 refresh token (long-lived)'), + accessToken: z.string().describe('Current OAuth2 access token (rotated by the refresh service)'), + accessTokenExpiry: z.number().int().describe('Unix timestamp in milliseconds when the access token expires'), + scope: z.string().describe('OAuth2 scope string (e.g. https://www.googleapis.com/auth/gmail.modify for Gmail)'), +}).openapi('SmtpImapOauth2Config'); + export const smtpImapChannelProviderConfigSchema = z.strictObject({ projectId: z.string().describe('Project ID that this email channel belongs to (required for IMAP inbound routing)'), fromAddress: z.string().email().describe('Sender email address'), smtp: smtpConfigSchema.describe('SMTP server configuration for sending emails'), imap: imapConfigSchema.describe('IMAP server configuration for receiving inbound email replies'), threadingStrategy: z.enum(['messageId', 'senderSubject']).default('messageId').describe('How to derive thread ID for conversation continuity'), + oauth2: oauth2ConfigSchema.optional().describe('Optional OAuth2/XOAUTH2 configuration. When present, supersedes password-based authentication for both SMTP and IMAP.'), }).openapi('SmtpImapChannelConfig'); export type SmtpImapChannelProviderConfig = z.infer; diff --git a/src/services/secrets/SecretRefUtils.ts b/src/services/secrets/SecretRefUtils.ts index c21b5bfb..2fe9cd25 100644 --- a/src/services/secrets/SecretRefUtils.ts +++ b/src/services/secrets/SecretRefUtils.ts @@ -25,6 +25,9 @@ export const SENSITIVE_PROVIDER_CONFIG_FIELDS = new Set([ const SENSITIVE_NESTED_PATHS = [ 'smtp.auth.pass', 'imap.auth.pass', + 'oauth2.clientSecret', + 'oauth2.refreshToken', + 'oauth2.accessToken', ]; /** diff --git a/src/swagger.ts b/src/swagger.ts index 173e1e8a..20f73dc3 100644 --- a/src/swagger.ts +++ b/src/swagger.ts @@ -120,6 +120,7 @@ import { TelegramChannelHost } from './channels/telegram/TelegramChannelHost'; // import { SesChannelHost } from './channels/email/ses/SesChannelHost'; // import { SendGridChannelHost } from './channels/email/sendgrid/SendGridChannelHost'; import { SmtpImapChannelHost } from './channels/email/smtp-imap/SmtpImapChannelHost'; +import { SmtpImapOAuth2Controller } from './http/controllers/SmtpImapOAuth2Controller'; import { providerHintSchema, providerHintResolutionTargetSchema, providerHintResolutionSchema, asrConfigExchangeV1Schema, storageConfigExchangeV1Schema, moderationConfigExchangeV1Schema, fillerSettingsExchangeV1Schema, projectExchangeV1Schema, agentExchangeV1Schema, stageExchangeV1Schema, classifierExchangeV1Schema, contextTransformerExchangeV1Schema, toolExchangeV1Schema, globalActionExchangeV1Schema, guardrailExchangeV1Schema, knowledgeCategoryExchangeV1Schema, knowledgeItemExchangeV1Schema, projectExchangeBundleV1Schema, projectExchangeImportResultSchema } from './http/contracts/projectExchange'; extendZodWithOpenApi(z); @@ -667,6 +668,12 @@ export function getOpenAPISpec(): any { registry.registerPath(path); } + // Register SMTP/IMAP OAuth2 routes + const smtpImapOAuth2Paths = SmtpImapOAuth2Controller.getOpenAPIPaths(); + for (const path of smtpImapOAuth2Paths) { + registry.registerPath(path); + } + // Register Benchmark sub-schemas (reusable components) and routes registry.register('BenchmarkTimingStats', timingStatsSchema); registry.register('BenchmarkStats', benchmarkStatsSchema);