From e9e3de0b67c30ce1c974f7ecaf4d7c78ec7e6be9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Cabrero-Holgueras?= Date: Mon, 13 Apr 2026 15:02:46 +0200 Subject: [PATCH 1/2] fix(nilcc-api): treat duplicate payment inserts as idempotent no-ops MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With desired_count=2 both replicas share a single block_cursors row and both poll the burn contract. When they race on the same event, one replica wins the transaction and the other fails the payments.tx_hash unique constraint. The QueryFailedError was bubbling up to the poller as a WARN plus a refusal to advance the cursor, causing every subsequent poll to rescan the same block range. Catch the unique-constraint violation inside PaymentService.processEvent via the existing isUniqueConstraint helper, log at INFO ("Payment already processed: ${event.txHash}"), and return null. The poller sees a successful no-op and advances the cursor normally; the row has already been committed by the winning replica. This is a defense-in-depth fix only — the follow-up commit eliminates the race entirely by serializing the poller on the cursor row. Keeping the catch guards against manual cursor rollbacks, restart replays, and any future caller of processEvent outside the poller. 
--- nilcc-api/src/payment/payment.service.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nilcc-api/src/payment/payment.service.ts b/nilcc-api/src/payment/payment.service.ts index 10d7226..f177069 100644 --- a/nilcc-api/src/payment/payment.service.ts +++ b/nilcc-api/src/payment/payment.service.ts @@ -1,6 +1,7 @@ import type { QueryRunner, Repository } from "typeorm"; import { v4 as uuidv4 } from "uuid"; import type { AccountEntity } from "#/account/account.entity"; +import { isUniqueConstraint } from "#/common/errors"; import { microdollarsToUsd, nilToMicrodollars, @@ -116,6 +117,10 @@ export class PaymentService { return payment; } catch (e) { await queryRunner.rollbackTransaction(); + if (isUniqueConstraint(e)) { + bindings.log.info(`Payment already processed: ${event.txHash}`); + return null; + } throw e; } finally { await queryRunner.release(); From c8e5b00df1c59dd221be231c9bf947be136a847a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Cabrero-Holgueras?= Date: Mon, 13 Apr 2026 15:46:34 +0200 Subject: [PATCH 2/2] fix(nilcc-api): serialize multi-replica payment poller on cursor row MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With desired_count=2 both replicas were polling the burn contract independently, racing on the shared block_cursors row. That produced duplicate getLogs calls, duplicate processEvent invocations, and non-monotonic cursor writes that caused repeated block rescans. Wrap the entire doPoll body in a single transaction that begins with SELECT last_processed_block FROM block_cursors WHERE id = $1 FOR UPDATE. The row lock serializes replicas inside Postgres: the waiter blocks until the holder commits, then reads the already-advanced cursor and polls only the new range — no wasted RPC, no duplicate processing, and the read-modify-write of the cursor is atomic so it can no longer regress. PaymentPoller.start() seeds the cursor row via INSERT ... 
ON CONFLICT DO NOTHING so FOR UPDATE always has a row to lock. Also harden the startup path in main.ts: move paymentPoller.start() after the SIGTERM/SIGINT handlers are installed and trigger shutdown if start() rejects, so a poller init failure cannot leave the process up with signal handlers pointing at a half-constructed poller. Add a reentrancy guard to shutdown() so concurrent signals don't double-close the servers. Verified with two nilcc-api replicas in docker-compose against shared Postgres and anvil: ranges alternate cleanly between replicas with no overlap, cursor advances monotonically, and burn events are processed exactly once. --- nilcc-api/src/main.ts | 12 +- nilcc-api/src/payment/payment-poller.ts | 179 +++++++++++++----------- 2 files changed, 106 insertions(+), 85 deletions(-) diff --git a/nilcc-api/src/main.ts b/nilcc-api/src/main.ts index 7900840..6792e3e 100644 --- a/nilcc-api/src/main.ts +++ b/nilcc-api/src/main.ts @@ -32,7 +32,6 @@ async function main() { const { app, metrics } = await buildApp(bindings); const paymentPoller = new PaymentPoller(bindings, bindings.services.payment); - paymentPoller.start(); bindings.log.info("Starting servers ..."); const appServer = serve( @@ -55,7 +54,13 @@ async function main() { }, ); + let shuttingDown = false; const shutdown = async (): Promise => { + if (shuttingDown) { + return; + } + shuttingDown = true; + bindings.log.info( "Received shutdown signal. 
Starting graceful shutdown...", ); @@ -80,6 +85,11 @@ async function main() { process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); + + void paymentPoller.start().catch(async (error) => { + bindings.log.error(error, "Failed to start payment poller"); + await shutdown(); + }); } main().catch((error) => { diff --git a/nilcc-api/src/payment/payment-poller.ts b/nilcc-api/src/payment/payment-poller.ts index f766e6d..ddf19c6 100644 --- a/nilcc-api/src/payment/payment-poller.ts +++ b/nilcc-api/src/payment/payment-poller.ts @@ -1,7 +1,6 @@ import type { Logger } from "pino"; import { createPublicClient, http } from "viem"; import type { AppBindings } from "#/env"; -import { BlockCursorEntity } from "./block-cursor.entity"; import { burnWithDigestEventAbi } from "./burn-contract"; import type { PaymentService } from "./payment.service"; @@ -19,7 +18,7 @@ export class PaymentPoller { this.log = bindings.log.child({ component: "payment-poller" }); } - start(): void { + async start(): Promise { const { rpcUrl, burnContractAddress } = this.bindings.config; if (!rpcUrl || !burnContractAddress) { this.log.info( @@ -28,10 +27,20 @@ export class PaymentPoller { return; } + // Seed the cursor row so SELECT ... FOR UPDATE always finds a row to lock. 
+ const seedBlock = ( + BigInt(this.bindings.config.paymentStartBlock) - 1n + ).toString(); + await this.bindings.dataSource.query( + `INSERT INTO block_cursors (id, last_processed_block, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (id) DO NOTHING`, + [CURSOR_ID, seedBlock], + ); + const intervalMs = this.bindings.config.paymentPollerIntervalMs; this.log.info(`Starting payment poller with interval ${intervalMs}ms`); - // Run immediately, then on interval this.poll(); this.intervalId = setInterval(() => this.poll(), intervalMs); } @@ -74,101 +83,103 @@ export class PaymentPoller { transport: http(rpcUrl), }); - // Get block cursor - const cursorRepo = - this.bindings.dataSource.getRepository(BlockCursorEntity); - const cursor = await cursorRepo.findOneBy({ id: CURSOR_ID }); - const fromBlock = cursor - ? BigInt(cursor.lastProcessedBlock) + 1n - : BigInt(this.bindings.config.paymentStartBlock); - - // Get current block - const currentBlock = await client.getBlockNumber(); - if (fromBlock > currentBlock) { - this.log.debug( - `No new blocks to process (cursor: ${fromBlock}, current: ${currentBlock})`, + const queryRunner = this.bindings.dataSource.createQueryRunner(); + await queryRunner.connect(); + await queryRunner.startTransaction(); + try { + // Single-writer gate across replicas: blocks until any concurrent poll commits, + // then reads the already-advanced cursor and processes only the new range. 
+ const rows: Array<{ last_processed_block: string }> = + await queryRunner.query( + `SELECT last_processed_block FROM block_cursors + WHERE id = $1 FOR UPDATE`, + [CURSOR_ID], + ); + const fromBlock = BigInt(rows[0].last_processed_block) + 1n; + + const currentBlock = await client.getBlockNumber(); + if (fromBlock > currentBlock) { + this.log.debug( + `No new blocks to process (cursor: ${fromBlock}, current: ${currentBlock})`, + ); + await queryRunner.commitTransaction(); + return; + } + + const toBlock = + currentBlock - fromBlock > BigInt(paymentPollerMaxBlockRange) + ? fromBlock + BigInt(paymentPollerMaxBlockRange) - 1n + : currentBlock; + + this.log.info( + `Polling blocks ${fromBlock} to ${toBlock} for LogBurnWithDigest events`, ); - return; - } - // Clamp range - const toBlock = - currentBlock - fromBlock > BigInt(paymentPollerMaxBlockRange) - ? fromBlock + BigInt(paymentPollerMaxBlockRange) - 1n - : currentBlock; + const logs = await client.getLogs({ + address: burnContractAddress as `0x${string}`, + event: burnWithDigestEventAbi[0], + fromBlock, + toBlock, + }); - this.log.info( - `Polling blocks ${fromBlock} to ${toBlock} for LogBurnWithDigest events`, - ); + this.log.info(`Found ${logs.length} LogBurnWithDigest events`); - // Fetch logs - const logs = await client.getLogs({ - address: burnContractAddress as `0x${string}`, - event: burnWithDigestEventAbi[0], - fromBlock, - toBlock, - }); + let firstFailedBlock: bigint | null = null; + for (const log of logs) { + if (!log.transactionHash || !log.args.account || !log.args.amount) { + this.log.warn("Skipping malformed log entry"); + continue; + } - this.log.info(`Found ${logs.length} LogBurnWithDigest events`); + try { + await this.paymentService.processEvent(this.bindings, { + txHash: log.transactionHash, + logIndex: log.logIndex ?? 0, + blockNumber: Number(log.blockNumber), + fromAddress: log.args.account, + amount: log.args.amount, + digest: log.args.digest ?? 
"0x", + }); + } catch (e) { + if (firstFailedBlock === null) { + firstFailedBlock = log.blockNumber ?? fromBlock; + } + this.log.warn( + `Failed to process event from tx ${log.transactionHash}: ${e}`, + ); + } + } - // Process each log - let firstFailedBlock: bigint | null = null; - for (const log of logs) { - if (!log.transactionHash || !log.args.account || !log.args.amount) { - this.log.warn("Skipping malformed log entry"); - continue; + // If any event failed, only advance up to the block before the first failure + // so failed events are retried next tick. + let nextCursorBlock: bigint | null = toBlock; + if (firstFailedBlock !== null) { + nextCursorBlock = + firstFailedBlock <= fromBlock ? null : firstFailedBlock - 1n; } - try { - await this.paymentService.processEvent(this.bindings, { - txHash: log.transactionHash, - logIndex: log.logIndex ?? 0, - blockNumber: Number(log.blockNumber), - fromAddress: log.args.account, - amount: log.args.amount, - digest: log.args.digest ?? "0x", - }); - } catch (e) { - if (firstFailedBlock === null) { - firstFailedBlock = log.blockNumber ?? fromBlock; - } + if (nextCursorBlock === null) { this.log.warn( - `Failed to process event from tx ${log.transactionHash}: ${e}`, + `Not advancing block cursor due to processing failure at block ${firstFailedBlock?.toString()}`, ); + await queryRunner.commitTransaction(); + return; } - } - - // Update cursor. If any event failed, only advance up to the block before - // the first failure so failed events are retried in the next poll. 
- let nextCursorBlock: bigint | null = toBlock; - if (firstFailedBlock !== null) { - if (firstFailedBlock <= fromBlock) { - // Would only happen if firstFailedBlock == fromBlock, other states should be impossible - nextCursorBlock = null; - } else { - nextCursorBlock = firstFailedBlock - 1n; - } - } - if (nextCursorBlock === null) { - this.log.warn( - `Not advancing block cursor due to processing failure at block ${firstFailedBlock?.toString()}`, + await queryRunner.query( + `UPDATE block_cursors + SET last_processed_block = $1, updated_at = NOW() + WHERE id = $2`, + [nextCursorBlock.toString(), CURSOR_ID], ); - return; - } + await queryRunner.commitTransaction(); - if (cursor) { - cursor.lastProcessedBlock = nextCursorBlock.toString(); - cursor.updatedAt = new Date(); - await cursorRepo.save(cursor); - } else { - await cursorRepo.save({ - id: CURSOR_ID, - lastProcessedBlock: nextCursorBlock.toString(), - updatedAt: new Date(), - }); + this.log.debug(`Updated block cursor to ${nextCursorBlock}`); + } catch (e) { + await queryRunner.rollbackTransaction(); + throw e; + } finally { + await queryRunner.release(); } - - this.log.debug(`Updated block cursor to ${nextCursorBlock}`); } }