diff --git a/nilcc-api/src/main.ts b/nilcc-api/src/main.ts index 7900840..6792e3e 100644 --- a/nilcc-api/src/main.ts +++ b/nilcc-api/src/main.ts @@ -32,7 +32,6 @@ async function main() { const { app, metrics } = await buildApp(bindings); const paymentPoller = new PaymentPoller(bindings, bindings.services.payment); - paymentPoller.start(); bindings.log.info("Starting servers ..."); const appServer = serve( @@ -55,7 +54,13 @@ async function main() { }, ); + let shuttingDown = false; const shutdown = async (): Promise => { + if (shuttingDown) { + return; + } + shuttingDown = true; + bindings.log.info( "Received shutdown signal. Starting graceful shutdown...", ); @@ -80,6 +85,11 @@ async function main() { process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); + + void paymentPoller.start().catch(async (error) => { + bindings.log.error(error, "Failed to start payment poller"); + await shutdown(); + }); } main().catch((error) => { diff --git a/nilcc-api/src/payment/payment-poller.ts b/nilcc-api/src/payment/payment-poller.ts index f766e6d..ddf19c6 100644 --- a/nilcc-api/src/payment/payment-poller.ts +++ b/nilcc-api/src/payment/payment-poller.ts @@ -1,7 +1,6 @@ import type { Logger } from "pino"; import { createPublicClient, http } from "viem"; import type { AppBindings } from "#/env"; -import { BlockCursorEntity } from "./block-cursor.entity"; import { burnWithDigestEventAbi } from "./burn-contract"; import type { PaymentService } from "./payment.service"; @@ -19,7 +18,7 @@ export class PaymentPoller { this.log = bindings.log.child({ component: "payment-poller" }); } - start(): void { + async start(): Promise { const { rpcUrl, burnContractAddress } = this.bindings.config; if (!rpcUrl || !burnContractAddress) { this.log.info( @@ -28,10 +27,20 @@ export class PaymentPoller { return; } + // Seed the cursor row so SELECT ... FOR UPDATE always finds a row to lock. + const seedBlock = ( + BigInt(this.bindings.config.paymentStartBlock) - 1n + ).toString(); + await this.bindings.dataSource.query( + `INSERT INTO block_cursors (id, last_processed_block, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (id) DO NOTHING`, + [CURSOR_ID, seedBlock], + ); + const intervalMs = this.bindings.config.paymentPollerIntervalMs; this.log.info(`Starting payment poller with interval ${intervalMs}ms`); - // Run immediately, then on interval this.poll(); this.intervalId = setInterval(() => this.poll(), intervalMs); } @@ -74,101 +83,103 @@ export class PaymentPoller { transport: http(rpcUrl), }); - // Get block cursor - const cursorRepo = - this.bindings.dataSource.getRepository(BlockCursorEntity); - const cursor = await cursorRepo.findOneBy({ id: CURSOR_ID }); - const fromBlock = cursor - ? BigInt(cursor.lastProcessedBlock) + 1n - : BigInt(this.bindings.config.paymentStartBlock); - - // Get current block - const currentBlock = await client.getBlockNumber(); - if (fromBlock > currentBlock) { - this.log.debug( - `No new blocks to process (cursor: ${fromBlock}, current: ${currentBlock})`, + const queryRunner = this.bindings.dataSource.createQueryRunner(); + await queryRunner.connect(); + await queryRunner.startTransaction(); + try { + // Single-writer gate across replicas: blocks until any concurrent poll commits, + // then reads the already-advanced cursor and processes only the new range. + const rows: Array<{ last_processed_block: string }> = + await queryRunner.query( + `SELECT last_processed_block FROM block_cursors + WHERE id = $1 FOR UPDATE`, + [CURSOR_ID], + ); + const fromBlock = BigInt(rows[0].last_processed_block) + 1n; + + const currentBlock = await client.getBlockNumber(); + if (fromBlock > currentBlock) { + this.log.debug( + `No new blocks to process (cursor: ${fromBlock}, current: ${currentBlock})`, + ); + await queryRunner.commitTransaction(); + return; + } + + const toBlock = + currentBlock - fromBlock > BigInt(paymentPollerMaxBlockRange) + ? fromBlock + BigInt(paymentPollerMaxBlockRange) - 1n + : currentBlock; + + this.log.info( + `Polling blocks ${fromBlock} to ${toBlock} for LogBurnWithDigest events`, ); - return; - } - // Clamp range - const toBlock = - currentBlock - fromBlock > BigInt(paymentPollerMaxBlockRange) - ? fromBlock + BigInt(paymentPollerMaxBlockRange) - 1n - : currentBlock; + const logs = await client.getLogs({ + address: burnContractAddress as `0x${string}`, + event: burnWithDigestEventAbi[0], + fromBlock, + toBlock, + }); - this.log.info( - `Polling blocks ${fromBlock} to ${toBlock} for LogBurnWithDigest events`, - ); + this.log.info(`Found ${logs.length} LogBurnWithDigest events`); - // Fetch logs - const logs = await client.getLogs({ - address: burnContractAddress as `0x${string}`, - event: burnWithDigestEventAbi[0], - fromBlock, - toBlock, - }); + let firstFailedBlock: bigint | null = null; + for (const log of logs) { + if (!log.transactionHash || !log.args.account || !log.args.amount) { + this.log.warn("Skipping malformed log entry"); + continue; + } - this.log.info(`Found ${logs.length} LogBurnWithDigest events`); + try { + await this.paymentService.processEvent(this.bindings, { + txHash: log.transactionHash, + logIndex: log.logIndex ?? 0, + blockNumber: Number(log.blockNumber), + fromAddress: log.args.account, + amount: log.args.amount, + digest: log.args.digest ?? "0x", + }); + } catch (e) { + if (firstFailedBlock === null) { + firstFailedBlock = log.blockNumber ?? fromBlock; + } + this.log.warn( + `Failed to process event from tx ${log.transactionHash}: ${e}`, + ); + } + } - // Process each log - let firstFailedBlock: bigint | null = null; - for (const log of logs) { - if (!log.transactionHash || !log.args.account || !log.args.amount) { - this.log.warn("Skipping malformed log entry"); - continue; + // If any event failed, only advance up to the block before the first failure + // so failed events are retried next tick. + let nextCursorBlock: bigint | null = toBlock; + if (firstFailedBlock !== null) { + nextCursorBlock = + firstFailedBlock <= fromBlock ? null : firstFailedBlock - 1n; } - try { - await this.paymentService.processEvent(this.bindings, { - txHash: log.transactionHash, - logIndex: log.logIndex ?? 0, - blockNumber: Number(log.blockNumber), - fromAddress: log.args.account, - amount: log.args.amount, - digest: log.args.digest ?? "0x", - }); - } catch (e) { - if (firstFailedBlock === null) { - firstFailedBlock = log.blockNumber ?? fromBlock; - } + if (nextCursorBlock === null) { this.log.warn( - `Failed to process event from tx ${log.transactionHash}: ${e}`, + `Not advancing block cursor due to processing failure at block ${firstFailedBlock?.toString()}`, ); + await queryRunner.commitTransaction(); + return; } - } - - // Update cursor. If any event failed, only advance up to the block before - // the first failure so failed events are retried in the next poll. - let nextCursorBlock: bigint | null = toBlock; - if (firstFailedBlock !== null) { - if (firstFailedBlock <= fromBlock) { - // Would only happen if firstFailedBlock == fromBlock, other states should be impossible - nextCursorBlock = null; - } else { - nextCursorBlock = firstFailedBlock - 1n; - } - } - if (nextCursorBlock === null) { - this.log.warn( - `Not advancing block cursor due to processing failure at block ${firstFailedBlock?.toString()}`, + await queryRunner.query( + `UPDATE block_cursors + SET last_processed_block = $1, updated_at = NOW() + WHERE id = $2`, + [nextCursorBlock.toString(), CURSOR_ID], ); - return; - } + await queryRunner.commitTransaction(); - if (cursor) { - cursor.lastProcessedBlock = nextCursorBlock.toString(); - cursor.updatedAt = new Date(); - await cursorRepo.save(cursor); - } else { - await cursorRepo.save({ - id: CURSOR_ID, - lastProcessedBlock: nextCursorBlock.toString(), - updatedAt: new Date(), - }); + this.log.debug(`Updated block cursor to ${nextCursorBlock}`); + } catch (e) { + await queryRunner.rollbackTransaction(); + throw e; + } finally { + await queryRunner.release(); } - - this.log.debug(`Updated block cursor to ${nextCursorBlock}`); } } diff --git a/nilcc-api/src/payment/payment.service.ts b/nilcc-api/src/payment/payment.service.ts index 10d7226..f177069 100644 --- a/nilcc-api/src/payment/payment.service.ts +++ b/nilcc-api/src/payment/payment.service.ts @@ -1,6 +1,7 @@ import type { QueryRunner, Repository } from "typeorm"; import { v4 as uuidv4 } from "uuid"; import type { AccountEntity } from "#/account/account.entity"; +import { isUniqueConstraint } from "#/common/errors"; import { microdollarsToUsd, nilToMicrodollars, @@ -116,6 +117,10 @@ export class PaymentService { return payment; } catch (e) { await queryRunner.rollbackTransaction(); + if (isUniqueConstraint(e)) { + bindings.log.info(`Payment already processed: ${event.txHash}`); + return null; + } throw e; } finally { await queryRunner.release();