Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions src/channels/email/smtp-imap/SmtpImapChannelHost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,8 @@ export class SmtpImapChannelHost {
if (oauth2?.accessToken && oauth2.accessTokenExpiry && Date.now() >= oauth2.accessTokenExpiry) {
logger.info({ channelProviderId }, 'SMTP/IMAP outgoing: OAuth2 token expired, refreshing inline');
await this.oauth2TokenRefreshService.refreshProvider(channelProviderId);
const refreshedRawConfig = await this.secretRefUtils.resolveObject(providerRecord.config as Record<string, unknown>);
const refreshedResult = smtpImapChannelProviderConfigSchema.safeParse(refreshedRawConfig);
if (refreshedResult.success) {
configResult.data = refreshedResult.data;
}
}

const { oauth2: refreshedOAuth2 } = configResult.data;

let resolvedStageId = body.stageId ?? queryStageId;
if (!resolvedStageId) {
const project = await this.projectService.getProjectById(projectId, SYSTEM_CONTEXT);
Expand Down Expand Up @@ -174,12 +167,12 @@ export class SmtpImapChannelHost {
threadingStrategy ?? 'messageId',
this.sessionManager,
subject,
channelProviderId,
smtp.host,
smtp.port,
smtp.secure,
smtp.auth.user,
smtp.auth.pass,
refreshedOAuth2?.accessToken,
);

try {
Expand Down Expand Up @@ -243,6 +236,7 @@ export class SmtpImapChannelHost {
keySettings: Record<string, unknown> | null,
fromAddress: string,
threadingStrategy: 'messageId' | 'senderSubject',
providerId: string,
smtpHost: string,
smtpPort: number,
smtpSecure: boolean,
Expand All @@ -256,7 +250,6 @@ export class SmtpImapChannelHost {
references: string | string[] | undefined,
stageId: string | undefined,
agentId: string | undefined,
oauth2AccessToken: string | undefined,
): Promise<void> {
const replyConversationId = extractConversationIdFromMessageId(inReplyTo) ?? extractConversationIdFromReferences(references);
logger.info({ projectId, from: senderEmail, inReplyTo, references, replyConversationId }, 'SMTP/IMAP: inbound email threading headers');
Expand Down Expand Up @@ -286,12 +279,12 @@ export class SmtpImapChannelHost {
threadingStrategy ?? 'messageId',
this.sessionManager,
subject ?? 'Re: Conversation',
providerId,
smtpHost,
smtpPort,
smtpSecure,
smtpAuthUser,
smtpAuthPass,
oauth2AccessToken,
);

try {
Expand Down
65 changes: 51 additions & 14 deletions src/channels/email/smtp-imap/SmtpImapConnection.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { eq } from 'drizzle-orm';
import { container } from 'tsyringe';
import type { Session, SessionManager } from '../../SessionManager';
import type { CALOutputMessage } from '../../messages';
import * as nodemailer from 'nodemailer';
import { db } from '../../../db';
import { providers } from '../../../db/schema';
import { EmailConnectionBase, type EmailHeaders } from '../shared/EmailConnectionBase';
import { extractDomainFromEmail, generateEmailMessageId } from '../shared/MessageIdUtils';
import { logger } from '../../../utils/logger';
import { smtpImapChannelProviderConfigSchema } from '../../../services/providers/channel/SmtpImapChannelProvider';

export class SmtpImapConnection extends EmailConnectionBase {
readonly connectionType = 'smtp_imap' as const;
Expand All @@ -17,36 +22,30 @@ export class SmtpImapConnection extends EmailConnectionBase {
private inboundMessageId: string | undefined;
private referencesChain: string[] = [];
private skipNextEmail = false;
private cachedOAuth2Token: string | undefined;

constructor(
private readonly toAddress: string,
fromAddress: string,
threadingStrategy: 'messageId' | 'senderSubject',
sessionManager: SessionManager,
private readonly subject: string,
private readonly providerId: string,
smtpHost: string,
smtpPort: number,
smtpSecure: boolean,
smtpAuthUser: string,
smtpAuthPass: string,
private oauth2AccessToken: string | undefined,
private readonly smtpAuthPass: string,
) {
super(fromAddress, threadingStrategy, sessionManager, 'smtp_imap');
this.smtpAuthUser = smtpAuthUser;
this.smtpHost = smtpHost;
this.smtpPort = smtpPort;
this.smtpSecure = smtpSecure;
this.oauth2AccessToken = oauth2AccessToken || undefined;
this.createTransporter(smtpAuthPass);
}

setOAuth2AccessToken(token: string): void {
this.oauth2AccessToken = token;
this.createTransporter();
}

private createTransporter(smtpAuthPass?: string): void {
const isOAuth2 = !!this.oauth2AccessToken;
private createTransporter(oauth2Token?: string): void {
const isOAuth2 = !!oauth2Token;
const transporterConfig: Record<string, unknown> = {
host: this.smtpHost,
port: this.smtpPort,
Expand All @@ -57,18 +56,55 @@ export class SmtpImapConnection extends EmailConnectionBase {
? {
type: 'OAuth2',
user: this.smtpAuthUser,
accessToken: this.oauth2AccessToken,
accessToken: oauth2Token,
}
: {
user: this.smtpAuthUser,
pass: smtpAuthPass,
pass: this.smtpAuthPass,
},
};
logger.info({ host: this.smtpHost, port: this.smtpPort, secure: this.smtpSecure, authType: isOAuth2 ? 'OAuth2' : 'LOGIN' }, 'SMTP/IMAP: creating transporter');
this.transporter = nodemailer.createTransport(transporterConfig as nodemailer.TransportOptions);
}

private async ensureTransporter(): Promise<void> {
if (this.transporter && !this.cachedOAuth2Token) {
return;
}

const provider = await db.query.providers.findFirst({
where: eq(providers.id, this.providerId),
});

if (!provider) {
if (!this.transporter) {
this.createTransporter();
}
return;
}

const { SecretRefUtils } = await import('../../../services/secrets/SecretRefUtils');
const secretRefUtils = container.resolve(SecretRefUtils);
const rawConfig = await secretRefUtils.resolveObject(provider.config as Record<string, unknown>);
const configResult = smtpImapChannelProviderConfigSchema.safeParse(rawConfig);

if (!configResult.success) {
if (!this.transporter) {
this.createTransporter();
}
return;
}

const newToken = configResult.data.oauth2?.accessToken;

if (newToken !== this.cachedOAuth2Token) {
this.cachedOAuth2Token = newToken;
this.createTransporter(newToken);
}
}

async verifyConnection(): Promise<void> {
await this.ensureTransporter();
if (!this.transporter) {
throw new Error('SMTP transporter not initialized');
}
Expand Down Expand Up @@ -138,7 +174,8 @@ export class SmtpImapConnection extends EmailConnectionBase {
await this.sendEmail(this.toAddress, this.subject, body, headers);
}

protected async sendEmail(to: string, subject: string, body: string, headers?: EmailHeaders): Promise<void> {
protected async sendEmail(to: string, subject: string, body: string, headers?: EmailHeaders): Promise<void> {
await this.ensureTransporter();
const messageId = headers?.messageId ?? this.generateMessageId();
const from = headers?.from ?? this.fromAddress ?? this.smtpAuthUser;

Expand Down
2 changes: 1 addition & 1 deletion src/services/ImapInboundService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ class ImapMailboxSession {
this.keySettings,
this.fromAddress,
this.threadingStrategy,
this.providerId,
this.smtpHost,
this.smtpPort,
this.smtpSecure,
Expand All @@ -294,7 +295,6 @@ class ImapMailboxSession {
references,
undefined,
undefined,
this.oauth2AccessToken,
);

this.processedUids.add(uid);
Expand Down
Loading