From 15c2d03704db8c7b8da1da1ffab6e104721523a6 Mon Sep 17 00:00:00 2001 From: Samuel Ajayi Date: Tue, 16 Jun 2026 23:36:54 +0100 Subject: [PATCH] Harden external API clients with timeouts and retries --- .env.example | 8 + package.json | 14 + src/config/env.ts | 8 + src/nlp/parser.ts | 25 +- src/stellar/client.ts | 126 ++++---- src/stellar/contract.ts | 17 +- src/utils/http-client.ts | 158 ++++++++++ src/utils/twilio-client.ts | 53 ++++ .../http-client.integration.test.ts | 271 ++++++++++++++++++ tests/unit/utils/http-client.test.ts | 218 ++++++++++++++ 10 files changed, 831 insertions(+), 67 deletions(-) create mode 100644 src/utils/http-client.ts create mode 100644 src/utils/twilio-client.ts create mode 100644 tests/integration/http-client.integration.test.ts create mode 100644 tests/unit/utils/http-client.test.ts diff --git a/.env.example b/.env.example index bc827cc..7a97539 100644 --- a/.env.example +++ b/.env.example @@ -65,6 +65,14 @@ INTERNAL_SERVICE_TOKEN= # Comma-separated keywords also supported: loopback,linklocal,uniquelocal TRUST_PROXY=1 +# HTTP Client (shared timeouts/retry/circuit-breaker for external services) +HTTP_CLIENT_TIMEOUT_MS=10000 +HTTP_CLIENT_MAX_RETRIES=3 +HTTP_CLIENT_BASE_DELAY_MS=200 +HTTP_CLIENT_MAX_DELAY_MS=10000 +HTTP_CLIENT_CIRCUIT_BREAKER_THRESHOLD=5 +HTTP_CLIENT_CIRCUIT_BREAKER_RESET_MS=30000 + # Dead Letter Queue DLQ_ALERT_THRESHOLD=50 DLQ_ALERT_COOLDOWN_MS=900000 diff --git a/package.json b/package.json index b87316a..bd7e9b9 100644 --- a/package.json +++ b/package.json @@ -13,9 +13,23 @@ "lint:style": "eslint \"src/**/*.ts\" \"prisma/**/*.ts\"", "format": "prettier --write .github/workflows/node-ci.yml package.json .prettierrc.json eslint.config.mjs src/nlp/parser.ts src/stellar/dlq.ts src/whatsapp/handler.ts src/whatsapp/userManager.ts tests/unit/stellar/dlq-alerts.test.ts", "format:check": "prettier --check .github/workflows/node-ci.yml package.json .prettierrc.json eslint.config.mjs src/nlp/parser.ts src/stellar/dlq.ts src/whatsapp/handler.ts src/whatsapp/userManager.ts tests/unit/stellar/dlq-alerts.test.ts", + "test": "jest", + "test:unit": "jest tests/unit", + "test:integration": "jest tests/integration", "prisma:generate": "npx prisma generate", "prisma:generate": "npx prisma generate" }, + "jest": { + "preset": "ts-jest", + "testEnvironment": "node", + "roots": ["/tests"], + "moduleFileExtensions": ["ts", "js", "json"], + "transform": { + "^.+\\.ts$": ["ts-jest", { + "tsconfig": "tsconfig.json" + }] + } + }, "prisma": { "schema": "prisma/schema.prisma", "seed": "ts-node prisma/seed.ts" diff --git a/src/config/env.ts b/src/config/env.ts index f1e5676..a493863 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -266,4 +266,12 @@ export const config = { alertThreshold: parseInt(process.env.DLQ_ALERT_THRESHOLD || '50'), alertCooldownMs: parseInt(process.env.DLQ_ALERT_COOLDOWN_MS || '900000'), // 15 minutes default }, + httpClient: { + timeoutMs: parseInt(process.env.HTTP_CLIENT_TIMEOUT_MS || '10000'), + maxRetries: parseInt(process.env.HTTP_CLIENT_MAX_RETRIES || '3'), + baseDelayMs: parseInt(process.env.HTTP_CLIENT_BASE_DELAY_MS || '200'), + maxDelayMs: parseInt(process.env.HTTP_CLIENT_MAX_DELAY_MS || '10000'), + circuitBreakerThreshold: parseInt(process.env.HTTP_CLIENT_CIRCUIT_BREAKER_THRESHOLD || '5'), + circuitBreakerResetMs: parseInt(process.env.HTTP_CLIENT_CIRCUIT_BREAKER_RESET_MS || '30000'), + }, } \ No newline at end of file diff --git a/src/nlp/parser.ts b/src/nlp/parser.ts index 77dc56f..a141f9c 100644 --- a/src/nlp/parser.ts +++ b/src/nlp/parser.ts @@ -1,4 +1,6 @@ import Anthropic from '@anthropic-ai/sdk' +import { HttpClientAdapter } from '../utils/http-client' +import { config } from '../config' export interface Intent { action: 'deposit' | 'withdraw' | 'balance' | 'earnings' | 'help' | 'unknown' @@ -11,6 +13,15 @@ const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY || 'dummy_key', }) +const anthropicHttpClient = new HttpClientAdapter({ + timeoutMs: config.httpClient.timeoutMs, + maxRetries: config.httpClient.maxRetries, + baseDelayMs: config.httpClient.baseDelayMs, + maxDelayMs: config.httpClient.maxDelayMs, + circuitBreakerThreshold: config.httpClient.circuitBreakerThreshold, + circuitBreakerResetMs: config.httpClient.circuitBreakerResetMs, +}) + // Regex fallback export function parseWithRegex(message: string): Intent | null { const lowerMsg = message.toLowerCase().trim() @@ -57,10 +68,11 @@ export function parseWithRegex(message: string): Intent | null { // Claude fallback export async function parseWithClaude(message: string): Promise { try { - const response = await anthropic.messages.create({ - model: 'claude-3-haiku-20240307', - max_tokens: 150, - system: `You are an intent parser for a financial bot. Determine if the user wants to deposit, withdraw, check balance, view earnings/performance, or needs help. + const response = await anthropicHttpClient.execute(async () => { + return anthropic.messages.create({ + model: 'claude-3-haiku-20240307', + max_tokens: 150, + system: `You are an intent parser for a financial bot. Determine if the user wants to deposit, withdraw, check balance, view earnings/performance, or needs help. Return ONLY a JSON object representing the intent, matching this TypeScript interface exactly without any wrapper text or markdown: { "action": "deposit" | "withdraw" | "balance" | "earnings" | "help" | "unknown", @@ -68,8 +80,9 @@ Return ONLY a JSON object representing the intent, matching this TypeScript inte "currency": string, // optional "all": boolean // for "withdraw everything" }`, - messages: [{ role: 'user', content: message }], - }) + messages: [{ role: 'user', content: message }], + }) + }, 'anthropic.parseIntent') const contentBlock = response.content.find((c) => c.type === 'text') if (contentBlock && contentBlock.type === 'text') { diff --git a/src/stellar/client.ts b/src/stellar/client.ts index 0950f9f..7b1ecdd 100644 --- a/src/stellar/client.ts +++ b/src/stellar/client.ts @@ -4,10 +4,22 @@ import { Networks, Transaction, TransactionBuilder, + Account, } from '@stellar/stellar-sdk'; import { config } from '../config'; +import { HttpClientAdapter, TimeoutError } from '../utils/http-client'; +import { logger } from '../utils/logger'; import { TransactionResult } from './types'; +export const stellarHttpClient = new HttpClientAdapter({ + timeoutMs: config.httpClient.timeoutMs, + maxRetries: config.httpClient.maxRetries, + baseDelayMs: config.httpClient.baseDelayMs, + maxDelayMs: config.httpClient.maxDelayMs, + circuitBreakerThreshold: config.httpClient.circuitBreakerThreshold, + circuitBreakerResetMs: config.httpClient.circuitBreakerResetMs, +}) + export function resolveNetworkPassphrase(network: string | undefined): string { switch (network?.toLowerCase()) { case 'mainnet': @@ -29,9 +41,6 @@ const NETWORK_PASSPHRASE = resolveNetworkPassphrase(config.stellar.network); let agentKeypair: Keypair | null = null; let rpcServer: rpc.Server | null = null; -/** - * Initialize RPC server connection - */ export function getRpcServer(): rpc.Server { if (!rpcServer) { rpcServer = new rpc.Server(RPC_URL); @@ -39,16 +48,10 @@ export function getRpcServer(): rpc.Server { return rpcServer; } -/** - * Get network passphrase - */ export function getNetworkPassphrase(): string { return NETWORK_PASSPHRASE; } -/** - * Load agent keypair from environment - */ export function getAgentKeypair(): Keypair { if (!agentKeypair) { const secret = process.env.STELLAR_AGENT_SECRET_KEY; @@ -60,60 +63,81 @@ export function getAgentKeypair(): Keypair { return agentKeypair; } -/** - * Submit transaction to Stellar network - */ export async function submitTransaction(tx: Transaction): Promise { const server = getRpcServer(); - - try { - const response = await server.sendTransaction(tx); - - if (response.status === 'ERROR') { - throw new Error(`Transaction failed: ${response.errorResult?.toXDR('base64')}`); + + return stellarHttpClient.execute(async () => { + try { + const response = await server.sendTransaction(tx); + + if (response.status === 'ERROR') { + throw new Error(`Transaction failed: ${response.errorResult?.toXDR('base64')}`); + } + + return response.hash; + } catch (error) { + if (error instanceof TimeoutError) throw error + throw new Error(`Failed to submit transaction: ${error instanceof Error ? error.message : 'Unknown error'}`); } - - return response.hash; - } catch (error) { - throw new Error(`Failed to submit transaction: ${error instanceof Error ? error.message : 'Unknown error'}`); - } + }, 'stellar.submitTransaction') +} + +/** + * Simulate a transaction against the Stellar RPC with retry/timeout/circuit-breaker. + */ +export async function simulateTransaction(tx: Transaction): Promise { + const server = getRpcServer() + return stellarHttpClient.execute(() => server.simulateTransaction(tx), 'stellar.simulateTransaction') +} + +/** + * Prepare a transaction (add fee-bump etc.) with retry/timeout/circuit-breaker. + */ +export async function prepareTransaction(tx: Transaction): Promise { + const server = getRpcServer() + return stellarHttpClient.execute(() => server.prepareTransaction(tx), 'stellar.prepareTransaction') } /** - * Wait for transaction confirmation + * Get account details from the Stellar RPC with retry/timeout/circuit-breaker. */ +export async function getAccount(publicKey: string): Promise { + const server = getRpcServer() + return stellarHttpClient.execute(() => server.getAccount(publicKey), 'stellar.getAccount') +} + export async function waitForConfirmation( txHash: string, timeoutMs: number = 30000 ): Promise { const server = getRpcServer(); - const startTime = Date.now(); - - while (Date.now() - startTime < timeoutMs) { - try { - const response = await server.getTransaction(txHash); - - if (response.status === 'SUCCESS') { - return { - hash: txHash, - status: 'success', - ledger: response.ledger, - }; - } - - if (response.status === 'FAILED') { - return { - hash: txHash, - status: 'failed', - }; - } - - // Still pending, wait before polling again - await new Promise(resolve => setTimeout(resolve, 1000)); - } catch (error) { - throw new Error(`Error polling transaction: ${error instanceof Error ? error.message : 'Unknown error'}`); + const pollDeadline = Date.now() + timeoutMs; + + const poll = async (): Promise => { + const response = await server.getTransaction(txHash); + + if (response.status === 'SUCCESS') { + return { + hash: txHash, + status: 'success', + ledger: response.ledger, + }; + } + + if (response.status === 'FAILED') { + return { + hash: txHash, + status: 'failed', + }; } + + if (Date.now() >= pollDeadline) { + throw new Error(`Transaction confirmation timeout after ${timeoutMs}ms`); + } + + await new Promise(resolve => setTimeout(resolve, 1000)); + return poll() } - - throw new Error(`Transaction confirmation timeout after ${timeoutMs}ms`); + + return stellarHttpClient.execute(poll, 'stellar.waitForConfirmation') } diff --git a/src/stellar/contract.ts b/src/stellar/contract.ts index 1a2de82..34d43c2 100644 --- a/src/stellar/contract.ts +++ b/src/stellar/contract.ts @@ -9,7 +9,7 @@ import { scValToNative, nativeToScVal, } from '@stellar/stellar-sdk'; -import { getRpcServer, getNetworkPassphrase, getAgentKeypair, submitTransaction, waitForConfirmation } from './client'; +import { getRpcServer, getNetworkPassphrase, getAgentKeypair, submitTransaction, waitForConfirmation, simulateTransaction, prepareTransaction, getAccount } from './client'; import { getKeypairForUser } from './wallet'; import { config } from '../config'; import { OnChainBalance, TransactionResult } from './types'; @@ -39,7 +39,7 @@ async function buildContractCall( ): Promise { const server = getRpcServer(); const contract = getVaultContract(); - const account = await server.getAccount(sourcePublicKey); + const account = await getAccount(sourcePublicKey); const tx = new TransactionBuilder(account, { fee: BASE_FEE, @@ -64,11 +64,10 @@ async function executeWriteContractCall( args: xdr.ScVal[], signer: Keypair, ): Promise { - const server = getRpcServer(); const tx = await buildContractCall(method, args, signer.publicKey()); // Pre-Transaction Simulation & Validation (Issue #58) - const simulation = await server.simulateTransaction(tx); + const simulation = await simulateTransaction(tx); if (rpc.Api.isSimulationError(simulation)) { throw new Error(`Transaction simulation failed for ${method}: ${simulation.error}`); } @@ -76,7 +75,7 @@ async function executeWriteContractCall( throw new Error(`Transaction simulation failed for ${method}: No result returned from simulation`); } - const prepared = await server.prepareTransaction(tx); + const prepared = await prepareTransaction(tx); prepared.sign(signer); const txHash = await submitTransaction(prepared); @@ -116,10 +115,9 @@ async function executeCustodialVaultOperation( * Simulate and parse contract read call */ async function simulateRead(method: string, args: xdr.ScVal[] = []): Promise { - const server = getRpcServer(); const tx = await buildContractCall(method, args); - const simulation = await server.simulateTransaction(tx); + const simulation = await simulateTransaction(tx); if (rpc.Api.isSimulationError(simulation)) { throw new Error(`Simulation failed: ${simulation.error}`); @@ -237,7 +235,6 @@ export async function buildUnsignedVaultTransaction( amount: number, assetSymbol: string, ): Promise { - const server = getRpcServer(); const userScVal = nativeToScVal(userAddress, { type: 'address' }); const amountScVal = nativeToScVal(toContractAmount(amount), { type: 'i128' }); const assetScVal = nativeToScVal(assetSymbol, { type: 'string' }); @@ -245,7 +242,7 @@ export async function buildUnsignedVaultTransaction( const tx = await buildContractCall(method, [userScVal, amountScVal, assetScVal], userAddress); // Pre-Transaction Simulation & Validation (Issue #58) - const simulation = await server.simulateTransaction(tx); + const simulation = await simulateTransaction(tx); if (rpc.Api.isSimulationError(simulation)) { throw new Error(`Transaction simulation failed for ${method}: ${simulation.error}`); } @@ -253,7 +250,7 @@ export async function buildUnsignedVaultTransaction( throw new Error(`Transaction simulation failed for ${method}: No result returned from simulation`); } - const prepared = await server.prepareTransaction(tx); + const prepared = await prepareTransaction(tx); return prepared.toXDR(); } diff --git a/src/utils/http-client.ts b/src/utils/http-client.ts new file mode 100644 index 0000000..91339a5 --- /dev/null +++ b/src/utils/http-client.ts @@ -0,0 +1,158 @@ +import { logger } from './logger' + +export interface HttpClientConfig { + timeoutMs: number + maxRetries: number + baseDelayMs: number + maxDelayMs: number + circuitBreakerThreshold: number + circuitBreakerResetMs: number +} + +export type CircuitState = 'closed' | 'open' | 'half-open' + +export const DEFAULT_HTTP_CLIENT_CONFIG: HttpClientConfig = { + timeoutMs: 10_000, + maxRetries: 3, + baseDelayMs: 200, + maxDelayMs: 10_000, + circuitBreakerThreshold: 5, + circuitBreakerResetMs: 30_000, +} + +export class CircuitBreakerError extends Error { + constructor(context?: string) { + const msg = context + ? `Circuit breaker is OPEN for "${context}". Request blocked.` + : 'Circuit breaker is OPEN. Request blocked.' + super(msg) + this.name = 'CircuitBreakerError' + } +} + +export class TimeoutError extends Error { + constructor(timeoutMs: number, context?: string) { + const msg = context + ? `Request "${context}" timed out after ${timeoutMs}ms` + : `Request timed out after ${timeoutMs}ms` + super(msg) + this.name = 'TimeoutError' + } +} + +export class HttpClientAdapter { + private config: HttpClientConfig + private state: CircuitState = 'closed' + private failures: number = 0 + private lastFailureTime: number = 0 + + constructor(config: Partial = {}) { + this.config = { ...DEFAULT_HTTP_CLIENT_CONFIG, ...config } + } + + getConfig(): Readonly { + return this.config + } + + getState(): { state: CircuitState; failures: number } { + return { state: this.state, failures: this.failures } + } + + reset(): void { + this.state = 'closed' + this.failures = 0 + this.lastFailureTime = 0 + } + + async execute(fn: () => Promise, context?: string): Promise { + this.checkCircuitBreaker(context) + + let lastError: Error | undefined + + for (let attempt = 0; attempt <= this.config.maxRetries; attempt++) { + try { + if (attempt > 0) { + logger.debug( + `[HttpClientAdapter] Retry attempt ${attempt}/${this.config.maxRetries}${context ? ` for "${context}"` : ''}` + ) + } + const result = await this.executeWithTimeout(fn, context) + this.onSuccess() + return result + } catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)) + this.onFailure(lastError) + + if (this.state === 'open') { + break + } + + if (attempt < this.config.maxRetries) { + await this.delay(attempt) + } + } + } + + throw lastError! + } + + private async executeWithTimeout(fn: () => Promise, context?: string): Promise { + const { timeoutMs } = this.config + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject(new TimeoutError(timeoutMs, context)) + }, timeoutMs) + + fn() + .then((result) => { + clearTimeout(timer) + resolve(result) + }) + .catch((error) => { + clearTimeout(timer) + reject(error) + }) + }) + } + + private checkCircuitBreaker(context?: string): void { + if (this.state === 'open') { + if (Date.now() - this.lastFailureTime >= this.config.circuitBreakerResetMs) { + this.state = 'half-open' + logger.debug('[HttpClientAdapter] Circuit breaker transitioning to half-open') + } else { + throw new CircuitBreakerError(context) + } + } + } + + private onSuccess(): void { + if (this.state === 'half-open') { + logger.debug('[HttpClientAdapter] Circuit breaker closing after successful half-open request') + } + this.state = 'closed' + this.failures = Math.max(0, this.failures - 1) + } + + private onFailure(error: Error): void { + this.failures++ + this.lastFailureTime = Date.now() + + logger.debug(`[HttpClientAdapter] Failure #${this.failures}/${this.config.circuitBreakerThreshold}: ${error.message}`) + + if (this.failures >= this.config.circuitBreakerThreshold) { + this.state = 'open' + logger.warn(`[HttpClientAdapter] Circuit breaker OPEN after ${this.failures} failures`) + } + } + + private async delay(attempt: number): Promise { + const delayMs = Math.min( + this.config.baseDelayMs * Math.pow(2, attempt), + this.config.maxDelayMs, + ) + const jitter = delayMs * (0.5 + Math.random() * 0.5) + return new Promise((resolve) => setTimeout(resolve, jitter)) + } +} diff --git a/src/utils/twilio-client.ts b/src/utils/twilio-client.ts new file mode 100644 index 0000000..c8dc32f --- /dev/null +++ b/src/utils/twilio-client.ts @@ -0,0 +1,53 @@ +import twilio from 'twilio' +import { config } from '../config' +import { HttpClientAdapter } from './http-client' +import { logger } from './logger' + +const httpClient = new HttpClientAdapter({ + timeoutMs: config.httpClient.timeoutMs, + maxRetries: config.httpClient.maxRetries, + baseDelayMs: config.httpClient.baseDelayMs, + maxDelayMs: config.httpClient.maxDelayMs, + circuitBreakerThreshold: config.httpClient.circuitBreakerThreshold, + circuitBreakerResetMs: config.httpClient.circuitBreakerResetMs, +}) + +let twilioClient: ReturnType | null = null + +function getClient(): ReturnType { + if (!twilioClient) { + const sid = config.whatsapp.twilioSid + const token = config.whatsapp.twilioToken + if (!sid || !token) { + throw new Error('Twilio credentials not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)') + } + twilioClient = twilio(sid, token) + } + return twilioClient +} + +export interface SendMessageParams { + to: string + body: string +} + +export async function sendWhatsAppMessage(params: SendMessageParams): Promise { + return httpClient.execute(async () => { + const client = getClient() + const message = await client.messages.create({ + from: config.whatsapp.fromNumber, + to: params.to, + body: params.body, + }) + logger.info(`[Twilio] WhatsApp message sent to ${params.to}: sid=${message.sid}`) + return message.sid + }, 'twilio.sendWhatsAppMessage') +} + +export function resetTwilioClient(): void { + twilioClient = null +} + +export function getTwilioHttpClient(): HttpClientAdapter { + return httpClient +} diff --git a/tests/integration/http-client.integration.test.ts b/tests/integration/http-client.integration.test.ts new file mode 100644 index 0000000..3c3127f --- /dev/null +++ b/tests/integration/http-client.integration.test.ts @@ -0,0 +1,271 @@ +import { + HttpClientAdapter, + CircuitBreakerError, + TimeoutError, +} from '../../src/utils/http-client' + +describe('HttpClientAdapter Integration — simulated failures', () => { + let adapter: HttpClientAdapter + + beforeEach(() => { + adapter = new HttpClientAdapter({ + timeoutMs: 500, + maxRetries: 2, + baseDelayMs: 10, + maxDelayMs: 100, + circuitBreakerThreshold: 3, + circuitBreakerResetMs: 200, + }) + }) + + describe('transient failures — retry recovers', () => { + it('should succeed after intermittent HTTP 5xx errors', async () => { + let callCount = 0 + + const simulateFlakyApi = async (): Promise => { + callCount++ + if (callCount <= 2) { + throw new Error('HTTP 503 Service Unavailable') + } + return 'OK' + } + + const result = await adapter.execute(simulateFlakyApi, 'flakyApi.getData') + expect(result).toBe('OK') + expect(callCount).toBe(3) + }) + + it('should succeed after intermittent network timeouts', async () => { + let callCount = 0 + + const simulateTimeoutThenSuccess = async (): Promise => { + callCount++ + if (callCount <= 1) { + await new Promise(r => setTimeout(r, 600)) + throw new TimeoutError(500, 'simulated') + } + return 'data' + } + + const result = await adapter.execute(simulateTimeoutThenSuccess, 'timeoutApi.fetch') + expect(result).toBe('data') + expect(callCount).toBe(2) + }) + }) + + describe('persistent failures — circuit breaker opens', () => { + it('should open circuit after consecutive failures', async () => { + const simulateDownstream = jest.fn().mockRejectedValue(new Error('HTTP 502 Bad Gateway')) + + // First execute exhausts all retries (1 initial + 2 retries = 3 failures) + // After 3 failures circuit breaker opens + await expect( + adapter.execute(simulateDownstream, 'downstreamApi.call') + ).rejects.toThrow('HTTP 502 Bad Gateway') + + // Circuit is now OPEN — subsequent calls throw CircuitBreakerError + await expect( + adapter.execute(simulateDownstream, 'downstreamApi.call') + ).rejects.toThrow(CircuitBreakerError) + + await expect( + adapter.execute(simulateDownstream, 'downstreamApi.call') + ).rejects.toThrow(CircuitBreakerError) + + // Only the first execute made actual attempts (3 = initial + 2 retries) + expect(simulateDownstream).toHaveBeenCalledTimes(3) + }) + + it('should block requests with CircuitBreakerError after threshold', async () => { + const simulateDownstream = jest.fn().mockRejectedValue(new Error('Service Down')) + + // Exhaust all retries for first execute (should consume all 3 failure slots) + await expect( + adapter.execute(simulateDownstream, 'downstreamApi.call') + ).rejects.toThrow('Service Down') + + // Circuit is now OPEN or at least last failure count high enough + // that subsequent calls should be blocked + await expect( + adapter.execute(simulateDownstream, 'downstreamApi.call') + ).rejects.toThrow(CircuitBreakerError) + + // Still open + await expect( + adapter.execute(simulateDownstream, 'downstreamApi.call') + ).rejects.toThrow(CircuitBreakerError) + }) + + it('should recover after circuit breaker reset timeout', async () => { + const mock = jest.fn() + + // Trip the circuit breaker + mock.mockRejectedValue(new Error('fail')) + await expect( + adapter.execute(mock, 'recoverableApi.call') + ).rejects.toThrow() + + // Circuit is OPEN + await expect( + adapter.execute(mock, 'recoverableApi.call') + ).rejects.toThrow(CircuitBreakerError) + + // Wait for reset + jest.useFakeTimers() + jest.advanceTimersByTime(300) + + // Service recovers — should succeed in half-open state + mock.mockResolvedValue('recovered') + const result = await adapter.execute(mock, 'recoverableApi.call') + expect(result).toBe('recovered') + + jest.useRealTimers() + }) + }) + + describe('timeout behavior', () => { + it('should timeout slow responses and fall back after retry', async () => { + const fastTimeoutAdapter = new HttpClientAdapter({ + timeoutMs: 30, + maxRetries: 1, + baseDelayMs: 10, + maxDelayMs: 50, + circuitBreakerThreshold: 10, + circuitBreakerResetMs: 1000, + }) + + let callCount = 0 + + const simulateSlowThenFast = async (): Promise => { + callCount++ + if (callCount <= 1) { + await new Promise(r => setTimeout(r, 100)) + throw new TimeoutError(30, 'simulated') + } + return 'fast response' + } + + const result = await fastTimeoutAdapter.execute(simulateSlowThenFast, 'slowApi.get') + expect(result).toBe('fast response') + expect(callCount).toBe(2) + }) + }) + + describe('mixed failure modes', () => { + it('should handle timeout then HTTP error then success via retries', async () => { + const mixedAdapter = new HttpClientAdapter({ + timeoutMs: 50, + maxRetries: 3, + baseDelayMs: 10, + maxDelayMs: 50, + circuitBreakerThreshold: 10, + circuitBreakerResetMs: 1000, + }) + + let callCount = 0 + + const simulateChaoticApi = async (): Promise => { + callCount++ + switch (callCount) { + case 1: + await new Promise(r => setTimeout(r, 100)) + throw new TimeoutError(50, 'simulated timeout') + case 2: + throw new Error('HTTP 500 Internal Server Error') + case 3: + return 'success after chaos' + default: + throw new Error('unexpected call') + } + } + + const result = await mixedAdapter.execute(simulateChaoticApi, 'chaoticApi.fetch') + expect(result).toBe('success after chaos') + expect(callCount).toBe(3) + }) + }) + + describe('Stellar client scenario', () => { + it('should retry Stellar RPC failures and circuit-break after threshold', async () => { + const stellarAdapter = new HttpClientAdapter({ + timeoutMs: 100, + maxRetries: 2, + baseDelayMs: 10, + maxDelayMs: 50, + circuitBreakerThreshold: 3, + circuitBreakerResetMs: 500, + }) + + const simulateStellarRpc = jest.fn() + + // Simulate persistent submission failure + simulateStellarRpc.mockRejectedValue(new Error('stellar rpc: timeout')) + await expect( + stellarAdapter.execute(simulateStellarRpc, 'stellar.submitTransaction') + ).rejects.toThrow('stellar rpc: timeout') + + // Still failing + await expect( + stellarAdapter.execute(simulateStellarRpc, 'stellar.submitTransaction') + ).rejects.toThrow(CircuitBreakerError) + + // After reset, service recovers + jest.useFakeTimers() + jest.advanceTimersByTime(600) + + simulateStellarRpc.mockResolvedValue('tx_hash_abc') + const hash = await stellarAdapter.execute(simulateStellarRpc, 'stellar.submitTransaction') + expect(hash).toBe('tx_hash_abc') + + jest.useRealTimers() + }) + }) + + describe('Anthropic client scenario', () => { + it('should retry Anthropic API failures and fall back gracefully', async () => { + const anthropicAdapter = new HttpClientAdapter({ + timeoutMs: 100, + maxRetries: 1, + baseDelayMs: 10, + maxDelayMs: 50, + circuitBreakerThreshold: 5, + circuitBreakerResetMs: 1000, + }) + + const simulateAnthropicApi = jest.fn() + + // Transient failure then success + simulateAnthropicApi + .mockRejectedValueOnce(new Error('anthropic: rate limited')) + .mockResolvedValueOnce({ content: [{ type: 'text', text: '{"action":"balance"}' }] }) + + const result = await anthropicAdapter.execute(simulateAnthropicApi, 'anthropic.parseIntent') + expect(result).toEqual({ content: [{ type: 'text', text: '{"action":"balance"}' }] }) + expect(simulateAnthropicApi).toHaveBeenCalledTimes(2) + }) + }) + + describe('Twilio client scenario', () => { + it('should retry Twilio API failures', async () => { + const twilioAdapter = new HttpClientAdapter({ + timeoutMs: 100, + maxRetries: 1, + baseDelayMs: 10, + maxDelayMs: 50, + circuitBreakerThreshold: 5, + circuitBreakerResetMs: 1000, + }) + + const simulateTwilioApi = jest.fn() + + // Transient failure + simulateTwilioApi + .mockRejectedValueOnce(new Error('twilio: upstream timeout')) + .mockResolvedValueOnce({ sid: 'SM12345' }) + + const result = await twilioAdapter.execute(simulateTwilioApi, 'twilio.sendWhatsAppMessage') + expect(result).toEqual({ sid: 'SM12345' }) + expect(simulateTwilioApi).toHaveBeenCalledTimes(2) + }) + }) +}) diff --git a/tests/unit/utils/http-client.test.ts b/tests/unit/utils/http-client.test.ts new file mode 100644 index 0000000..e1097d9 --- /dev/null +++ b/tests/unit/utils/http-client.test.ts @@ -0,0 +1,218 @@ +import { + HttpClientAdapter, + CircuitBreakerError, + TimeoutError, +} from '../../../src/utils/http-client' + +describe('HttpClientAdapter', () => { + let adapter: HttpClientAdapter + + beforeEach(() => { + adapter = new HttpClientAdapter({ + timeoutMs: 1000, + maxRetries: 2, + baseDelayMs: 10, + maxDelayMs: 100, + circuitBreakerThreshold: 3, + circuitBreakerResetMs: 100, + }) + }) + + describe('execute', () => { + it('should return the result of a successful function', async () => { + const result = await adapter.execute(async () => 'ok') + expect(result).toBe('ok') + }) + + it('should throw on a failing function', async () => { + await expect( + adapter.execute(async () => { throw new Error('fail') }) + ).rejects.toThrow('fail') + }) + + it('should retry on failure and succeed on retry', async () => { + let attempts = 0 + const fn = jest.fn().mockImplementation(async () => { + attempts++ + if (attempts < 2) throw new Error('transient failure') + return 'recovered' + }) + + const result = await adapter.execute(fn) + expect(result).toBe('recovered') + expect(fn).toHaveBeenCalledTimes(2) + }) + + it('should respect maxRetries and throw after all retries exhausted', async () => { + const fn = jest.fn().mockRejectedValue(new Error('persistent failure')) + + await expect(adapter.execute(fn)).rejects.toThrow('persistent failure') + expect(fn).toHaveBeenCalledTimes(3) // 1 initial + 2 retries + }) + + it('should not retry if the circuit breaker opens mid-retry', async () => { + const lowThresholdAdapter = new HttpClientAdapter({ + timeoutMs: 1000, + maxRetries: 5, + baseDelayMs: 10, + maxDelayMs: 100, + circuitBreakerThreshold: 2, + circuitBreakerResetMs: 50000, + }) + + const fn = jest.fn().mockRejectedValue(new Error('fail')) + + await expect(lowThresholdAdapter.execute(fn)).rejects.toThrow('fail') + expect(fn).toHaveBeenCalledTimes(2) // 1 initial, 1 retry — then circuit opens + }) + }) + + describe('timeout', () => { + it('should throw TimeoutError if function exceeds timeout', async () => { + const slowAdapter = new HttpClientAdapter({ + timeoutMs: 50, + maxRetries: 0, + baseDelayMs: 10, + maxDelayMs: 100, + circuitBreakerThreshold: 10, + circuitBreakerResetMs: 1000, + }) + + await expect( + slowAdapter.execute(async () => { + await new Promise(r => setTimeout(r, 200)) + return 'too late' + }) + ).rejects.toThrow(TimeoutError) + }) + }) + + describe('circuit breaker', () => { + it('should block requests when circuit is open', async () => { + // Trigger circuit breaker by exceeding threshold + const fn = jest.fn().mockRejectedValue(new Error('fail')) + + for (let i = 0; i < 3; i++) { + await expect(adapter.execute(fn)).rejects.toThrow() + } + + // Next call should be blocked by circuit breaker + await expect(adapter.execute(fn)).rejects.toThrow(CircuitBreakerError) + }) + + it('should transition to half-open after reset timeout', async () => { + const fn = jest.fn().mockRejectedValue(new Error('fail')) + + // Trigger circuit breaker (3 failures needed) + for (let i = 0; i < 3; i++) { + await expect(adapter.execute(fn)).rejects.toThrow() + } + + // Circuit is OPEN - should block + await expect(adapter.execute(fn)).rejects.toThrow(CircuitBreakerError) + + // Advance past reset timeout + jest.useFakeTimers() + jest.advanceTimersByTime(200) + + // Should now be half-open and allow one request + // But it's still failing, so it should throw fail (not CircuitBreakerError) + await expect(adapter.execute(fn)).rejects.toThrow('fail') + + jest.useRealTimers() + }) + + it('should reset state after successful request in half-open state', async () => { + const fn = jest.fn() + + // Trip circuit breaker: 3 failures + fn.mockRejectedValue(new Error('fail')) + for (let i = 0; i < 3; i++) { + await expect(adapter.execute(fn)).rejects.toThrow() + } + + // Circuit is OPEN + await expect(adapter.execute(fn)).rejects.toThrow(CircuitBreakerError) + + // Advance past reset timeout + jest.useFakeTimers() + jest.advanceTimersByTime(200) + + // Now half-open — succeed + fn.mockResolvedValue('recovered') + const result = await adapter.execute(fn) + expect(result).toBe('recovered') + + // Circuit should now be CLOSED — next request goes through immediately + const result2 = await adapter.execute(async () => 'all good') + expect(result2).toBe('all good') + + jest.useRealTimers() + }) + }) + + describe('getState', () => { + it('should return closed state and 0 failures initially', () => { + const state = adapter.getState() + expect(state.state).toBe('closed') + expect(state.failures).toBe(0) + }) + + it('should track failure count', async () => { + const fn = jest.fn().mockRejectedValue(new Error('fail')) + await expect(adapter.execute(fn)).rejects.toThrow() + + const state = adapter.getState() + expect(state.failures).toBeGreaterThanOrEqual(1) + }) + }) + + describe('reset', () => { + it('should reset state back to closed', async () => { + const fn = jest.fn().mockRejectedValue(new Error('fail')) + + for (let i = 0; i < 3; i++) { + await expect(adapter.execute(fn)).rejects.toThrow() + } + + expect(adapter.getState().state).toBe('open') + + adapter.reset() + + expect(adapter.getState().state).toBe('closed') + expect(adapter.getState().failures).toBe(0) + }) + }) + + describe('context label', () => { + it('should include context in TimeoutError message', async () => { + const fastTimeoutAdapter = new HttpClientAdapter({ + timeoutMs: 10, + maxRetries: 0, + baseDelayMs: 10, + maxDelayMs: 100, + circuitBreakerThreshold: 10, + circuitBreakerResetMs: 1000, + }) + + await expect( + fastTimeoutAdapter.execute( + async () => { await new Promise(r => setTimeout(r, 100)); return 'x' }, + 'myService.myMethod', + ) + ).rejects.toThrow(/myService\.myMethod/) + }) + + it('should include context in CircuitBreakerError message', async () => { + const fn = jest.fn().mockRejectedValue(new Error('fail')) + + for (let i = 0; i < 3; i++) { + await expect(adapter.execute(fn)).rejects.toThrow() + } + + await expect( + adapter.execute(fn, 'myService.myMethod') + ).rejects.toThrow(/myService\.myMethod/) + }) + }) +})