Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions apps/api/src/webhooks/__tests__/webhook-processor.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,67 @@ describe('WebhookProcessorService', () => {
});
});

describe('Adaptive scheduling loop', () => {
let processRetriesSpy: jest.SpyInstance;

beforeEach(() => {
jest.useFakeTimers();
processRetriesSpy = jest.spyOn(service, 'processRetries');
});

afterEach(() => {
jest.clearAllTimers();
jest.useRealTimers();
});

it('schedules at FAST interval when processRetries returns true', async () => {
processRetriesSpy.mockResolvedValue(true);
(service as any).startRetryProcessor();

await jest.advanceTimersByTimeAsync(0);

await jest.advanceTimersByTimeAsync(1999);
expect(processRetriesSpy).toHaveBeenCalledTimes(1);
await jest.advanceTimersByTimeAsync(1);
expect(processRetriesSpy).toHaveBeenCalledTimes(2);
});

it('schedules at BASE interval when processRetries returns false', async () => {
processRetriesSpy.mockResolvedValue(false);
(service as any).startRetryProcessor();

await jest.advanceTimersByTimeAsync(0);

await jest.advanceTimersByTimeAsync(9999);
expect(processRetriesSpy).toHaveBeenCalledTimes(1);
await jest.advanceTimersByTimeAsync(1);
expect(processRetriesSpy).toHaveBeenCalledTimes(2);
});

it('does not reschedule when isShuttingDown is true', async () => {
processRetriesSpy.mockResolvedValue(true);
(service as any).isShuttingDown = true;
(service as any).startRetryProcessor();

await jest.advanceTimersByTimeAsync(0);

expect(jest.getTimerCount()).toBe(0);
expect(processRetriesSpy).toHaveBeenCalledTimes(1);
});

it('falls back to BASE interval when processRetries throws', async () => {
processRetriesSpy.mockRejectedValue(new Error('storage error'));
(service as any).startRetryProcessor();

await jest.advanceTimersByTimeAsync(0);

await jest.advanceTimersByTimeAsync(9999);
expect(processRetriesSpy).toHaveBeenCalledTimes(1);
await jest.advanceTimersByTimeAsync(1);
expect(processRetriesSpy).toHaveBeenCalledTimes(2);
});
});

describe('Exponential Backoff', () => {
it('should calculate correct retry delays', () => {
const retryPolicy = {
Expand Down
58 changes: 37 additions & 21 deletions apps/api/src/webhooks/webhook-processor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(WebhookProcessorService.name);
private retryInterval: NodeJS.Timeout | null = null;

// Retry processing configuration
// 10 second interval: Balances responsiveness vs. database load
// - Fast enough for most retry delays (1s, 2s, 4s initial backoff)
// - Slow enough to avoid constant polling overhead
// - TODO: Consider adaptive polling (faster when retries pending)
private readonly RETRY_CHECK_INTERVAL_MS = 10_000;
// Adaptive polling intervals:
// - BASE_RETRY_CHECK_INTERVAL_MS: Used when no retries are pending. Keeps
// database load low during quiet periods.
// - FAST_RETRY_CHECK_INTERVAL_MS: Used immediately after a cycle that found
// retries, so subsequent retries (e.g. next backoff window) are picked up
// without waiting the full base interval.
private readonly BASE_RETRY_CHECK_INTERVAL_MS = 10_000;
private readonly FAST_RETRY_CHECK_INTERVAL_MS = 2_000;

// Max 10 concurrent retries: Prevents overwhelming downstream webhooks
// - Limits parallel HTTP requests to avoid socket exhaustion
Expand All @@ -39,6 +41,7 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
private isProcessing = false;
private activeRetries = 0;
private shutdownPromise: Promise<void> | null = null;
private isShuttingDown = false;

constructor(
@Inject('STORAGE_CLIENT') private readonly storageClient: StoragePort,
Expand All @@ -53,6 +56,7 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {

async onModuleDestroy() {
this.logger.log('Stopping webhook processor service');
this.isShuttingDown = true;
this.stopRetryProcessor();

// Gracefully wait for active retries to complete
Expand Down Expand Up @@ -80,39 +84,49 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
}

/**
* Start the retry processor background job
* Start the retry processor background job with adaptive polling.
* Schedules itself faster when the previous cycle found pending retries.
*/
private startRetryProcessor(): void {
this.retryInterval = setInterval(() => {
this.processRetries().catch(error => {
this.logger.error('Error in retry processor:', error);
});
}, this.RETRY_CHECK_INTERVAL_MS);
const schedule = (delayMs: number): void => {
this.retryInterval = setTimeout(() => {
this.processRetries()
.then((hadRetries) => {
if (!this.isShuttingDown) {
schedule(hadRetries ? this.FAST_RETRY_CHECK_INTERVAL_MS : this.BASE_RETRY_CHECK_INTERVAL_MS);
}
})
.catch((error) => {
this.logger.error('Error in retry processor:', error);
if (!this.isShuttingDown) {
schedule(this.BASE_RETRY_CHECK_INTERVAL_MS);
}
});
}, delayMs);
};

// Run immediately on start
this.processRetries().catch(error => {
this.logger.error('Error in initial retry processor run:', error);
});
schedule(0);
}

/**
* Stop the retry processor
*/
private stopRetryProcessor(): void {
if (this.retryInterval) {
clearInterval(this.retryInterval);
clearTimeout(this.retryInterval);
this.retryInterval = null;
}
}

/**
* Process pending retries
* Process pending retries. Returns true when at least one delivery was
* found so the caller can schedule the next poll at the faster interval.
*/
async processRetries(): Promise<void> {
async processRetries(): Promise<boolean> {
// Prevent concurrent processing
if (this.isProcessing) {
this.logger.debug('Retry processing already in progress, skipping');
return;
return false;
}

this.isProcessing = true;
Expand All @@ -125,7 +139,7 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {

if (retriableDeliveries.length === 0) {
this.logger.debug('No deliveries ready for retry');
return;
return false;
}

this.logger.log(`Processing ${retriableDeliveries.length} delivery retries`);
Expand All @@ -135,8 +149,10 @@ export class WebhookProcessorService implements OnModuleInit, OnModuleDestroy {
retriableDeliveries.map(delivery => this.retryDelivery(delivery))
);

return true;
} catch (error) {
this.logger.error('Failed to process retries:', error);
return false;
} finally {
this.isProcessing = false;
}
Expand Down
Loading