78 changes: 78 additions & 0 deletions docs/indexer/RETRY_BACKOFF.md
@@ -0,0 +1,78 @@
# Indexer Retry Policy & Backoff

This document describes how indexer jobs retry on failure, the backoff strategy
between attempts, and what happens when a job exhausts its retries. It exists so
contributors adding or modifying indexer jobs understand the expected behavior
and avoid silent job loss.

See also: [`FEATURE_FLAGS.md`](./FEATURE_FLAGS.md), [`DLQ_WORKFLOW.md`](./DLQ_WORKFLOW.md),
[`EVENT_PROCESSING.md`](./EVENT_PROCESSING.md).

## Backoff strategy

Retry delays are computed by `getBackoffWithJitter()` in
`src/utils/jitter.utils.ts`:

```
delay = applyJitter( min(maxDelayMs, baseDelayMs * 2^attempt), jitterFactor )
```

- **Exponential growth** — the base delay doubles with each attempt
(`baseDelayMs * 2^attempt`), so retries back off progressively.
- **Cap** — the delay is clamped to `maxDelayMs` so it never grows unbounded.
- **Jitter** — `applyJitter()` multiplies the delay by a random factor in
`[1 - jitterFactor, 1 + jitterFactor]` to avoid a thundering herd when many
jobs fail at once.

| Parameter | Default | Meaning |
| -------------- | --------- | -------------------------------------------------------- |
| `baseDelayMs` | `1000` | Delay before the first retry (`attempt = 0`). |
| `maxDelayMs` | `30000` | Upper bound on any single retry delay. |
| `jitterFactor` | env value | Random spread, from `INDEXER_JITTER_FACTOR` (see below). |

`attempt` is 0-indexed, so with the defaults above successive delays are roughly
`1s, 2s, 4s, 8s, 16s, 30s, 30s, …` before jitter. The sixth delay would be 32s
uncapped and is clamped to `maxDelayMs`.
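
The formula above can be sketched as follows. This is a minimal illustration, not the code in `src/utils/jitter.utils.ts`: the names `sketchApplyJitter` and `sketchBackoffWithJitter`, the options object, the rounding, and the injectable `random` parameter are all assumptions made for the example.

```typescript
// Hypothetical sketch of the documented formula; names and signatures are
// assumptions, not the real helpers in src/utils/jitter.utils.ts.
interface SketchBackoffOptions {
  baseDelayMs: number;
  maxDelayMs: number;
  jitterFactor: number; // expected to be within [0, 1]
}

// Scale the delay by a random factor drawn from
// [1 - jitterFactor, 1 + jitterFactor).
function sketchApplyJitter(
  delayMs: number,
  jitterFactor: number,
  random: () => number = Math.random
): number {
  const spread = (random() * 2 - 1) * jitterFactor;
  return Math.round(delayMs * (1 + spread));
}

function sketchBackoffWithJitter(
  attempt: number,
  options: SketchBackoffOptions = {
    baseDelayMs: 1000,
    maxDelayMs: 30000,
    jitterFactor: 0.1,
  },
  random: () => number = Math.random
): number {
  const exponential = options.baseDelayMs * 2 ** attempt; // doubles per attempt
  const capped = Math.min(options.maxDelayMs, exponential); // never above the cap
  return sketchApplyJitter(capped, options.jitterFactor, random);
}

// With jitter disabled, the un-jittered schedule is visible directly.
const schedule = [0, 1, 2, 3, 4, 5, 6].map(attempt =>
  sketchBackoffWithJitter(attempt, {
    baseDelayMs: 1000,
    maxDelayMs: 30000,
    jitterFactor: 0,
  })
);
console.log(schedule); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Passing `random` explicitly keeps the sketch deterministic in tests. Whether the real helper exposes such a hook is not stated in this document.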

## Configuration values

The retry/backoff behavior is controlled by these environment variables
(validated at boot by `runIndexerFeatureFlagsStartupCheck()`):

| Env var | Type | Default | Controls |
| -------------------------------------- | --------------- | -------- | ----------------------------------------------------------------------- |
| `INDEXER_JITTER_FACTOR` | number `[0, 1]` | `0.1` | Jitter spread applied to every backoff delay. |
| `ENABLE_INDEXER_DLQ` | boolean | `true` | Route retry-exhausted / terminal jobs to the dead-letter queue. |
| `ENABLE_INDEXER_DEDUPE` | boolean | `true` | Required when the DLQ is enabled; dedupe keys identify repeat failures. |
| `INDEXER_HEARTBEAT_STALE_THRESHOLD_MS` | number ms | `300000` | When the indexer is considered stalled (no progress). |
| `BACKGROUND_JOB_LOCK_TTL_MS` | integer ms | `300000` | Lock TTL that prevents two workers from retrying the same job at once. |
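
The constraints in the table (a jitter factor within `[0, 1]`, and the DLQ requiring dedupe) could be checked along these lines. This is a sketch only: `sketchValidateIndexerEnv` is a hypothetical name, the `'true'`/`'false'` string parsing is an assumption, and so is throwing on invalid input. The real `runIndexerFeatureFlagsStartupCheck()` may parse and report problems differently.

```typescript
// Hypothetical validation sketch; not the real startup check.
type SketchEnv = Record<string, string | undefined>;

interface SketchIndexerRetryConfig {
  jitterFactor: number;
  dlqEnabled: boolean;
  dedupeEnabled: boolean;
}

// Assumption: booleans arrive as the strings 'true' / 'false'.
function sketchParseBoolean(raw: string | undefined, fallback: boolean): boolean {
  if (raw === undefined || raw === '') {
    return fallback;
  }
  return raw.trim().toLowerCase() === 'true';
}

function sketchValidateIndexerEnv(env: SketchEnv): SketchIndexerRetryConfig {
  const rawJitter = env.INDEXER_JITTER_FACTOR;
  // Documented default for the jitter factor is 0.1.
  const jitterFactor =
    rawJitter === undefined || rawJitter === '' ? 0.1 : Number(rawJitter);
  if (!Number.isFinite(jitterFactor) || jitterFactor < 0 || jitterFactor > 1) {
    throw new Error('INDEXER_JITTER_FACTOR must be a number in [0, 1]');
  }

  // Both flags default to true per the table above.
  const dlqEnabled = sketchParseBoolean(env.ENABLE_INDEXER_DLQ, true);
  const dedupeEnabled = sketchParseBoolean(env.ENABLE_INDEXER_DEDUPE, true);
  if (dlqEnabled && !dedupeEnabled) {
    throw new Error('ENABLE_INDEXER_DLQ=true requires ENABLE_INDEXER_DEDUPE=true');
  }

  return { jitterFactor, dlqEnabled, dedupeEnabled };
}

console.log(sketchValidateIndexerEnv({ INDEXER_JITTER_FACTOR: '0.25' }));
```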

## Exhaustion behavior

When a job exhausts its retry attempts (or hits a terminal, non-retryable
error), it is moved to the **Dead-Letter Queue** via `moveToDLQ()` in
`src/utils/indexer-dlq.utils.ts`. The DLQ record (`indexerDLQ` table) captures:

- `jobType` — the kind of job that failed,
- `payload` — the original job payload, so it can be replayed,
- `retryCount` — how many attempts were made before giving up,
- `failureReason` / `errorDetails` — why it ultimately failed.

Consequences:

- **Jobs are parked, not lost.** A retry-exhausted job is preserved in the DLQ
for inspection and manual replay rather than disappearing.
- **DLQ depth is observable.** `getDLQDepth()` and `syncDLQMetrics()` expose the
current backlog to the metrics registry; a growing DLQ indicates a systemic
failure that needs attention.
- **DLQ disabled is risky.** If `ENABLE_INDEXER_DLQ=false`, retry-exhausted jobs
are dropped with only a log line — enable the DLQ in any environment where
silent job loss is unacceptable. (`ENABLE_INDEXER_DLQ=true` also requires
`ENABLE_INDEXER_DEDUPE=true`.)

## Guidance for new indexer jobs

- Use `getBackoffWithJitter(attempt, …)` for retry delays instead of a fixed
sleep, so behavior is consistent and jittered.
- Choose a sensible maximum attempt count for the job, then call `moveToDLQ()`
once it is reached — do not loop forever.
- Keep payloads in the DLQ replayable: store everything needed to re-run the job.
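
Taken together, the guidance above amounts to a bounded retry loop that parks the job once attempts run out. The sketch below illustrates that shape under stated assumptions: `sketchRunWithRetry`, `SketchDlqRecord`, and the injected `delayFor`, `sleep`, and `moveToDlq` callbacks are hypothetical stand-ins, since the actual signatures of `getBackoffWithJitter()` and `moveToDLQ()` are not shown in this document.

```typescript
// Hypothetical sketch; the injected callbacks stand in for the real helpers.
interface SketchDlqRecord {
  jobType: string;
  payload: unknown; // kept whole so the job can be replayed
  retryCount: number;
  failureReason: string;
}

interface SketchRetryDeps {
  delayFor: (attempt: number) => number; // stand-in for getBackoffWithJitter
  sleep: (ms: number) => Promise<void>;
  moveToDlq: (record: SketchDlqRecord) => Promise<void>; // stand-in for moveToDLQ
}

async function sketchRunWithRetry<T>(
  jobType: string,
  payload: T,
  handler: (payload: T) => Promise<void>,
  maxAttempts: number,
  deps: SketchRetryDeps
): Promise<'succeeded' | 'dead-lettered'> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await handler(payload);
      return 'succeeded';
    } catch (error) {
      lastError = error;
      // Back off only when another attempt will follow.
      if (attempt < maxAttempts - 1) {
        await deps.sleep(deps.delayFor(attempt));
      }
    }
  }
  // Attempts exhausted: park the job instead of dropping it or looping forever.
  await deps.moveToDlq({
    jobType,
    payload,
    retryCount: maxAttempts,
    failureReason: lastError instanceof Error ? lastError.message : String(lastError),
  });
  return 'dead-lettered';
}

// Usage with in-memory fakes, so nothing actually sleeps or touches a database.
const parked: SketchDlqRecord[] = [];
sketchRunWithRetry(
  'example-job',
  { id: 1 },
  async () => {
    throw new Error('boom');
  },
  3,
  {
    delayFor: attempt => Math.min(30000, 1000 * 2 ** attempt),
    sleep: async () => {},
    moveToDlq: async record => {
      parked.push(record);
    },
  }
).then(outcome => console.log(outcome, parked.length));
```

Injecting the delay, sleep, and DLQ functions is a choice made for the example. It keeps the loop testable without timers, and says nothing about how the real jobs are wired.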
26 changes: 20 additions & 6 deletions src/middlewares/error.middleware.ts
@@ -7,12 +7,13 @@ import { z } from 'zod';
 import { ErrorCode, ErrorCodeType } from '../constants/error.constants';
 import { logger } from '../utils/logger.utils';
 import { mapUnknownRouteError } from '../utils/route-error.utils';
+import { buildErrorContext } from '../utils/error-context.utils';

 export class ApiError extends Error {
   statusCode: number;
   isOperational: boolean;
   errorCode?: ErrorCodeType;

   constructor(
     statusCode: number,
     message: string,
@@ -55,10 +56,19 @@ export const errorHandler: ErrorRequestHandler = (
   res: Response,
   _next: NextFunction
 ): void => {
-  // Log error details
-  console.error('🚨 Error caught by global handler:');
-  console.error('URL:', req.method, req.originalUrl);
-  console.error('Error:', err);
+  // Log a consistent, structured error context (request id + normalized code
+  // together) so failures can be correlated with the response envelope. Stack
+  // traces are only attached in development builds.
+  logger.error(
+    {
+      ...buildErrorContext(err, {
+        requestId: req.requestId,
+        includeStack: envConfig.MODE === 'development',
+      }),
+      route: `${req.method} ${req.originalUrl}`,
+    },
+    'Error caught by global handler'
+  );

   // Handle Zod validation errors
   if (err instanceof z.ZodError || err.name === 'ZodError') {
@@ -127,7 +137,11 @@ export const errorHandler: ErrorRequestHandler = (
   }

   // Handle oversized request payload (413)
-  if (err.type === 'entity.too.large' || err.status === 413 || err.statusCode === 413) {
+  if (
+    err.type === 'entity.too.large' ||
+    err.status === 413 ||
+    err.statusCode === 413
+  ) {
     logger.warn({
       msg: 'Request payload too large',
       route: `${req.method} ${req.originalUrl}`,
106 changes: 106 additions & 0 deletions src/modules/creators/creator-list-last-page.integration.test.ts
@@ -0,0 +1,106 @@
// Integration test: creator list cursor advancing to a PARTIAL last page.
//
// The existing cursor round-trip test covers an even split (6 items / two full
// pages of 3). This test covers the end-of-list edge where the final page holds
// fewer than `limit` items: a known total of 5 paginated at limit=2 yields pages
// of [2, 2, 1]. It asserts the last page returns only the remaining item and
// that the response indicates there are no further pages (hasMore=false).
//
// Uses Jest mocks — no database required.

import { httpListCreators } from './creators.controllers';
import * as creatorsUtils from './creators.utils';
import type { CreatorProfile } from '../../types/profile.types';

function makeReq(query: Record<string, string> = {}): any {
  return { query };
}

function makeRes(): any {
  const res: any = {};
  res.status = jest.fn().mockReturnValue(res);
  res.json = jest.fn().mockReturnValue(res);
  res.setHeader = jest.fn().mockReturnValue(res);
  res.set = jest.fn().mockReturnValue(res);
  return res;
}

function makeNext(): jest.Mock {
  return jest.fn();
}

function makeFixture(index: number): CreatorProfile {
  return {
    id: `cuid-${index}`,
    userId: `user-${index}`,
    handle: `creator_${index}`,
    displayName: `Creator ${index}`,
    isVerified: false,
    createdAt: new Date(`2024-0${index}-01T00:00:00.000Z`),
    updatedAt: new Date(`2024-0${index}-01T00:00:00.000Z`),
  };
}

// Known total of 5 → at limit=2 the pages are [2, 2, 1].
const TOTAL = 5;
const ALL_FIXTURES = [1, 2, 3, 4, 5].map(makeFixture);
const LIMIT = 2;
const LAST_PAGE_OFFSET = 4; // pages start at offsets 0, 2, 4
const LAST_PAGE_FIXTURES = ALL_FIXTURES.slice(LAST_PAGE_OFFSET); // 1 item

async function fetchPage(offset: number, pageItems: CreatorProfile[]) {
  jest
    .spyOn(creatorsUtils, 'fetchCreatorList')
    .mockResolvedValue([pageItems, TOTAL]);

  const res = makeRes();
  await httpListCreators(
    makeReq({ limit: String(LIMIT), offset: String(offset) }),
    res,
    makeNext()
  );
  jest.restoreAllMocks();
  return res.json.mock.calls[0][0].data;
}

describe('creator list — cursor pointing to the last page', () => {
  afterEach(() => {
    jest.restoreAllMocks();
  });

  it('returns only the remaining item and signals no further pages', async () => {
    const data = await fetchPage(LAST_PAGE_OFFSET, LAST_PAGE_FIXTURES);

    // Only the remaining item (5th of 5) is returned, not a full page.
    expect(data.items).toHaveLength(1);
    expect(data.items[0].id).toBe('cuid-5');

    // Meta reflects the end of the list.
    expect(data.meta.offset).toBe(LAST_PAGE_OFFSET);
    expect(data.meta.limit).toBe(LIMIT);
    expect(data.meta.total).toBe(TOTAL);
    expect(data.meta.hasMore).toBe(false);
  });

  it('advances through every page and ends with hasMore=false on a partial page', async () => {
    const pages: CreatorProfile[][] = [
      ALL_FIXTURES.slice(0, 2),
      ALL_FIXTURES.slice(2, 4),
      ALL_FIXTURES.slice(4, 5),
    ];

    const collected: string[] = [];
    let lastHasMore = true;

    for (let i = 0; i < pages.length; i++) {
      const data = await fetchPage(i * LIMIT, pages[i]);
      collected.push(...data.items.map((item: { id: string }) => item.id));
      lastHasMore = data.meta.hasMore;
    }

    // Traversal reconstructs the full fixture set exactly once...
    expect(collected).toEqual(ALL_FIXTURES.map(f => f.id));
    // ...and the final page is the end of the list.
    expect(lastHasMore).toBe(false);
  });
});
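
For readers checking the arithmetic in the test above, one plausible way to derive `hasMore` for offset pagination is sketched below. The controller's real computation is not part of this diff, so `sketchPageMeta` and the formula `offset + returned < total` are assumptions used only to show why pages of `[2, 2, 1]` end with `hasMore=false`.

```typescript
// Hypothetical sketch; the controller's actual hasMore logic is not shown here.
interface SketchPageMeta {
  offset: number;
  limit: number;
  total: number;
  hasMore: boolean;
}

function sketchPageMeta(
  offset: number,
  limit: number,
  total: number,
  returned: number
): SketchPageMeta {
  // More pages exist only if items remain past the ones just returned.
  return { offset, limit, total, hasMore: offset + returned < total };
}

// Total of 5 at limit 2: offsets 0, 2, 4 return 2, 2, and 1 items.
const sketchPages = [0, 2, 4].map(offset => {
  const returned = Math.min(2, Math.max(0, 5 - offset));
  return sketchPageMeta(offset, 2, 5, returned);
});
console.log(sketchPages.map(page => page.hasMore)); // [true, true, false]
```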
8 changes: 8 additions & 0 deletions src/server.ts
@@ -10,6 +10,7 @@ import {
runIndexerFeatureFlagsStartupCheck,
} from './utils/indexer-flags-startup-check.utils';
import { checkOptionalDependencies } from './utils/startup.utils';
import { describeDatabasePoolConfig } from './utils/db-pool-config.utils';
import { stopOwnershipSnapshotCleanupJob } from './jobs/ownership-snapshot-cleanup.job';

async function startServer() {
@@ -33,6 +34,13 @@ async function startServer() {
await prisma.$connect();
logger.info('Connected to database');

// Surface connection-pool settings (no credentials) so connection
// exhaustion is diagnosable. Logged before the server accepts requests.
logger.info(
describeDatabasePoolConfig(),
'Database connection pool configured'
);

// Verify migrations on startup
await verifyMigrationChecksums();

48 changes: 48 additions & 0 deletions src/utils/db-pool-config.utils.test.ts
@@ -0,0 +1,48 @@
import { describeDatabasePoolConfig } from './db-pool-config.utils';

describe('describeDatabasePoolConfig', () => {
  it('parses pool params from the database URL', () => {
    const url =
      'postgresql://user:secret@db.example.com:5432/app?connection_limit=15&pool_timeout=20&connect_timeout=8';

    const config = describeDatabasePoolConfig(url, 5000);

    expect(config).toEqual({
      poolSize: 15,
      poolTimeoutSeconds: 20,
      connectTimeoutSeconds: 8,
      queryTimeoutMs: 5000,
    });
  });

  it('reports defaults when pool params are absent', () => {
    const config = describeDatabasePoolConfig(
      'postgresql://user:secret@db.example.com:5432/app',
      3000
    );

    expect(config).toEqual({
      poolSize: 'default',
      poolTimeoutSeconds: 'default',
      connectTimeoutSeconds: 'default',
      queryTimeoutMs: 3000,
    });
  });

  it('never leaks credentials or host details', () => {
    const url =
      'postgresql://admin:topsecret@db.internal:5432/app?connection_limit=5';

    const serialized = JSON.stringify(describeDatabasePoolConfig(url, 5000));

    expect(serialized).not.toContain('topsecret');
    expect(serialized).not.toContain('admin');
    expect(serialized).not.toContain('db.internal');
  });

  it('degrades to defaults on an unparseable URL', () => {
    const config = describeDatabasePoolConfig('not a url', 5000);
    expect(config.poolSize).toBe('default');
    expect(config.queryTimeoutMs).toBe(5000);
  });
});
57 changes: 57 additions & 0 deletions src/utils/db-pool-config.utils.ts
@@ -0,0 +1,57 @@
import { envConfig } from '../config';

/**
 * Connection-pool settings safe to log at startup.
 *
 * Prisma reads pool settings from the `DATABASE_URL` query string
 * (`connection_limit`, `pool_timeout`, `connect_timeout`). When a value is not
 * present in the URL, Prisma applies its own default, reported here as
 * `'default'`. No host, credentials, or other connection-string details are
 * included so this object is safe to emit to logs.
 */
export interface DatabasePoolConfig {
  /** Max connections in the pool (`connection_limit`); Prisma default ≈ num_cpus * 2 + 1. */
  poolSize: number | 'default';
  /** Seconds to wait for a free connection before timing out (`pool_timeout`). */
  poolTimeoutSeconds: number | 'default';
  /** Seconds to wait when opening a new connection (`connect_timeout`). */
  connectTimeoutSeconds: number | 'default';
  /** Per-query timeout enforced by the Prisma client extension. */
  queryTimeoutMs: number;
}

function readNumericParam(
  params: URLSearchParams | undefined,
  key: string
): number | 'default' {
  const raw = params?.get(key);
  if (raw == null || raw === '') {
    return 'default';
  }
  const value = Number(raw);
  return Number.isFinite(value) ? value : 'default';
}

/**
 * Extracts the loggable connection-pool configuration from the database URL.
 * Parsing failures degrade gracefully to all-default values rather than
 * throwing during startup.
 */
export function describeDatabasePoolConfig(
  databaseUrl: string = envConfig.DATABASE_URL,
  queryTimeoutMs: number = envConfig.DB_QUERY_TIMEOUT_MS
): DatabasePoolConfig {
  let params: URLSearchParams | undefined;
  try {
    params = new URL(databaseUrl).searchParams;
  } catch {
    params = undefined;
  }

  return {
    poolSize: readNumericParam(params, 'connection_limit'),
    poolTimeoutSeconds: readNumericParam(params, 'pool_timeout'),
    connectTimeoutSeconds: readNumericParam(params, 'connect_timeout'),
    queryTimeoutMs,
  };
}