diff --git a/apps/api/src/webhooks/__tests__/webhook-processor.service.spec.ts b/apps/api/src/webhooks/__tests__/webhook-processor.service.spec.ts
index 0a0b9055..397fbc5d 100644
--- a/apps/api/src/webhooks/__tests__/webhook-processor.service.spec.ts
+++ b/apps/api/src/webhooks/__tests__/webhook-processor.service.spec.ts
@@ -133,6 +133,67 @@ describe('WebhookProcessorService', () => {
     });
   });
 
+  describe('Adaptive scheduling loop', () => {
+    let processRetriesSpy: jest.SpyInstance;
+
+    beforeEach(() => {
+      jest.useFakeTimers();
+      processRetriesSpy = jest.spyOn(service, 'processRetries');
+    });
+
+    afterEach(() => {
+      jest.clearAllTimers();
+      jest.useRealTimers();
+    });
+
+    it('schedules at FAST interval when processRetries returns true', async () => {
+      processRetriesSpy.mockResolvedValue(true);
+      (service as any).startRetryProcessor();
+
+      await jest.advanceTimersByTimeAsync(0);
+
+      await jest.advanceTimersByTimeAsync(1999);
+      expect(processRetriesSpy).toHaveBeenCalledTimes(1);
+      await jest.advanceTimersByTimeAsync(1);
+      expect(processRetriesSpy).toHaveBeenCalledTimes(2);
+    });
+
+    it('schedules at BASE interval when processRetries returns false', async () => {
+      processRetriesSpy.mockResolvedValue(false);
+      (service as any).startRetryProcessor();
+
+      await jest.advanceTimersByTimeAsync(0);
+
+      await jest.advanceTimersByTimeAsync(9999);
+      expect(processRetriesSpy).toHaveBeenCalledTimes(1);
+      await jest.advanceTimersByTimeAsync(1);
+      expect(processRetriesSpy).toHaveBeenCalledTimes(2);
+    });
+
+    it('does not reschedule when isShuttingDown is true', async () => {
+      processRetriesSpy.mockResolvedValue(true);
+      (service as any).isShuttingDown = true;
+      (service as any).startRetryProcessor();
+
+      await jest.advanceTimersByTimeAsync(0);
+
+      expect(jest.getTimerCount()).toBe(0);
+      expect(processRetriesSpy).toHaveBeenCalledTimes(1);
+    });
+
+    it('falls back to BASE interval when processRetries throws', async () => {
+      processRetriesSpy.mockRejectedValue(new Error('storage error'));
+      (service as any).startRetryProcessor();
+
+      await jest.advanceTimersByTimeAsync(0);
+
+      await jest.advanceTimersByTimeAsync(9999);
+      expect(processRetriesSpy).toHaveBeenCalledTimes(1);
+      await jest.advanceTimersByTimeAsync(1);
+      expect(processRetriesSpy).toHaveBeenCalledTimes(2);
+    });
+  });
+
   describe('Exponential Backoff', () => {
     it('should calculate correct retry delays', () => {
       const retryPolicy = {
diff --git a/apps/api/src/webhooks/webhook-processor.service.ts b/apps/api/src/webhooks/webhook-processor.service.ts
index 31725717..3fbafb92 100644
--- a/apps/api/src/webhooks/webhook-processor.service.ts
+++ b/apps/api/src/webhooks/webhook-processor.service.ts
@@ -10,12 +10,14 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
   private readonly logger = new Logger(WebhookProcessorService.name);
   private retryInterval: NodeJS.Timeout | null = null;
 
-  // Retry processing configuration
-  // 10 second interval: Balances responsiveness vs. database load
-  // - Fast enough for most retry delays (1s, 2s, 4s initial backoff)
-  // - Slow enough to avoid constant polling overhead
-  // - TODO: Consider adaptive polling (faster when retries pending)
-  private readonly RETRY_CHECK_INTERVAL_MS = 10_000;
+  // Adaptive polling intervals:
+  // - BASE_RETRY_CHECK_INTERVAL_MS: Used when no retries are pending. Keeps
+  //   database load low during quiet periods.
+  // - FAST_RETRY_CHECK_INTERVAL_MS: Used immediately after a cycle that found
+  //   retries, so subsequent retries (e.g. next backoff window) are picked up
+  //   without waiting the full base interval.
+  private readonly BASE_RETRY_CHECK_INTERVAL_MS = 10_000;
+  private readonly FAST_RETRY_CHECK_INTERVAL_MS = 2_000;
 
   // Max 10 concurrent retries: Prevents overwhelming downstream webhooks
   // - Limits parallel HTTP requests to avoid socket exhaustion
@@ -39,6 +41,7 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
   private isProcessing = false;
   private activeRetries = 0;
   private shutdownPromise: Promise<void> | null = null;
+  private isShuttingDown = false;
 
   constructor(
     @Inject('STORAGE_CLIENT') private readonly storageClient: StoragePort,
@@ -53,6 +56,7 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
 
   async onModuleDestroy() {
     this.logger.log('Stopping webhook processor service');
+    this.isShuttingDown = true;
     this.stopRetryProcessor();
 
     // Gracefully wait for active retries to complete
@@ -80,19 +84,28 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
   }
 
   /**
-   * Start the retry processor background job
+   * Start the retry processor background job with adaptive polling.
+   * Schedules itself faster when the previous cycle found pending retries.
    */
   private startRetryProcessor(): void {
-    this.retryInterval = setInterval(() => {
-      this.processRetries().catch(error => {
-        this.logger.error('Error in retry processor:', error);
-      });
-    }, this.RETRY_CHECK_INTERVAL_MS);
+    const schedule = (delayMs: number): void => {
+      this.retryInterval = setTimeout(() => {
+        this.processRetries()
+          .then((hadRetries) => {
+            if (!this.isShuttingDown) {
+              schedule(hadRetries ? this.FAST_RETRY_CHECK_INTERVAL_MS : this.BASE_RETRY_CHECK_INTERVAL_MS);
+            }
+          })
+          .catch((error) => {
+            this.logger.error('Error in retry processor:', error);
+            if (!this.isShuttingDown) {
+              schedule(this.BASE_RETRY_CHECK_INTERVAL_MS);
+            }
+          });
+      }, delayMs);
+    };
 
-    // Run immediately on start
-    this.processRetries().catch(error => {
-      this.logger.error('Error in initial retry processor run:', error);
-    });
+    schedule(0);
   }
 
   /**
@@ -100,19 +113,20 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
    */
   private stopRetryProcessor(): void {
     if (this.retryInterval) {
-      clearInterval(this.retryInterval);
+      clearTimeout(this.retryInterval);
       this.retryInterval = null;
     }
   }
 
   /**
-   * Process pending retries
+   * Process pending retries. Returns true when at least one delivery was
+   * found so the caller can schedule the next poll at the faster interval.
    */
-  async processRetries(): Promise<void> {
+  async processRetries(): Promise<boolean> {
     // Prevent concurrent processing
     if (this.isProcessing) {
       this.logger.debug('Retry processing already in progress, skipping');
-      return;
+      return false;
     }
 
     this.isProcessing = true;
@@ -125,7 +139,7 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
 
       if (retriableDeliveries.length === 0) {
         this.logger.debug('No deliveries ready for retry');
-        return;
+        return false;
       }
 
       this.logger.log(`Processing ${retriableDeliveries.length} delivery retries`);
@@ -135,8 +149,10 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
         retriableDeliveries.map(delivery => this.retryDelivery(delivery))
       );
 
+      return true;
     } catch (error) {
       this.logger.error('Failed to process retries:', error);
+      return false;
     } finally {
       this.isProcessing = false;
     }