From a5ca61d421e139cc9b77c1ba249d8b0c6252ce15 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Wed, 26 Mar 2025 15:40:33 +0100 Subject: [PATCH 1/8] feat(txm): rpc liveness --- packages/txm/lib/BlockMonitor.ts | 3 + packages/txm/lib/NonceManager.ts | 3 + packages/txm/lib/RpcLivenessMonitor.ts | 96 ++++++++++++++++++++++++ packages/txm/lib/TransactionCollector.ts | 5 ++ packages/txm/lib/TransactionManager.ts | 54 +++++++++++++ packages/txm/lib/TransactionSubmitter.ts | 5 ++ packages/txm/lib/TxMonitor.ts | 9 +++ packages/txm/lib/telemetry/metrics.ts | 7 ++ 8 files changed, 182 insertions(+) create mode 100644 packages/txm/lib/RpcLivenessMonitor.ts diff --git a/packages/txm/lib/BlockMonitor.ts b/packages/txm/lib/BlockMonitor.ts index 282b101028..c0fb27eca7 100644 --- a/packages/txm/lib/BlockMonitor.ts +++ b/packages/txm/lib/BlockMonitor.ts @@ -41,6 +41,8 @@ export class BlockMonitor { TxmMetrics.getInstance().currentBlockGauge.record(Number(block.number)) TxmMetrics.getInstance().newBlockDelayHistogram.record(Date.now() - Number(block.timestamp) * 1000) + this.txmgr.rpcLivenessMonitor.onSuccess() + this.scheduleTimeout() } @@ -53,6 +55,7 @@ export class BlockMonitor { private resetBlockSubscription() { TxmMetrics.getInstance().resetBlockMonitorCounter.add(1) + this.txmgr.rpcLivenessMonitor.onFailure() if (this.unwatch) { this.unwatch() } diff --git a/packages/txm/lib/NonceManager.ts b/packages/txm/lib/NonceManager.ts index 426e75655b..2484e9fd1f 100644 --- a/packages/txm/lib/NonceManager.ts +++ b/packages/txm/lib/NonceManager.ts @@ -110,9 +110,12 @@ export class NonceManager { Logger.instance.error(LogTag.TXM, `Failed to get transaction count for address ${address}`, { error: blockchainNonceResult.error, }) + this.txmgr.rpcLivenessMonitor.onFailure() return } + this.txmgr.rpcLivenessMonitor.onSuccess() + this.maxExecutedNonce = blockchainNonceResult.value - 1 } } diff --git a/packages/txm/lib/RpcLivenessMonitor.ts b/packages/txm/lib/RpcLivenessMonitor.ts new file mode 100644 index 0000000000..b391ba2701 --- /dev/null +++ b/packages/txm/lib/RpcLivenessMonitor.ts @@ -0,0 +1,96 @@ +import { LogTag, Logger } from "@happy.tech/common" +import type { TransactionManager } from "./TransactionManager" +import { TxmMetrics } from "./telemetry/metrics" + +interface LivenessEvent { + occurredAt: Date + success: boolean +} + +export class RpcLivenessMonitor { + private txmgr: TransactionManager + private events: LivenessEvent[] + public isAlive: boolean + private isDownSince: Date | null + private checkIfHealthyInterval: NodeJS.Timer | null + private consecutiveSuccessesWhileCheckingIfHealthy: number + + constructor(txmgr: TransactionManager) { + this.txmgr = txmgr + this.events = [] + this.isAlive = true + this.isDownSince = null + this.checkIfHealthyInterval = null + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + } + + onSuccess() { + this.events.push({ + occurredAt: new Date(), + success: true, + }) + this.checkIfDown() + } + + onFailure() { + this.events.push({ + occurredAt: new Date(), + success: false, + }) + this.checkIfDown() + } + + cleanOldEvents() { + const now = new Date() + this.events = this.events.filter((event) => { + return now.getTime() - event.occurredAt.getTime() < this.txmgr.livenessWindow + }) + } + + private checkIfDown() { + if (this.isAlive && this.ratioOfSuccess() < this.txmgr.livenessThreshold) { + this.isAlive = false + this.isDownSince = new Date() + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + Logger.instance.error(LogTag.TXM, "Detected that the RPC is not healthy") + + this.checkIfHealthyInterval = setInterval(() => { + this.checkIfHealthy() + }, 2000) + } + } + + private async checkIfHealthy() { + if (this.isDownSince && this.isDownSince.getTime() + this.txmgr.livenessDownDelay > new Date().getTime()) { + return + } + + const chainIdResult = await this.txmgr.viemClient.safeGetChainId() + + if (chainIdResult.isOk()) { + this.consecutiveSuccessesWhileCheckingIfHealthy++ + } else { + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + } + + if (this.consecutiveSuccessesWhileCheckingIfHealthy > this.txmgr.livenessSuccessCount) { + Logger.instance.info(LogTag.TXM, "Detected that the RPC is healthy") + this.isAlive = true + this.isDownSince = null + this.consecutiveSuccessesWhileCheckingIfHealthy = 0 + this.events = [] + TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + if (this.checkIfHealthyInterval) { + clearInterval(this.checkIfHealthyInterval) + } + } + } + + ratioOfSuccess() { + this.cleanOldEvents() + const successEvents = this.events.filter((event) => event.success) + return successEvents.length / this.events.length + } +} diff --git a/packages/txm/lib/TransactionCollector.ts b/packages/txm/lib/TransactionCollector.ts index a7afaeec98..dff647cecf 100644 --- a/packages/txm/lib/TransactionCollector.ts +++ b/packages/txm/lib/TransactionCollector.ts @@ -44,6 +44,11 @@ export class TransactionCollector { TxmMetrics.getInstance().transactionCollectedCounter.add(transactionsBatch.length) + if (!this.txmgr.rpcLivenessMonitor.isAlive) { + Logger.instance.error(LogTag.TXM, "RPC is not alive, skipping attempt to submit transactions") + return + } + await Promise.all( transactionsBatch.map(async (transaction) => { const nonce = this.txmgr.nonceManager.requestNonce() diff --git a/packages/txm/lib/TransactionManager.ts b/packages/txm/lib/TransactionManager.ts index 20c627ab43..bd009a19a0 100644 --- a/packages/txm/lib/TransactionManager.ts +++ b/packages/txm/lib/TransactionManager.ts @@ -19,6 +19,7 @@ import { GasPriceOracle } from "./GasPriceOracle.js" import { HookManager, type TxmHookHandler, type TxmHookType } from "./HookManager.js" import { NonceManager } from "./NonceManager.js" import { DefaultRetryPolicyManager, type RetryPolicyManager } from "./RetryPolicyManager.js" +import { RpcLivenessMonitor } from "./RpcLivenessMonitor" import { Transaction, type TransactionConstructorConfig } from "./Transaction.js" import { TransactionCollector } from "./TransactionCollector.js" import { TransactionRepository } from "./TransactionRepository.js" @@ -75,6 +76,46 @@ export type TransactionManagerConfig = { * Defaults to 4000 milliseconds. */ blockInactivityTimeout?: number + + /** + * The minimum success rate of RPC calls required to consider the RPC healthy. + * Expressed as a decimal between 0 and 1. + * Example: 0.85 means 85% of calls must be successful. + * @default 0.85 + */ + livenessThreshold?: number + + /** + * The monitoring window duration for evaluating RPC health. + * The success rate is calculated based on RPC calls within this time frame. + * @default 10000 (10 seconds) + * @unit milliseconds + */ + livenessWindow?: number + + /** + * Number of successful consecutive chainId requests required to mark the RPC healthy again. + * When marked unhealthy, the system will periodically send chainId requests + * to check if the RPC has recovered. + * @default 3 + */ + livenessSuccessCount?: number + + /** + * The interval between health check attempts for the RPC. + * When unhealthy, the system will send chainId requests at this interval + * until either the RPC recovers or the connection is terminated. + * @default 2000 (2 seconds) + * @unit milliseconds + */ + livenessPingInterval?: number + + /** + * Margin of time after the RPC is marked as unhealthy before the txm starts checking if it is healthy again. + * @default 5000 (5 seconds) + * @unit milliseconds + */ + livenessDownDelay?: number } /** The private key of the account used for signing transactions. */ privateKey: Hex @@ -187,6 +228,7 @@ export class TransactionManager { public readonly transactionSubmitter: TransactionSubmitter public readonly hookManager: HookManager public readonly retryPolicyManager: RetryPolicyManager + public readonly rpcLivenessMonitor: RpcLivenessMonitor public readonly chainId: number public readonly eip1559: EIP1559Parameters @@ -198,6 +240,11 @@ export class TransactionManager { public readonly pollingInterval: number public readonly transportProtocol: "http" | "websocket" public readonly blockInactivityTimeout: number + public readonly livenessWindow: number + public readonly livenessThreshold: number + public readonly livenessPingInterval: number + public readonly livenessSuccessCount: number + public readonly livenessDownDelay: number constructor(_config: TransactionManagerConfig) { initializeTelemetry({ @@ -293,6 +340,7 @@ export class TransactionManager { this.transactionSubmitter = new TransactionSubmitter(this) this.hookManager = new HookManager() this.retryPolicyManager = _config.retryPolicyManager ?? new DefaultRetryPolicyManager() + this.rpcLivenessMonitor = new RpcLivenessMonitor(this) this.chainId = _config.chainId this.eip1559 = _config.eip1559 ?? opStackDefaultEIP1559Parameters @@ -307,6 +355,12 @@ export class TransactionManager { this.pollingInterval = _config.rpc.pollingInterval ?? (Number(this.blockTime) * 1000) / 2 this.blockInactivityTimeout = _config.rpc.blockInactivityTimeout ?? 4000 + + this.livenessWindow = _config.rpc.livenessWindow ?? 10000 + this.livenessThreshold = _config.rpc.livenessThreshold ?? 0.85 + this.livenessSuccessCount = _config.rpc.livenessSuccessCount ?? 3 + this.livenessPingInterval = _config.rpc.livenessPingInterval ?? 2000 + this.livenessDownDelay = _config.rpc.livenessDownDelay ?? 5000 } /** diff --git a/packages/txm/lib/TransactionSubmitter.ts b/packages/txm/lib/TransactionSubmitter.ts index 305d2431fd..9f3b031aea 100644 --- a/packages/txm/lib/TransactionSubmitter.ts +++ b/packages/txm/lib/TransactionSubmitter.ts @@ -155,6 +155,9 @@ export class TransactionSubmitter { ) { this.txmgr.nonceManager.resync() } + + this.txmgr.rpcLivenessMonitor.onFailure() + return err({ cause: AttemptSubmissionErrorCause.FailedToSendRawTransaction, description: `Failed to send raw transaction ${transaction.intentId}. Details: ${sendRawTransactionResult.error}`, @@ -162,6 +165,8 @@ export class TransactionSubmitter { }) } + this.txmgr.rpcLivenessMonitor.onSuccess() + return ok(undefined) } } diff --git a/packages/txm/lib/TxMonitor.ts b/packages/txm/lib/TxMonitor.ts index 7265c13da9..fab272a840 100644 --- a/packages/txm/lib/TxMonitor.ts +++ b/packages/txm/lib/TxMonitor.ts @@ -64,6 +64,11 @@ export class TxMonitor { } private async handleNewBlock(block: LatestBlock) { + if (!this.transactionManager.rpcLivenessMonitor.isAlive) { + Logger.instance.warn(LogTag.TXM, "RPC is not alive, skipping attempt to monitor transactions") + return + } + const transactions = this.transactionManager.transactionRepository.getNotFinalizedTransactionsOlderThan( block.number, ) @@ -98,11 +103,15 @@ export class TxMonitor { isResolved = true resolve(attemptWithReceipt) } + this.transactionManager.rpcLivenessMonitor.onSuccess() return ok(attemptWithReceipt) } if (receiptResult.error instanceof TransactionReceiptNotFoundError) { + this.transactionManager.rpcLivenessMonitor.onSuccess() return ok(null) } + + this.transactionManager.rpcLivenessMonitor.onFailure() return err(receiptResult.error) }, ) diff --git a/packages/txm/lib/telemetry/metrics.ts b/packages/txm/lib/telemetry/metrics.ts index 54056692c0..a504c2c54a 100644 --- a/packages/txm/lib/telemetry/metrics.ts +++ b/packages/txm/lib/telemetry/metrics.ts @@ -271,6 +271,13 @@ export class TxmMetrics { valueType: ValueType.INT, }) + /* RPC LIVENESS MONITOR METRICS */ + private readonly rpcLivenessMonitorMeter = metrics.getMeter("txm.rpc-liveness-monitor") + + public readonly rpcLivenessMonitorGauge = this.rpcLivenessMonitorMeter.createGauge("txm.rpc-liveness-monitor.is-alive", { + description: "Whether the RPC is alive" + }) + // Singleton instance private static instance: TxmMetrics From 1f8449d93cf69e4c54eb4e0abae0ec8331f0a54e Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Wed, 26 Mar 2025 15:48:52 +0100 Subject: [PATCH 2/8] feat(txm): added hooks to notify when rpc liveness changes+ --- packages/txm/lib/EventBus.ts | 2 ++ packages/txm/lib/HookManager.ts | 32 ++++++++++++++++++++++++++ packages/txm/lib/RpcLivenessMonitor.ts | 12 ++++++++-- packages/txm/lib/telemetry/metrics.ts | 9 +++++--- 4 files changed, 50 insertions(+), 5 deletions(-) diff --git a/packages/txm/lib/EventBus.ts b/packages/txm/lib/EventBus.ts index 058bb2c4ae..bbf36832ed 100644 --- a/packages/txm/lib/EventBus.ts +++ b/packages/txm/lib/EventBus.ts @@ -5,6 +5,8 @@ export enum Topics { TransactionStatusChanged = "TransactionStatusChanged", TransactionSaveFailed = "TransactionSaveFailed", TransactionSubmissionFailed = "TransactionSubmissionFailed", + RpcIsDown = "RpcIsDown", + RpcIsUp = "RpcIsUp", } export type EventBus = EventEmitter diff --git a/packages/txm/lib/HookManager.ts b/packages/txm/lib/HookManager.ts index 4090d60832..406f916e87 100644 --- a/packages/txm/lib/HookManager.ts +++ b/packages/txm/lib/HookManager.ts @@ -9,6 +9,8 @@ export enum TxmHookType { TransactionSaveFailed = "TransactionSaveFailed", NewBlock = "NewBlock", TransactionSubmissionFailed = "TransactionSubmissionFailed", + RpcIsDown = "RpcIsDown", + RpcIsUp = "RpcIsUp", } export type TxmTransactionStatusChangedHookPayload = { @@ -33,11 +35,21 @@ export type TxmTransactionSubmissionFailedHookPayload = { cause: AttemptSubmissionErrorCause } +export type TxmRpcIsDownHookPayload = { + type: TxmHookType.RpcIsDown +} + +export type TxmRpcIsUpHookPayload = { + type: TxmHookType.RpcIsUp +} + export type TxmHookPayload = | TxmTransactionStatusChangedHookPayload | TxmNewBlockHookPayload | TxmTransactionSaveFailedHookPayload | TxmTransactionSubmissionFailedHookPayload + | TxmRpcIsDownHookPayload + | TxmRpcIsUpHookPayload export type TxmHooksRecord = { [TxmHookType.All]: ((event: TxmHookPayload) => void)[] @@ -49,6 +61,8 @@ export type TxmHooksRecord = { description: string, cause: AttemptSubmissionErrorCause, ) => void)[] + [TxmHookType.RpcIsDown]: (() => void)[] + [TxmHookType.RpcIsUp]: (() => void)[] } export type TxmHookHandler = TxmHooksRecord[T][number] @@ -75,11 +89,15 @@ export class HookManager { [TxmHookType.TransactionSaveFailed]: [], [TxmHookType.NewBlock]: [], [TxmHookType.TransactionSubmissionFailed]: [], + [TxmHookType.RpcIsDown]: [], + [TxmHookType.RpcIsUp]: [], } eventBus.on(Topics.TransactionStatusChanged, this.onTransactionStatusChanged.bind(this)) eventBus.on(Topics.TransactionSaveFailed, this.onTransactionSaveFailed.bind(this)) eventBus.on(Topics.NewBlock, this.onNewBlock.bind(this)) eventBus.on(Topics.TransactionSubmissionFailed, this.onTransactionSubmissionFailed.bind(this)) + eventBus.on(Topics.RpcIsDown, this.onRpcIsDown.bind(this)) + eventBus.on(Topics.RpcIsUp, this.onRpcIsUp.bind(this)) } public addHook(type: T, handler: TxmHookHandler): () => void { @@ -149,4 +167,18 @@ export class HookManager { }), ) } + + private async onRpcIsDown(): Promise { + this.hooks[TxmHookType.RpcIsDown].forEach((handler) => handler()) + + this.hooks[TxmHookType.All].forEach((handler) => + handler({ type: TxmHookType.RpcIsDown }), + ) + } + + private async onRpcIsUp(): Promise { + this.hooks[TxmHookType.RpcIsUp].forEach((handler) => handler()) + + this.hooks[TxmHookType.All].forEach((handler) => handler({ type: TxmHookType.RpcIsUp })) + } } diff --git a/packages/txm/lib/RpcLivenessMonitor.ts b/packages/txm/lib/RpcLivenessMonitor.ts index b391ba2701..9f4067339c 100644 --- a/packages/txm/lib/RpcLivenessMonitor.ts +++ b/packages/txm/lib/RpcLivenessMonitor.ts @@ -1,4 +1,6 @@ import { LogTag, Logger } from "@happy.tech/common" +import { Topics } from "./EventBus" +import { eventBus } from "./EventBus" import type { TransactionManager } from "./TransactionManager" import { TxmMetrics } from "./telemetry/metrics" @@ -53,7 +55,8 @@ export class RpcLivenessMonitor { this.isAlive = false this.isDownSince = new Date() this.consecutiveSuccessesWhileCheckingIfHealthy = 0 - TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + this.updateLivenessMetrics() + eventBus.emit(Topics.RpcIsDown) Logger.instance.error(LogTag.TXM, "Detected that the RPC is not healthy") this.checkIfHealthyInterval = setInterval(() => { @@ -81,13 +84,18 @@ export class RpcLivenessMonitor { this.isDownSince = null this.consecutiveSuccessesWhileCheckingIfHealthy = 0 this.events = [] - TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + this.updateLivenessMetrics() + eventBus.emit(Topics.RpcIsUp) if (this.checkIfHealthyInterval) { clearInterval(this.checkIfHealthyInterval) } } } + updateLivenessMetrics() { + TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + } + ratioOfSuccess() { this.cleanOldEvents() const successEvents = this.events.filter((event) => event.success) diff --git a/packages/txm/lib/telemetry/metrics.ts b/packages/txm/lib/telemetry/metrics.ts index a504c2c54a..23c44d05af 100644 --- a/packages/txm/lib/telemetry/metrics.ts +++ b/packages/txm/lib/telemetry/metrics.ts @@ -274,9 +274,12 @@ export class TxmMetrics { /* RPC LIVENESS MONITOR METRICS */ private readonly rpcLivenessMonitorMeter = metrics.getMeter("txm.rpc-liveness-monitor") - public readonly rpcLivenessMonitorGauge = this.rpcLivenessMonitorMeter.createGauge("txm.rpc-liveness-monitor.is-alive", { - description: "Whether the RPC is alive" - }) + public readonly rpcLivenessMonitorGauge = this.rpcLivenessMonitorMeter.createGauge( + "txm.rpc-liveness-monitor.is-alive", + { + description: "Whether the RPC is alive", + }, + ) // Singleton instance private static instance: TxmMetrics From 7c9da011cf0196fe28c01ada3a8f40ad4bad5341 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Wed, 26 Mar 2025 15:57:24 +0100 Subject: [PATCH 3/8] feat(txm): added livenessCheckInterval --- packages/txm/lib/HookManager.ts | 4 +--- packages/txm/lib/RpcLivenessMonitor.ts | 2 +- packages/txm/lib/TransactionManager.ts | 11 +++++++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/txm/lib/HookManager.ts b/packages/txm/lib/HookManager.ts index 406f916e87..f8e155f9dc 100644 --- a/packages/txm/lib/HookManager.ts +++ b/packages/txm/lib/HookManager.ts @@ -171,9 +171,7 @@ export class HookManager { private async onRpcIsDown(): Promise { this.hooks[TxmHookType.RpcIsDown].forEach((handler) => handler()) - this.hooks[TxmHookType.All].forEach((handler) => - handler({ type: TxmHookType.RpcIsDown }), - ) + this.hooks[TxmHookType.All].forEach((handler) => handler({ type: TxmHookType.RpcIsDown })) } private async onRpcIsUp(): Promise { diff --git a/packages/txm/lib/RpcLivenessMonitor.ts b/packages/txm/lib/RpcLivenessMonitor.ts index 9f4067339c..b9712993af 100644 --- a/packages/txm/lib/RpcLivenessMonitor.ts +++ b/packages/txm/lib/RpcLivenessMonitor.ts @@ -61,7 +61,7 @@ export class RpcLivenessMonitor { this.checkIfHealthyInterval = setInterval(() => { this.checkIfHealthy() - }, 2000) + }, this.txmgr.livenessCheckInterval) } } diff --git a/packages/txm/lib/TransactionManager.ts b/packages/txm/lib/TransactionManager.ts index bd009a19a0..3232acdefd 100644 --- a/packages/txm/lib/TransactionManager.ts +++ b/packages/txm/lib/TransactionManager.ts @@ -116,6 +116,15 @@ export type TransactionManagerConfig = { * @unit milliseconds */ livenessDownDelay?: number + + /** + * The interval between health check attempts for the RPC. + * When unhealthy, the system will send chainId requests at this interval + * until either the RPC recovers or the connection is terminated. + * @default 2000 (2 seconds) + * @unit milliseconds + */ + livenessCheckInterval?: number } /** The private key of the account used for signing transactions. */ privateKey: Hex @@ -245,6 +254,7 @@ export class TransactionManager { public readonly livenessPingInterval: number public readonly livenessSuccessCount: number public readonly livenessDownDelay: number + public readonly livenessCheckInterval: number constructor(_config: TransactionManagerConfig) { initializeTelemetry({ @@ -361,6 +371,7 @@ export class TransactionManager { this.livenessSuccessCount = _config.rpc.livenessSuccessCount ?? 3 this.livenessPingInterval = _config.rpc.livenessPingInterval ?? 2000 this.livenessDownDelay = _config.rpc.livenessDownDelay ?? 5000 + this.livenessCheckInterval = _config.rpc.livenessCheckInterval ?? 2000 } /** From 032d0c0071c5a9129b27c0698caa8ea2ef16efa2 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Thu, 3 Apr 2025 13:34:46 +0200 Subject: [PATCH 4/8] chore(txm): pr review --- packages/txm/lib/BlockMonitor.ts | 4 ++-- packages/txm/lib/NonceManager.ts | 4 ++-- packages/txm/lib/RpcLivenessMonitor.ts | 4 ++-- packages/txm/lib/TransactionManager.ts | 11 ----------- packages/txm/lib/TransactionSubmitter.ts | 4 ++-- packages/txm/lib/TxMonitor.ts | 6 +++--- 6 files changed, 11 insertions(+), 22 deletions(-) diff --git a/packages/txm/lib/BlockMonitor.ts b/packages/txm/lib/BlockMonitor.ts index c0fb27eca7..542ce78eb2 100644 --- a/packages/txm/lib/BlockMonitor.ts +++ b/packages/txm/lib/BlockMonitor.ts @@ -41,7 +41,7 @@ export class BlockMonitor { TxmMetrics.getInstance().currentBlockGauge.record(Number(block.number)) TxmMetrics.getInstance().newBlockDelayHistogram.record(Date.now() - Number(block.timestamp) * 1000) - this.txmgr.rpcLivenessMonitor.onSuccess() + this.txmgr.rpcLivenessMonitor.trackSuccess() this.scheduleTimeout() } @@ -55,7 +55,7 @@ export class BlockMonitor { private resetBlockSubscription() { TxmMetrics.getInstance().resetBlockMonitorCounter.add(1) - this.txmgr.rpcLivenessMonitor.onFailure() + this.txmgr.rpcLivenessMonitor.trackError() if (this.unwatch) { this.unwatch() } diff --git a/packages/txm/lib/NonceManager.ts b/packages/txm/lib/NonceManager.ts index 2484e9fd1f..62c684b510 100644 --- a/packages/txm/lib/NonceManager.ts +++ b/packages/txm/lib/NonceManager.ts @@ -110,11 +110,11 @@ export class NonceManager { Logger.instance.error(LogTag.TXM, `Failed to get transaction count for address ${address}`, { error: blockchainNonceResult.error, }) - this.txmgr.rpcLivenessMonitor.onFailure() + this.txmgr.rpcLivenessMonitor.trackError() return } - this.txmgr.rpcLivenessMonitor.onSuccess() + this.txmgr.rpcLivenessMonitor.trackSuccess() this.maxExecutedNonce = blockchainNonceResult.value - 1 } diff --git a/packages/txm/lib/RpcLivenessMonitor.ts b/packages/txm/lib/RpcLivenessMonitor.ts index b9712993af..86c175a8ed 100644 --- a/packages/txm/lib/RpcLivenessMonitor.ts +++ b/packages/txm/lib/RpcLivenessMonitor.ts @@ -27,7 +27,7 @@ export class RpcLivenessMonitor { TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) } - onSuccess() { + trackSuccess() { this.events.push({ occurredAt: new Date(), success: true, @@ -35,7 +35,7 @@ export class RpcLivenessMonitor { this.checkIfDown() } - onFailure() { + trackError() { this.events.push({ occurredAt: new Date(), success: false, diff --git a/packages/txm/lib/TransactionManager.ts b/packages/txm/lib/TransactionManager.ts index 3232acdefd..f01b003a50 100644 --- a/packages/txm/lib/TransactionManager.ts +++ b/packages/txm/lib/TransactionManager.ts @@ -101,15 +101,6 @@ export type TransactionManagerConfig = { */ livenessSuccessCount?: number - /** - * The interval between health check attempts for the RPC. - * When unhealthy, the system will send chainId requests at this interval - * until either the RPC recovers or the connection is terminated. - * @default 2000 (2 seconds) - * @unit milliseconds - */ - livenessPingInterval?: number - /** * Margin of time after the RPC is marked as unhealthy before the txm starts checking if it is healthy again. * @default 5000 (5 seconds) @@ -251,7 +242,6 @@ export class TransactionManager { public readonly blockInactivityTimeout: number public readonly livenessWindow: number public readonly livenessThreshold: number - public readonly livenessPingInterval: number public readonly livenessSuccessCount: number public readonly livenessDownDelay: number public readonly livenessCheckInterval: number @@ -369,7 +359,6 @@ export class TransactionManager { this.livenessWindow = _config.rpc.livenessWindow ?? 10000 this.livenessThreshold = _config.rpc.livenessThreshold ?? 0.85 this.livenessSuccessCount = _config.rpc.livenessSuccessCount ?? 3 - this.livenessPingInterval = _config.rpc.livenessPingInterval ?? 2000 this.livenessDownDelay = _config.rpc.livenessDownDelay ?? 5000 this.livenessCheckInterval = _config.rpc.livenessCheckInterval ?? 2000 } diff --git a/packages/txm/lib/TransactionSubmitter.ts b/packages/txm/lib/TransactionSubmitter.ts index 9f3b031aea..7c07e8ccaf 100644 --- a/packages/txm/lib/TransactionSubmitter.ts +++ b/packages/txm/lib/TransactionSubmitter.ts @@ -156,7 +156,7 @@ export class TransactionSubmitter { this.txmgr.nonceManager.resync() } - this.txmgr.rpcLivenessMonitor.onFailure() + this.txmgr.rpcLivenessMonitor.trackError() return err({ cause: AttemptSubmissionErrorCause.FailedToSendRawTransaction, @@ -165,7 +165,7 @@ export class TransactionSubmitter { }) } - this.txmgr.rpcLivenessMonitor.onSuccess() + this.txmgr.rpcLivenessMonitor.trackSuccess() return ok(undefined) } diff --git a/packages/txm/lib/TxMonitor.ts b/packages/txm/lib/TxMonitor.ts index fab272a840..67220416b9 100644 --- a/packages/txm/lib/TxMonitor.ts +++ b/packages/txm/lib/TxMonitor.ts @@ -103,15 +103,15 @@ export class TxMonitor { isResolved = true resolve(attemptWithReceipt) } - this.transactionManager.rpcLivenessMonitor.onSuccess() + this.transactionManager.rpcLivenessMonitor.trackSuccess() return ok(attemptWithReceipt) } if (receiptResult.error instanceof TransactionReceiptNotFoundError) { - this.transactionManager.rpcLivenessMonitor.onSuccess() + this.transactionManager.rpcLivenessMonitor.trackSuccess() return ok(null) } - this.transactionManager.rpcLivenessMonitor.onFailure() + this.transactionManager.rpcLivenessMonitor.trackError() return err(receiptResult.error) }, ) From 325dccaeb5c6f5e8ec0901f0a306fd994453433f Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Mon, 7 Apr 2025 15:22:05 +0200 Subject: [PATCH 5/8] chore(txm): added test for RpcLivenessMonitor --- packages/txm/test/txm.test.ts | 106 ++++++++++++++++++++++++---------- packages/txm/vitest.config.ts | 2 +- 2 files changed, 76 insertions(+), 32 deletions(-) diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index 18b07a27dd..4f71550eb0 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -675,52 +675,96 @@ test("Transaction succeeds in congested blocks", async () => { expect(executedIncrementerTransaction.collectionBlock).toBe(previousBlock.number! + 1n) }) -test( - "Finalized transactions are automatically purged from db after finalizedTransactionPurgeTime elapses", - async () => { - const previousFinalizedTransactionPurgeTime = txm.finalizedTransactionPurgeTime +test("Finalized transactions are automatically purged from db after finalizedTransactionPurgeTime elapses", async () => { + const previousFinalizedTransactionPurgeTime = txm.finalizedTransactionPurgeTime - const mockedFinalizedTransactionPurgeTime = 6000 + const mockedFinalizedTransactionPurgeTime = 6000 - Object.defineProperty(txm, "finalizedTransactionPurgeTime", { - value: mockedFinalizedTransactionPurgeTime, - configurable: true, - }) + Object.defineProperty(txm, "finalizedTransactionPurgeTime", { + value: mockedFinalizedTransactionPurgeTime, + configurable: true, + }) - const transaction = await createCounterTransaction() + const transaction = await createCounterTransaction() - transactionQueue.push(transaction) + transactionQueue.push(transaction) - await mineBlock(2) + await mineBlock(2) + + const transactionPersisted = await getPersistedTransaction(transaction.intentId) + + if (!assertIsDefined(transactionPersisted)) return + expect(transactionPersisted.status).toBe(TransactionStatus.Success) + + const updatedAt = transactionPersisted.updatedAt + const purgeTime = updatedAt + mockedFinalizedTransactionPurgeTime + + while (Date.now() < purgeTime) { const transactionPersisted = await getPersistedTransaction(transaction.intentId) - if (!assertIsDefined(transactionPersisted)) return + expect(transactionPersisted).toBeDefined() + expect(transactionPersisted?.status).toBe(TransactionStatus.Success) - expect(transactionPersisted.status).toBe(TransactionStatus.Success) + await mineBlock() + } - const updatedAt = transactionPersisted.updatedAt - const purgeTime = updatedAt + mockedFinalizedTransactionPurgeTime + await mineBlock() - while (Date.now() < purgeTime) { - const transactionPersisted = await getPersistedTransaction(transaction.intentId) + const persistedTransaction = await getPersistedTransaction(transaction.intentId) - expect(transactionPersisted).toBeDefined() - expect(transactionPersisted?.status).toBe(TransactionStatus.Success) + expect(persistedTransaction).toBeUndefined() - await mineBlock() - } + Object.defineProperty(txm, "finalizedTransactionPurgeTime", { + value: previousFinalizedTransactionPurgeTime, + configurable: true, + }) +}) + +test("RPC liveness monitor works correctly", async () => { + proxyServer.setMode(ProxyMode.Random, { + [ProxyBehavior.NotAnswer]: 0, + [ProxyBehavior.Fail]: 0.5, + [ProxyBehavior.Forward]: 0.5, + }) + + let isDownHookTriggered = false + let isUpHookTriggered = false + + const cleanIsDownHook = await txm.addHook(TxmHookType.RpcIsDown, () => { + isDownHookTriggered = true + }) + + const cleanIsUpHook = await txm.addHook(TxmHookType.RpcIsUp, () => { + isUpHookTriggered = true + }) + + expect(txm.rpcLivenessMonitor.isAlive).toBe(true) + + while (txm.rpcLivenessMonitor.isAlive) { + const transaction = await createCounterTransaction() + + transactionQueue.push(transaction) await mineBlock() + } - const persistedTransaction = await getPersistedTransaction(transaction.intentId) + expect(isDownHookTriggered).toBe(true) + expect(txm.rpcLivenessMonitor.isAlive).toBe(false) - expect(persistedTransaction).toBeUndefined() + proxyServer.setMode(ProxyMode.Deterministic) - Object.defineProperty(txm, "finalizedTransactionPurgeTime", { - value: previousFinalizedTransactionPurgeTime, - configurable: true, - }) - }, - { timeout: 20000 }, -) + while (!txm.rpcLivenessMonitor.isAlive) { + const transaction = await createCounterTransaction() + + transactionQueue.push(transaction) + + await mineBlock() + } + + expect(isUpHookTriggered).toBe(true) + expect(txm.rpcLivenessMonitor.isAlive).toBe(true) + + cleanIsDownHook() + cleanIsUpHook() +}) diff --git a/packages/txm/vitest.config.ts b/packages/txm/vitest.config.ts index d7b3c51876..c30eebecfa 100644 --- a/packages/txm/vitest.config.ts +++ b/packages/txm/vitest.config.ts @@ -5,7 +5,7 @@ export default defineConfig({ globals: true, environment: "node", setupFiles: "./vitest.setup.ts", - testTimeout: 10000, + testTimeout: 30000, hookTimeout: 30000, watch: false, }, From 6e8a7ede1fe31a3386e2e23b171351c687cf9cd0 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Tue, 8 Apr 2025 14:44:25 +0200 Subject: [PATCH 6/8] chore(txm): increase hook timeout --- packages/txm/vitest.config.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/txm/vitest.config.ts b/packages/txm/vitest.config.ts index c30eebecfa..84478b1d09 100644 --- a/packages/txm/vitest.config.ts +++ b/packages/txm/vitest.config.ts @@ -5,8 +5,8 @@ export default defineConfig({ globals: true, environment: "node", setupFiles: "./vitest.setup.ts", - testTimeout: 30000, - hookTimeout: 30000, + testTimeout: 60000, + hookTimeout: 60000, watch: false, }, }) From b2f9c570f335c39036f39c79bd3e088846ada202 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Tue, 8 Apr 2025 15:01:14 +0200 Subject: [PATCH 7/8] chore(txm): fix tests --- packages/txm/test/txm.test.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index 4f71550eb0..7828bf1c26 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -722,6 +722,16 @@ test("Finalized transactions are automatically purged from db after finalizedTra }) test("RPC liveness monitor works correctly", async () => { + proxyServer.setMode(ProxyMode.Deterministic) + + while (!txm.rpcLivenessMonitor.isAlive) { + const transaction = await createCounterTransaction() + + transactionQueue.push(transaction) + + await mineBlock() + } + proxyServer.setMode(ProxyMode.Random, { [ProxyBehavior.NotAnswer]: 0, [ProxyBehavior.Fail]: 0.5, @@ -739,6 +749,8 @@ test("RPC liveness monitor works correctly", async () => { isUpHookTriggered = true }) + + expect(txm.rpcLivenessMonitor.isAlive).toBe(true) while (txm.rpcLivenessMonitor.isAlive) { From c8848153b122316d8211f3bff0970f51a3036993 Mon Sep 17 00:00:00 2001 From: GabrielMartinezRodriguez Date: Thu, 10 Apr 2025 11:44:18 +0200 Subject: [PATCH 8/8] chore(txm): implemented a counter for every second in the rpc liveness monitor --- packages/txm/lib/RpcLivenessMonitor.ts | 73 +++++++++++++++++--------- packages/txm/test/txm.test.ts | 32 +++++++++-- 2 files changed, 76 insertions(+), 29 deletions(-) diff --git a/packages/txm/lib/RpcLivenessMonitor.ts b/packages/txm/lib/RpcLivenessMonitor.ts index 86c175a8ed..c068f4b571 100644 --- a/packages/txm/lib/RpcLivenessMonitor.ts +++ b/packages/txm/lib/RpcLivenessMonitor.ts @@ -4,14 +4,14 @@ import { eventBus } from "./EventBus" import type { TransactionManager } from "./TransactionManager" import { TxmMetrics } from "./telemetry/metrics" -interface LivenessEvent { - occurredAt: Date - success: boolean +interface SecondCounters { + successCount: number + errorCount: number } export class RpcLivenessMonitor { private txmgr: TransactionManager - private events: LivenessEvent[] + private counters: Record public isAlive: boolean private isDownSince: Date | null private checkIfHealthyInterval: NodeJS.Timer | null @@ -19,35 +19,54 @@ export class RpcLivenessMonitor { constructor(txmgr: TransactionManager) { this.txmgr = txmgr - this.events = [] + this.counters = {} this.isAlive = true this.isDownSince = null this.checkIfHealthyInterval = null this.consecutiveSuccessesWhileCheckingIfHealthy = 0 TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0) + + setInterval(() => { + this.cleanOldCounters() + }, 1000) } - trackSuccess() { - this.events.push({ - occurredAt: new Date(), - success: true, - }) - this.checkIfDown() + private getCurrentSecond(): number { + return Math.floor(Date.now() / 1000) } - trackError() { - this.events.push({ - occurredAt: new Date(), - success: false, + private cleanOldCounters(): void { + const now = this.getCurrentSecond() + const oldestAllowedSecond = now - Math.floor(this.txmgr.livenessWindow / 1000) + + Object.keys(this.counters).forEach((secondStr) => { + const second = Number.parseInt(secondStr, 10) + if (second < oldestAllowedSecond) { + delete this.counters[second] + } }) + } + + trackSuccess() { + const currentSecond = this.getCurrentSecond() + + if (!this.counters[currentSecond]) { + this.counters[currentSecond] = { successCount: 0, errorCount: 0 } + } + + this.counters[currentSecond].successCount++ this.checkIfDown() } - cleanOldEvents() { - const now = new Date() - this.events = this.events.filter((event) => { - return now.getTime() - event.occurredAt.getTime() < this.txmgr.livenessWindow - }) + trackError() { + const currentSecond = this.getCurrentSecond() + + if (!this.counters[currentSecond]) { + this.counters[currentSecond] = { successCount: 0, errorCount: 0 } + } + + this.counters[currentSecond].errorCount++ + this.checkIfDown() } private checkIfDown() { @@ -83,7 +102,7 @@ export class RpcLivenessMonitor { this.isAlive = true this.isDownSince = null this.consecutiveSuccessesWhileCheckingIfHealthy = 0 - this.events = [] + this.counters = {} this.updateLivenessMetrics() eventBus.emit(Topics.RpcIsUp) if (this.checkIfHealthyInterval) { @@ -97,8 +116,14 @@ export class RpcLivenessMonitor { } ratioOfSuccess() { - this.cleanOldEvents() - const successEvents = this.events.filter((event) => event.success) - return successEvents.length / this.events.length + let totalSuccesses = 0 + let totalEvents = 0 + + Object.values(this.counters).forEach((counter) => { + totalSuccesses += counter.successCount + totalEvents += counter.successCount + counter.errorCount + }) + + return totalEvents > 0 ? totalSuccesses / totalEvents : 1 } } diff --git a/packages/txm/test/txm.test.ts b/packages/txm/test/txm.test.ts index 7828bf1c26..d3913881b1 100644 --- a/packages/txm/test/txm.test.ts +++ b/packages/txm/test/txm.test.ts @@ -38,6 +38,8 @@ const txm = new TransactionManager({ url: PROXY_URL, pollingInterval: 200, allowDebug: true, + livenessCheckInterval: 500, + livenessDownDelay: 1000, }, abis: abis, gasEstimator: new TestGasEstimator(), @@ -576,10 +578,16 @@ test("Correctly calculates baseFeePerGas after a block with high gas usage", asy }) test("Transaction manager successfully processes transactions despite random RPC failures", async () => { + const previousLivenessThreshold = txm.livenessThreshold + Object.defineProperty(txm, "livenessThreshold", { + value: 0, + configurable: true, + }) + proxyServer.setMode(ProxyMode.Random, { - [ProxyBehavior.NotAnswer]: 0.1, - [ProxyBehavior.Fail]: 0.2, - [ProxyBehavior.Forward]: 0.7, + [ProxyBehavior.NotAnswer]: 0.05, + [ProxyBehavior.Fail]: 0.05, + [ProxyBehavior.Forward]: 0.9, }) const previousBlock = await getCurrentBlock() @@ -619,6 +627,11 @@ test("Transaction manager successfully processes transactions despite random RPC expect(successfulTransactions).toBeGreaterThan(numTransactions * 0.6) proxyServer.setMode(ProxyMode.Deterministic) + + Object.defineProperty(txm, "livenessThreshold", { + value: previousLivenessThreshold, + configurable: true, + }) }) test("Transaction succeeds in congested blocks", async () => { @@ -722,6 +735,12 @@ test("Finalized transactions are automatically purged from db after finalizedTra }) test("RPC liveness monitor works correctly", async () => { + const previousLivenessWindow = txm.livenessWindow + Object.defineProperty(txm, "livenessWindow", { + value: 2000, + configurable: true, + }) + proxyServer.setMode(ProxyMode.Deterministic) while (!txm.rpcLivenessMonitor.isAlive) { @@ -749,8 +768,6 @@ test("RPC liveness monitor works correctly", async () => { isUpHookTriggered = true }) - - expect(txm.rpcLivenessMonitor.isAlive).toBe(true) while (txm.rpcLivenessMonitor.isAlive) { @@ -779,4 +796,9 @@ test("RPC liveness monitor works correctly", async () => { cleanIsDownHook() cleanIsUpHook() + + Object.defineProperty(txm, "livenessWindow", { + value: previousLivenessWindow, + configurable: true, + }) })