diff --git a/packages/txm/lib/BlockMonitor.ts b/packages/txm/lib/BlockMonitor.ts index 282b101028..542ce78eb2 100644 --- a/packages/txm/lib/BlockMonitor.ts +++ b/packages/txm/lib/BlockMonitor.ts @@ -41,6 +41,8 @@ export class BlockMonitor { TxmMetrics.getInstance().currentBlockGauge.record(Number(block.number)) TxmMetrics.getInstance().newBlockDelayHistogram.record(Date.now() - Number(block.timestamp) * 1000) + this.txmgr.rpcLivenessMonitor.trackSuccess() + this.scheduleTimeout() } @@ -53,6 +55,7 @@ export class BlockMonitor { private resetBlockSubscription() { TxmMetrics.getInstance().resetBlockMonitorCounter.add(1) + this.txmgr.rpcLivenessMonitor.trackError() if (this.unwatch) { this.unwatch() } diff --git a/packages/txm/lib/EventBus.ts b/packages/txm/lib/EventBus.ts index 058bb2c4ae..bbf36832ed 100644 --- a/packages/txm/lib/EventBus.ts +++ b/packages/txm/lib/EventBus.ts @@ -5,6 +5,8 @@ export enum Topics { TransactionStatusChanged = "TransactionStatusChanged", TransactionSaveFailed = "TransactionSaveFailed", TransactionSubmissionFailed = "TransactionSubmissionFailed", + RpcIsDown = "RpcIsDown", + RpcIsUp = "RpcIsUp", } export type EventBus = EventEmitter diff --git a/packages/txm/lib/HookManager.ts b/packages/txm/lib/HookManager.ts index 4090d60832..f8e155f9dc 100644 --- a/packages/txm/lib/HookManager.ts +++ b/packages/txm/lib/HookManager.ts @@ -9,6 +9,8 @@ export enum TxmHookType { TransactionSaveFailed = "TransactionSaveFailed", NewBlock = "NewBlock", TransactionSubmissionFailed = "TransactionSubmissionFailed", + RpcIsDown = "RpcIsDown", + RpcIsUp = "RpcIsUp", } export type TxmTransactionStatusChangedHookPayload = { @@ -33,11 +35,21 @@ export type TxmTransactionSubmissionFailedHookPayload = { cause: AttemptSubmissionErrorCause } +export type TxmRpcIsDownHookPayload = { + type: TxmHookType.RpcIsDown +} + +export type TxmRpcIsUpHookPayload = { + type: TxmHookType.RpcIsUp +} + export type TxmHookPayload = | TxmTransactionStatusChangedHookPayload | TxmNewBlockHookPayload | TxmTransactionSaveFailedHookPayload | TxmTransactionSubmissionFailedHookPayload + | TxmRpcIsDownHookPayload + | TxmRpcIsUpHookPayload export type TxmHooksRecord = { [TxmHookType.All]: ((event: TxmHookPayload) => void)[] @@ -49,6 +61,8 @@ export type TxmHooksRecord = { description: string, cause: AttemptSubmissionErrorCause, ) => void)[] + [TxmHookType.RpcIsDown]: (() => void)[] + [TxmHookType.RpcIsUp]: (() => void)[] } export type TxmHookHandler = TxmHooksRecord[T][number] @@ -75,11 +89,15 @@ export class HookManager { [TxmHookType.TransactionSaveFailed]: [], [TxmHookType.NewBlock]: [], [TxmHookType.TransactionSubmissionFailed]: [], + [TxmHookType.RpcIsDown]: [], + [TxmHookType.RpcIsUp]: [], } eventBus.on(Topics.TransactionStatusChanged, this.onTransactionStatusChanged.bind(this)) eventBus.on(Topics.TransactionSaveFailed, this.onTransactionSaveFailed.bind(this)) eventBus.on(Topics.NewBlock, this.onNewBlock.bind(this)) eventBus.on(Topics.TransactionSubmissionFailed, this.onTransactionSubmissionFailed.bind(this)) + eventBus.on(Topics.RpcIsDown, this.onRpcIsDown.bind(this)) + eventBus.on(Topics.RpcIsUp, this.onRpcIsUp.bind(this)) } public addHook(type: T, handler: TxmHookHandler): () => void { @@ -149,4 +167,16 @@ export class HookManager { }), ) } + + private async onRpcIsDown(): Promise { + this.hooks[TxmHookType.RpcIsDown].forEach((handler) => handler()) + + this.hooks[TxmHookType.All].forEach((handler) => handler({ type: TxmHookType.RpcIsDown })) + } + + private async onRpcIsUp(): Promise { + this.hooks[TxmHookType.RpcIsUp].forEach((handler) => handler()) + + this.hooks[TxmHookType.All].forEach((handler) => handler({ type: TxmHookType.RpcIsUp })) + } } diff --git a/packages/txm/lib/NonceManager.ts b/packages/txm/lib/NonceManager.ts index 426e75655b..62c684b510 100644 --- a/packages/txm/lib/NonceManager.ts +++ b/packages/txm/lib/NonceManager.ts @@ -110,9 +110,12 @@ export class NonceManager { Logger.instance.error(LogTag.TXM, `Failed to get transaction count for address ${address}`, { error: blockchainNonceResult.error, }) + this.txmgr.rpcLivenessMonitor.trackError() return } + this.txmgr.rpcLivenessMonitor.trackSuccess() + this.maxExecutedNonce = blockchainNonceResult.value - 1 } } diff --git a/packages/txm/lib/RpcLivenessMonitor.ts b/packages/txm/lib/RpcLivenessMonitor.ts new file mode 100644 index 0000000000..c068f4b571 --- /dev/null +++ b/packages/txm/lib/RpcLivenessMonitor.ts @@ -0,0 +1,129 @@ +import { LogTag, Logger } from "@happy.tech/common" +import { Topics } from "./EventBus" +import { eventBus } from "./EventBus" +import type { TransactionManager } from "./TransactionManager" +import { TxmMetrics } from "./telemetry/metrics" + +interface SecondCounters { + successCount: number + errorCount: number +} + +export class RpcLivenessMonitor { + private txmgr: TransactionManager + private counters: Record + public isAlive: boolean + private isDownSince: Date | null + private checkIfHealthyInterval: NodeJS.Timer | null + private consecutiveSuccessesWhileCheckingIfHealthy: number + + constructor(txmgr: TransactionManager) { + this.txmgr = txmgr + this.counters = {} + this.isAlive = true + this.isDownSince = null + this.checkIfHealthyInterval = null + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + + setInterval(() => { + this.cleanOldCounters() + }, 1000) + } + + private getCurrentSecond(): number { + return Math.floor(Date.now() / 1000) + } + + private cleanOldCounters(): void { + const now = this.getCurrentSecond() + const oldestAllowedSecond = now - Math.floor(this.txmgr.livenessWindow / 1000) + + Object.keys(this.counters).forEach((secondStr) => { + const second = Number.parseInt(secondStr, 10) + if (second < oldestAllowedSecond) { + delete this.counters[second] + } + }) + } + + trackSuccess() { + const currentSecond = this.getCurrentSecond() + + if (!this.counters[currentSecond]) { + this.counters[currentSecond] = { successCount: 0, errorCount: 0 } + } + + this.counters[currentSecond].successCount++ + this.checkIfDown() + } + + trackError() { + const currentSecond = this.getCurrentSecond() + + if (!this.counters[currentSecond]) { + this.counters[currentSecond] = { successCount: 0, errorCount: 0 } + } + + this.counters[currentSecond].errorCount++ + this.checkIfDown() + } + + private checkIfDown() { + if (this.isAlive && this.ratioOfSuccess() < this.txmgr.livenessThreshold) { + this.isAlive = false + this.isDownSince = new Date() + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + this.updateLivenessMetrics() + eventBus.emit(Topics.RpcIsDown) + Logger.instance.error(LogTag.TXM, "Detected that the RPC is not healthy") + + this.checkIfHealthyInterval = setInterval(() => { + this.checkIfHealthy() + }, this.txmgr.livenessCheckInterval) + } + } + + private async checkIfHealthy() { + if (this.isDownSince && this.isDownSince.getTime() + this.txmgr.livenessDownDelay > new Date().getTime()) { + return + } + + const chainIdResult = await this.txmgr.viemClient.safeGetChainId() + + if (chainIdResult.isOk()) { + this.consecutiveSuccessesWhileCheckingIfHealthy++ + } else { + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + } + + if (this.consecutiveSuccessesWhileCheckingIfHealthy > this.txmgr.livenessSuccessCount) { + Logger.instance.info(LogTag.TXM, "Detected that the RPC is healthy") + this.isAlive = true + this.isDownSince = null + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + this.counters = {} + this.updateLivenessMetrics() + eventBus.emit(Topics.RpcIsUp) + if (this.checkIfHealthyInterval) { + clearInterval(this.checkIfHealthyInterval) + } + } + } + + updateLivenessMetrics() { + TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + } + + ratioOfSuccess() { + let totalSuccesses = 0 + let totalEvents = 0 + + Object.values(this.counters).forEach((counter) => { + totalSuccesses += counter.successCount + totalEvents += counter.successCount + counter.errorCount + }) + + return totalEvents > 0 ? totalSuccesses / totalEvents : 1 + } +} diff --git a/packages/txm/lib/TransactionCollector.ts b/packages/txm/lib/TransactionCollector.ts index a7afaeec98..dff647cecf 100644 --- a/packages/txm/lib/TransactionCollector.ts +++ b/packages/txm/lib/TransactionCollector.ts @@ -44,6 +44,11 @@ export class TransactionCollector { TxmMetrics.getInstance().transactionCollectedCounter.add(transactionsBatch.length) + if (!this.txmgr.rpcLivenessMonitor.isAlive) { + Logger.instance.error(LogTag.TXM, "RPC is not alive, skipping attempt to submit transactions") + return + } + await Promise.all( transactionsBatch.map(async (transaction) => { const nonce = this.txmgr.nonceManager.requestNonce() diff --git a/packages/txm/lib/TransactionManager.ts b/packages/txm/lib/TransactionManager.ts index 20c627ab43..f01b003a50 100644 --- a/packages/txm/lib/TransactionManager.ts +++ b/packages/txm/lib/TransactionManager.ts @@ -19,6 +19,7 @@ import { GasPriceOracle } from "./GasPriceOracle.js" import { HookManager, type TxmHookHandler, type TxmHookType } from "./HookManager.js" import { NonceManager } from "./NonceManager.js" import { DefaultRetryPolicyManager, type RetryPolicyManager } from "./RetryPolicyManager.js" +import { RpcLivenessMonitor } from "./RpcLivenessMonitor" import { Transaction, type TransactionConstructorConfig } from "./Transaction.js" import { TransactionCollector } from "./TransactionCollector.js" import { TransactionRepository } from "./TransactionRepository.js" @@ -75,6 +76,46 @@ export type TransactionManagerConfig = { * Defaults to 4000 milliseconds. */ blockInactivityTimeout?: number + + /** + * The minimum success rate of RPC calls required to consider the RPC healthy. + * Expressed as a decimal between 0 and 1. + * Example: 0.85 means 85% of calls must be successful. + * @default 0.85 + */ + livenessThreshold?: number + + /** + * The monitoring window duration for evaluating RPC health. + * The success rate is calculated based on RPC calls within this time frame. + * @default 10000 (10 seconds) + * @unit milliseconds + */ + livenessWindow?: number + + /** + * Number of successful consecutive chainId requests required to mark the RPC healthy again. + * When marked unhealthy, the system will periodically send chainId requests + * to check if the RPC has recovered. + * @default 3 + */ + livenessSuccessCount?: number + + /** + * Margin of time after the RPC is marked as unhealthy before the txm starts checking if it is healthy again. + * @default 5000 (5 seconds) + * @unit milliseconds + */ + livenessDownDelay?: number + + /** + * The interval between health check attempts for the RPC. + * When unhealthy, the system will send chainId requests at this interval + * until either the RPC recovers or the connection is terminated. + * @default 2000 (2 seconds) + * @unit milliseconds + */ + livenessCheckInterval?: number } /** The private key of the account used for signing transactions. */ privateKey: Hex @@ -187,6 +228,7 @@ export class TransactionManager { public readonly transactionSubmitter: TransactionSubmitter public readonly hookManager: HookManager public readonly retryPolicyManager: RetryPolicyManager + public readonly rpcLivenessMonitor: RpcLivenessMonitor public readonly chainId: number public readonly eip1559: EIP1559Parameters @@ -198,6 +240,11 @@ export class TransactionManager { public readonly pollingInterval: number public readonly transportProtocol: "http" | "websocket" public readonly blockInactivityTimeout: number + public readonly livenessWindow: number + public readonly livenessThreshold: number + public readonly livenessSuccessCount: number + public readonly livenessDownDelay: number + public readonly livenessCheckInterval: number constructor(_config: TransactionManagerConfig) { initializeTelemetry({ @@ -293,6 +340,7 @@ export class TransactionManager { this.transactionSubmitter = new TransactionSubmitter(this) this.hookManager = new HookManager() this.retryPolicyManager = _config.retryPolicyManager ?? new DefaultRetryPolicyManager() + this.rpcLivenessMonitor = new RpcLivenessMonitor(this) this.chainId = _config.chainId this.eip1559 = _config.eip1559 ?? opStackDefaultEIP1559Parameters @@ -307,6 +355,12 @@ export class TransactionManager { this.pollingInterval = _config.rpc.pollingInterval ?? (Number(this.blockTime) * 1000) / 2 this.blockInactivityTimeout = _config.rpc.blockInactivityTimeout ?? 4000 + + this.livenessWindow = _config.rpc.livenessWindow ?? 10000 + this.livenessThreshold = _config.rpc.livenessThreshold ?? 0.85 + this.livenessSuccessCount = _config.rpc.livenessSuccessCount ?? 3 + this.livenessDownDelay = _config.rpc.livenessDownDelay ?? 5000 + this.livenessCheckInterval = _config.rpc.livenessCheckInterval ?? 2000 } /** diff --git a/packages/txm/lib/TransactionSubmitter.ts b/packages/txm/lib/TransactionSubmitter.ts index 305d2431fd..7c07e8ccaf 100644 --- a/packages/txm/lib/TransactionSubmitter.ts +++ b/packages/txm/lib/TransactionSubmitter.ts @@ -155,6 +155,9 @@ export class TransactionSubmitter { ) { this.txmgr.nonceManager.resync() } + + this.txmgr.rpcLivenessMonitor.trackError() + return err({ cause: AttemptSubmissionErrorCause.FailedToSendRawTransaction, description: `Failed to send raw transaction ${transaction.intentId}. Details: ${sendRawTransactionResult.error}`, @@ -162,6 +165,8 @@ export class TransactionSubmitter { }) } + this.txmgr.rpcLivenessMonitor.trackSuccess() + return ok(undefined) } } diff --git a/packages/txm/lib/TxMonitor.ts b/packages/txm/lib/TxMonitor.ts index 7265c13da9..67220416b9 100644 --- a/packages/txm/lib/TxMonitor.ts +++ b/packages/txm/lib/TxMonitor.ts @@ -64,6 +64,11 @@ export class TxMonitor { } private async handleNewBlock(block: LatestBlock) { + if (!this.transactionManager.rpcLivenessMonitor.isAlive) { + Logger.instance.warn(LogTag.TXM, "RPC is not alive, skipping attempt to monitor transactions") + return + } + const transactions = this.transactionManager.transactionRepository.getNotFinalizedTransactionsOlderThan( block.number, ) @@ -98,11 +103,15 @@ export class TxMonitor { isResolved = true resolve(attemptWithReceipt) } + this.transactionManager.rpcLivenessMonitor.trackSuccess() return ok(attemptWithReceipt) } if (receiptResult.error instanceof TransactionReceiptNotFoundError) { + this.transactionManager.rpcLivenessMonitor.trackSuccess() return ok(null) } + + this.transactionManager.rpcLivenessMonitor.trackError() return err(receiptResult.error) }, ) diff --git a/packages/txm/lib/telemetry/metrics.ts b/packages/txm/lib/telemetry/metrics.ts index 54056692c0..23c44d05af 100644 --- a/packages/txm/lib/telemetry/metrics.ts +++ b/packages/txm/lib/telemetry/metrics.ts @@ -271,6 +271,16 @@ export class TxmMetrics { valueType: ValueType.INT, }) + /* RPC LIVENESS MONITOR METRICS */ + private readonly rpcLivenessMonitorMeter = metrics.getMeter("txm.rpc-liveness-monitor") + + public readonly rpcLivenessMonitorGauge = this.rpcLivenessMonitorMeter.createGauge( + "txm.rpc-liveness-monitor.is-alive", + { + description: "Whether the RPC is alive", + }, + ) + // Singleton instance private static instance: TxmMetrics diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index 18b07a27dd..d3913881b1 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -38,6 +38,8 @@ const txm = new TransactionManager({ url: PROXY_URL, pollingInterval: 200, allowDebug: true, + livenessCheckInterval: 500, + livenessDownDelay: 1000, }, abis: abis, gasEstimator: new TestGasEstimator(), @@ -576,10 +578,16 @@ test("Correctly calculates baseFeePerGas after a block with high gas usage", asy }) test("Transaction manager successfully processes transactions despite random RPC failures", async () => { + const previousLivenessThreshold = txm.livenessThreshold + Object.defineProperty(txm, "livenessThreshold", { + value: 0, + configurable: true, + }) + proxyServer.setMode(ProxyMode.Random, { - [ProxyBehavior.NotAnswer]: 0.1, - [ProxyBehavior.Fail]: 0.2, - [ProxyBehavior.Forward]: 0.7, + [ProxyBehavior.NotAnswer]: 0.05, + [ProxyBehavior.Fail]: 0.05, + [ProxyBehavior.Forward]: 0.9, }) const previousBlock = await getCurrentBlock() @@ -619,6 +627,11 @@ test("Transaction manager successfully processes transactions despite random RPC expect(successfulTransactions).toBeGreaterThan(numTransactions * 0.6) proxyServer.setMode(ProxyMode.Deterministic) + + Object.defineProperty(txm, "livenessThreshold", { + value: previousLivenessThreshold, + configurable: true, + }) }) test("Transaction succeeds in congested blocks", async () => { @@ -675,52 +688,117 @@ test("Transaction succeeds in congested blocks", async () => { expect(executedIncrementerTransaction.collectionBlock).toBe(previousBlock.number! + 1n) }) -test( - "Finalized transactions are automatically purged from db after finalizedTransactionPurgeTime elapses", - async () => { - const previousFinalizedTransactionPurgeTime = txm.finalizedTransactionPurgeTime +test("Finalized transactions are automatically purged from db after finalizedTransactionPurgeTime elapses", async () => { + const previousFinalizedTransactionPurgeTime = txm.finalizedTransactionPurgeTime - const mockedFinalizedTransactionPurgeTime = 6000 + const mockedFinalizedTransactionPurgeTime = 6000 - Object.defineProperty(txm, "finalizedTransactionPurgeTime", { - value: mockedFinalizedTransactionPurgeTime, - configurable: true, - }) + Object.defineProperty(txm, "finalizedTransactionPurgeTime", { + value: mockedFinalizedTransactionPurgeTime, + configurable: true, + }) + + const transaction = await createCounterTransaction() + + transactionQueue.push(transaction) + + await mineBlock(2) + + const transactionPersisted = await getPersistedTransaction(transaction.intentId) + + if (!assertIsDefined(transactionPersisted)) return + + expect(transactionPersisted.status).toBe(TransactionStatus.Success) + + const updatedAt = transactionPersisted.updatedAt + const purgeTime = updatedAt + mockedFinalizedTransactionPurgeTime + + while (Date.now() < purgeTime) { + const transactionPersisted = await getPersistedTransaction(transaction.intentId) + + expect(transactionPersisted).toBeDefined() + expect(transactionPersisted?.status).toBe(TransactionStatus.Success) + + await mineBlock() + } + + await mineBlock() + + const persistedTransaction = await getPersistedTransaction(transaction.intentId) + + expect(persistedTransaction).toBeUndefined() + + Object.defineProperty(txm, "finalizedTransactionPurgeTime", { + value: previousFinalizedTransactionPurgeTime, + configurable: true, + }) +}) +test("RPC liveness monitor works correctly", async () => { + const previousLivenessWindow = txm.livenessWindow + Object.defineProperty(txm, "livenessWindow", { + value: 2000, + configurable: true, + }) + + proxyServer.setMode(ProxyMode.Deterministic) + + while (!txm.rpcLivenessMonitor.isAlive) { const transaction = await createCounterTransaction() transactionQueue.push(transaction) - await mineBlock(2) + await mineBlock() + } - const transactionPersisted = await getPersistedTransaction(transaction.intentId) + proxyServer.setMode(ProxyMode.Random, { + [ProxyBehavior.NotAnswer]: 0, + [ProxyBehavior.Fail]: 0.5, + [ProxyBehavior.Forward]: 0.5, + }) - if (!assertIsDefined(transactionPersisted)) return + let isDownHookTriggered = false + let isUpHookTriggered = false - expect(transactionPersisted.status).toBe(TransactionStatus.Success) + const cleanIsDownHook = await txm.addHook(TxmHookType.RpcIsDown, () => { + isDownHookTriggered = true + }) - const updatedAt = transactionPersisted.updatedAt - const purgeTime = updatedAt + mockedFinalizedTransactionPurgeTime + const cleanIsUpHook = await txm.addHook(TxmHookType.RpcIsUp, () => { + isUpHookTriggered = true + }) - while (Date.now() < purgeTime) { - const transactionPersisted = await getPersistedTransaction(transaction.intentId) + expect(txm.rpcLivenessMonitor.isAlive).toBe(true) - expect(transactionPersisted).toBeDefined() - expect(transactionPersisted?.status).toBe(TransactionStatus.Success) + while (txm.rpcLivenessMonitor.isAlive) { + const transaction = await createCounterTransaction() - await mineBlock() - } + transactionQueue.push(transaction) + + await mineBlock() + } + + expect(isDownHookTriggered).toBe(true) + expect(txm.rpcLivenessMonitor.isAlive).toBe(false) + + proxyServer.setMode(ProxyMode.Deterministic) + + while (!txm.rpcLivenessMonitor.isAlive) { + const transaction = await createCounterTransaction() + + transactionQueue.push(transaction) await mineBlock() + } - const persistedTransaction = await getPersistedTransaction(transaction.intentId) + expect(isUpHookTriggered).toBe(true) + expect(txm.rpcLivenessMonitor.isAlive).toBe(true) - expect(persistedTransaction).toBeUndefined() + cleanIsDownHook() + cleanIsUpHook() - Object.defineProperty(txm, "finalizedTransactionPurgeTime", { - value: previousFinalizedTransactionPurgeTime, - configurable: true, - }) - }, - { timeout: 20000 }, -) + Object.defineProperty(txm, "livenessWindow", { + value: previousLivenessWindow, + configurable: true, + }) +}) diff --git a/packages/txm/vitest.config.ts b/packages/txm/vitest.config.ts index d7b3c51876..84478b1d09 100644 --- a/packages/txm/vitest.config.ts +++ b/packages/txm/vitest.config.ts @@ -5,8 +5,8 @@ export default defineConfig({ globals: true, environment: "node", setupFiles: "./vitest.setup.ts", - testTimeout: 10000, - hookTimeout: 30000, + testTimeout: 60000, + hookTimeout: 60000, watch: false, }, })