Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/txm/lib/BlockMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export class BlockMonitor {
TxmMetrics.getInstance().currentBlockGauge.record(Number(block.number))
TxmMetrics.getInstance().newBlockDelayHistogram.record(Date.now() - Number(block.timestamp) * 1000)

this.txmgr.rpcLivenessMonitor.trackSuccess()

this.scheduleTimeout()
}

Expand All @@ -53,6 +55,7 @@ export class BlockMonitor {

private resetBlockSubscription() {
TxmMetrics.getInstance().resetBlockMonitorCounter.add(1)
this.txmgr.rpcLivenessMonitor.trackError()
if (this.unwatch) {
this.unwatch()
}
Expand Down
2 changes: 2 additions & 0 deletions packages/txm/lib/EventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ export enum Topics {
TransactionStatusChanged = "TransactionStatusChanged",
TransactionSaveFailed = "TransactionSaveFailed",
TransactionSubmissionFailed = "TransactionSubmissionFailed",
RpcIsDown = "RpcIsDown",
RpcIsUp = "RpcIsUp",
}

export type EventBus = EventEmitter<Topics>
Expand Down
30 changes: 30 additions & 0 deletions packages/txm/lib/HookManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export enum TxmHookType {
TransactionSaveFailed = "TransactionSaveFailed",
NewBlock = "NewBlock",
TransactionSubmissionFailed = "TransactionSubmissionFailed",
RpcIsDown = "RpcIsDown",
RpcIsUp = "RpcIsUp",
}

export type TxmTransactionStatusChangedHookPayload = {
Expand All @@ -33,11 +35,21 @@ export type TxmTransactionSubmissionFailedHookPayload = {
cause: AttemptSubmissionErrorCause
}

/**
 * Payload delivered to `TxmHookType.All` handlers when the RPC is reported as down.
 * It carries only the discriminating `type` tag.
 */
export type TxmRpcIsDownHookPayload = {
type: TxmHookType.RpcIsDown
}

/**
 * Payload delivered to `TxmHookType.All` handlers when the RPC is reported as up again.
 * It carries only the discriminating `type` tag.
 */
export type TxmRpcIsUpHookPayload = {
type: TxmHookType.RpcIsUp
}

/**
 * Union of every hook payload shape. Each member is tagged with a distinct
 * `type` value, so consumers can narrow on `payload.type`.
 */
export type TxmHookPayload =
| TxmTransactionStatusChangedHookPayload
| TxmNewBlockHookPayload
| TxmTransactionSaveFailedHookPayload
| TxmTransactionSubmissionFailedHookPayload
| TxmRpcIsDownHookPayload
| TxmRpcIsUpHookPayload

export type TxmHooksRecord = {
[TxmHookType.All]: ((event: TxmHookPayload) => void)[]
Expand All @@ -49,6 +61,8 @@ export type TxmHooksRecord = {
description: string,
cause: AttemptSubmissionErrorCause,
) => void)[]
[TxmHookType.RpcIsDown]: (() => void)[]
[TxmHookType.RpcIsUp]: (() => void)[]
}

export type TxmHookHandler<T extends TxmHookType = TxmHookType.All> = TxmHooksRecord[T][number]
Expand All @@ -75,11 +89,15 @@ export class HookManager {
[TxmHookType.TransactionSaveFailed]: [],
[TxmHookType.NewBlock]: [],
[TxmHookType.TransactionSubmissionFailed]: [],
[TxmHookType.RpcIsDown]: [],
[TxmHookType.RpcIsUp]: [],
}
eventBus.on(Topics.TransactionStatusChanged, this.onTransactionStatusChanged.bind(this))
eventBus.on(Topics.TransactionSaveFailed, this.onTransactionSaveFailed.bind(this))
eventBus.on(Topics.NewBlock, this.onNewBlock.bind(this))
eventBus.on(Topics.TransactionSubmissionFailed, this.onTransactionSubmissionFailed.bind(this))
eventBus.on(Topics.RpcIsDown, this.onRpcIsDown.bind(this))
eventBus.on(Topics.RpcIsUp, this.onRpcIsUp.bind(this))
}

public addHook<T extends TxmHookType>(type: T, handler: TxmHookHandler<T>): () => void {
Expand Down Expand Up @@ -149,4 +167,16 @@ export class HookManager {
}),
)
}

/**
 * Dispatches the "RPC is down" notification: first to the hooks registered
 * specifically for `RpcIsDown` (called without arguments), then to the
 * catch-all hooks, which receive a payload tagged with `TxmHookType.RpcIsDown`.
 */
private async onRpcIsDown(): Promise<void> {
    const dedicatedHooks = this.hooks[TxmHookType.RpcIsDown]
    dedicatedHooks.forEach((notify) => {
        notify()
    })

    const catchAllHooks = this.hooks[TxmHookType.All]
    catchAllHooks.forEach((notify) => {
        notify({ type: TxmHookType.RpcIsDown })
    })
}

/**
 * Dispatches the "RPC is up" notification: first to the hooks registered
 * specifically for `RpcIsUp` (called without arguments), then to the
 * catch-all hooks, which receive a payload tagged with `TxmHookType.RpcIsUp`.
 */
private async onRpcIsUp(): Promise<void> {
    const dedicatedHooks = this.hooks[TxmHookType.RpcIsUp]
    dedicatedHooks.forEach((notify) => {
        notify()
    })

    const catchAllHooks = this.hooks[TxmHookType.All]
    catchAllHooks.forEach((notify) => {
        notify({ type: TxmHookType.RpcIsUp })
    })
}
}
3 changes: 3 additions & 0 deletions packages/txm/lib/NonceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ export class NonceManager {
Logger.instance.error(LogTag.TXM, `Failed to get transaction count for address ${address}`, {
error: blockchainNonceResult.error,
})
this.txmgr.rpcLivenessMonitor.trackError()
return
}

this.txmgr.rpcLivenessMonitor.trackSuccess()

this.maxExecutedNonce = blockchainNonceResult.value - 1
}
}
129 changes: 129 additions & 0 deletions packages/txm/lib/RpcLivenessMonitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import { LogTag, Logger } from "@happy.tech/common"
import { Topics } from "./EventBus"
import { eventBus } from "./EventBus"
import type { TransactionManager } from "./TransactionManager"
import { TxmMetrics } from "./telemetry/metrics"

/** Outcome tallies accumulated for a single one-second bucket of tracked RPC calls. */
interface SecondCounters {
/** Number of successes tracked during this second. */
successCount: number
/** Number of errors tracked during this second. */
errorCount: number
}

/**
 * Tracks the outcome of RPC calls over a sliding time window and decides whether
 * the RPC should be considered alive.
 *
 * - Callers report outcomes through {@link trackSuccess} and {@link trackError}.
 *   Outcomes are tallied in one-second buckets; buckets older than
 *   `txmgr.livenessWindow` (milliseconds) are pruned once per second.
 * - While alive, every tracked outcome re-evaluates the success ratio. When it drops
 *   below `txmgr.livenessThreshold`, the monitor flips to "down", emits
 *   `Topics.RpcIsDown` and starts probing the RPC with chainId requests every
 *   `txmgr.livenessCheckInterval` milliseconds.
 * - Probing is skipped until `txmgr.livenessDownDelay` milliseconds have elapsed
 *   since the RPC was marked down. After `txmgr.livenessSuccessCount` consecutive
 *   successful probes the monitor flips back to "alive" and emits `Topics.RpcIsUp`.
 *
 * The `txmgr` liveness settings are read lazily (never in the constructor), so the
 * monitor may be constructed before the manager has finished assigning them.
 */
export class RpcLivenessMonitor {
    private readonly txmgr: TransactionManager
    /** Per-second tallies, keyed by Unix time in whole seconds. */
    private counters: Map<number, SecondCounters>
    public isAlive: boolean
    private isDownSince: Date | null
    /** Handle of the periodic health probe; non-null only while probing is scheduled. */
    private checkIfHealthyInterval: ReturnType<typeof setInterval> | null
    /** Handle of the once-per-second bucket pruning timer; null after {@link stop}. */
    private cleanupInterval: ReturnType<typeof setInterval> | null
    private consecutiveSuccessesWhileCheckingIfHealthy: number
    /** True while a health probe is awaiting its response; prevents overlapping probes. */
    private isCheckingIfHealthy: boolean

    constructor(txmgr: TransactionManager) {
        this.txmgr = txmgr
        this.counters = new Map()
        this.isAlive = true
        this.isDownSince = null
        this.checkIfHealthyInterval = null
        this.consecutiveSuccessesWhileCheckingIfHealthy = 0
        this.isCheckingIfHealthy = false
        TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0)

        // Keep the handle so the timer can be released through stop().
        this.cleanupInterval = setInterval(() => {
            this.cleanOldCounters()
        }, 1000)
    }

    /**
     * Releases every timer owned by the monitor (bucket pruning and health probing).
     * Safe to call more than once. Tracking calls still work afterwards, but stale
     * buckets are no longer pruned and no further health probes are scheduled.
     */
    stop(): void {
        if (this.cleanupInterval !== null) {
            clearInterval(this.cleanupInterval)
            this.cleanupInterval = null
        }
        this.stopHealthChecks()
    }

    /** Current Unix time truncated to whole seconds; used as the bucket key. */
    private getCurrentSecond(): number {
        return Math.floor(Date.now() / 1000)
    }

    /** Drops every bucket that is older than the configured liveness window. */
    private cleanOldCounters(): void {
        const oldestAllowedSecond = this.getCurrentSecond() - Math.floor(this.txmgr.livenessWindow / 1000)

        // Deleting entries while iterating a Map is well-defined.
        for (const second of this.counters.keys()) {
            if (second < oldestAllowedSecond) {
                this.counters.delete(second)
            }
        }
    }

    /** Records one successful RPC call and re-evaluates liveness. */
    trackSuccess(): void {
        this.recordOutcome("successCount")
    }

    /** Records one failed RPC call and re-evaluates liveness. */
    trackError(): void {
        this.recordOutcome("errorCount")
    }

    /** Increments the given tally in the current second's bucket, then re-evaluates liveness. */
    private recordOutcome(field: keyof SecondCounters): void {
        const currentSecond = this.getCurrentSecond()

        let bucket = this.counters.get(currentSecond)
        if (!bucket) {
            bucket = { successCount: 0, errorCount: 0 }
            this.counters.set(currentSecond, bucket)
        }

        bucket[field]++
        this.checkIfDown()
    }

    /**
     * Flips the monitor to "down" when it is currently alive and the success ratio
     * has fallen below the configured threshold, then schedules periodic probes.
     */
    private checkIfDown(): void {
        if (this.isAlive && this.ratioOfSuccess() < this.txmgr.livenessThreshold) {
            this.isAlive = false
            this.isDownSince = new Date()
            this.consecutiveSuccessesWhileCheckingIfHealthy = 0
            this.updateLivenessMetrics()
            eventBus.emit(Topics.RpcIsDown)
            Logger.instance.error(LogTag.TXM, "Detected that the RPC is not healthy")

            // Never leave a previous probe timer running alongside the new one.
            this.stopHealthChecks()
            this.checkIfHealthyInterval = setInterval(() => {
                // checkIfHealthy is async: surface unexpected rejections instead of
                // letting them become unhandled promise rejections.
                this.checkIfHealthy().catch((error: unknown) => {
                    Logger.instance.error(LogTag.TXM, "Unexpected failure while checking if the RPC is healthy", {
                        error,
                    })
                })
            }, this.txmgr.livenessCheckInterval)
        }
    }

    /** Cancels the periodic health probe, if one is scheduled. */
    private stopHealthChecks(): void {
        if (this.checkIfHealthyInterval !== null) {
            clearInterval(this.checkIfHealthyInterval)
            this.checkIfHealthyInterval = null
        }
    }

    /**
     * Sends one chainId probe and updates the consecutive-success streak.
     * Marks the RPC as alive again once the streak reaches
     * `txmgr.livenessSuccessCount`.
     */
    private async checkIfHealthy(): Promise<void> {
        // Nothing to do once recovered; also skip if the previous probe is still in flight
        // (the probe may take longer than the check interval).
        if (this.isAlive || this.isCheckingIfHealthy) {
            return
        }

        // Respect the grace period after the RPC was marked as down.
        if (this.isDownSince && this.isDownSince.getTime() + this.txmgr.livenessDownDelay > Date.now()) {
            return
        }

        let probeSucceeded = false
        this.isCheckingIfHealthy = true
        try {
            const chainIdResult = await this.txmgr.viemClient.safeGetChainId()
            probeSucceeded = chainIdResult.isOk()
        } catch (error) {
            // A probe that throws is treated like a failed probe.
            Logger.instance.error(LogTag.TXM, "Failed to probe the RPC while checking if it is healthy", { error })
        } finally {
            this.isCheckingIfHealthy = false
        }

        if (!probeSucceeded) {
            this.consecutiveSuccessesWhileCheckingIfHealthy = 0
            return
        }

        this.consecutiveSuccessesWhileCheckingIfHealthy++

        // `>=` so that exactly `livenessSuccessCount` consecutive successes are required,
        // matching the documented meaning of the setting.
        if (this.consecutiveSuccessesWhileCheckingIfHealthy >= this.txmgr.livenessSuccessCount) {
            Logger.instance.info(LogTag.TXM, "Detected that the RPC is healthy")
            this.isAlive = true
            this.isDownSince = null
            this.consecutiveSuccessesWhileCheckingIfHealthy = 0
            // Start from a clean window so pre-outage errors cannot immediately re-trigger "down".
            this.counters = new Map()
            this.updateLivenessMetrics()
            // Stop probing before notifying listeners so a throwing listener cannot leave the timer running.
            this.stopHealthChecks()
            eventBus.emit(Topics.RpcIsUp)
        }
    }

    /** Publishes the current liveness state to the metrics gauge (1 = alive, 0 = down). */
    updateLivenessMetrics(): void {
        TxmMetrics.getInstance().rpcLivenessMonitorGauge.record(this.isAlive ? 1 : 0)
    }

    /**
     * Fraction of tracked calls that succeeded across all retained buckets.
     *
     * @returns A value between 0 and 1, or 1 when nothing has been tracked.
     */
    ratioOfSuccess(): number {
        let totalSuccesses = 0
        let totalEvents = 0

        for (const { successCount, errorCount } of this.counters.values()) {
            totalSuccesses += successCount
            totalEvents += successCount + errorCount
        }

        return totalEvents > 0 ? totalSuccesses / totalEvents : 1
    }
}
5 changes: 5 additions & 0 deletions packages/txm/lib/TransactionCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ export class TransactionCollector {

TxmMetrics.getInstance().transactionCollectedCounter.add(transactionsBatch.length)

if (!this.txmgr.rpcLivenessMonitor.isAlive) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to add a unit test for this feature

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it's a good thing to test. I'm going to implement one

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added

Logger.instance.error(LogTag.TXM, "RPC is not alive, skipping attempt to submit transactions")
return
}

await Promise.all(
transactionsBatch.map(async (transaction) => {
const nonce = this.txmgr.nonceManager.requestNonce()
Expand Down
54 changes: 54 additions & 0 deletions packages/txm/lib/TransactionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { GasPriceOracle } from "./GasPriceOracle.js"
import { HookManager, type TxmHookHandler, type TxmHookType } from "./HookManager.js"
import { NonceManager } from "./NonceManager.js"
import { DefaultRetryPolicyManager, type RetryPolicyManager } from "./RetryPolicyManager.js"
import { RpcLivenessMonitor } from "./RpcLivenessMonitor"
import { Transaction, type TransactionConstructorConfig } from "./Transaction.js"
import { TransactionCollector } from "./TransactionCollector.js"
import { TransactionRepository } from "./TransactionRepository.js"
Expand Down Expand Up @@ -75,6 +76,46 @@ export type TransactionManagerConfig = {
* Defaults to 4000 milliseconds.
*/
blockInactivityTimeout?: number

/**
* The minimum success rate of RPC calls required to consider the RPC healthy.
* Expressed as a decimal between 0 and 1.
* Example: 0.85 means 85% of calls must be successful.
* @default 0.85
*/
livenessThreshold?: number

/**
* The monitoring window duration for evaluating RPC health.
* The success rate is calculated based on RPC calls within this time frame.
* @default 10000 (10 seconds)
* @unit milliseconds
*/
livenessWindow?: number

/**
* Number of successful consecutive chainId requests required to mark the RPC healthy again.
* When marked unhealthy, the system will periodically send chainId requests
* to check if the RPC has recovered.
* @default 3
*/
livenessSuccessCount?: number
Comment thread
aodhgan marked this conversation as resolved.

/**
* Margin of time after the RPC is marked as unhealthy before the txm starts checking if it is healthy again.
* @default 5000 (5 seconds)
* @unit milliseconds
*/
livenessDownDelay?: number

/**
* The interval between health check attempts for the RPC.
* When unhealthy, the system will send chainId requests at this interval
* until either the RPC recovers or the connection is terminated.
* @default 2000 (2 seconds)
* @unit milliseconds
*/
livenessCheckInterval?: number
}
/** The private key of the account used for signing transactions. */
privateKey: Hex
Expand Down Expand Up @@ -187,6 +228,7 @@ export class TransactionManager {
public readonly transactionSubmitter: TransactionSubmitter
public readonly hookManager: HookManager
public readonly retryPolicyManager: RetryPolicyManager
public readonly rpcLivenessMonitor: RpcLivenessMonitor

public readonly chainId: number
public readonly eip1559: EIP1559Parameters
Expand All @@ -198,6 +240,11 @@ export class TransactionManager {
public readonly pollingInterval: number
public readonly transportProtocol: "http" | "websocket"
public readonly blockInactivityTimeout: number
public readonly livenessWindow: number
public readonly livenessThreshold: number
public readonly livenessSuccessCount: number
public readonly livenessDownDelay: number
public readonly livenessCheckInterval: number

constructor(_config: TransactionManagerConfig) {
initializeTelemetry({
Expand Down Expand Up @@ -293,6 +340,7 @@ export class TransactionManager {
this.transactionSubmitter = new TransactionSubmitter(this)
this.hookManager = new HookManager()
this.retryPolicyManager = _config.retryPolicyManager ?? new DefaultRetryPolicyManager()
this.rpcLivenessMonitor = new RpcLivenessMonitor(this)

this.chainId = _config.chainId
this.eip1559 = _config.eip1559 ?? opStackDefaultEIP1559Parameters
Expand All @@ -307,6 +355,12 @@ export class TransactionManager {

this.pollingInterval = _config.rpc.pollingInterval ?? (Number(this.blockTime) * 1000) / 2
this.blockInactivityTimeout = _config.rpc.blockInactivityTimeout ?? 4000

this.livenessWindow = _config.rpc.livenessWindow ?? 10000
this.livenessThreshold = _config.rpc.livenessThreshold ?? 0.85
this.livenessSuccessCount = _config.rpc.livenessSuccessCount ?? 3
this.livenessDownDelay = _config.rpc.livenessDownDelay ?? 5000
this.livenessCheckInterval = _config.rpc.livenessCheckInterval ?? 2000
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/txm/lib/TransactionSubmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,18 @@ export class TransactionSubmitter {
) {
this.txmgr.nonceManager.resync()
}

this.txmgr.rpcLivenessMonitor.trackError()

return err({
cause: AttemptSubmissionErrorCause.FailedToSendRawTransaction,
description: `Failed to send raw transaction ${transaction.intentId}. Details: ${sendRawTransactionResult.error}`,
flushed: true,
})
}

this.txmgr.rpcLivenessMonitor.trackSuccess()

return ok(undefined)
}
}
9 changes: 9 additions & 0 deletions packages/txm/lib/TxMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ export class TxMonitor {
}

private async handleNewBlock(block: LatestBlock) {
if (!this.transactionManager.rpcLivenessMonitor.isAlive) {
Logger.instance.warn(LogTag.TXM, "RPC is not alive, skipping attempt to monitor transactions")
return
}

const transactions = this.transactionManager.transactionRepository.getNotFinalizedTransactionsOlderThan(
block.number,
)
Expand Down Expand Up @@ -98,11 +103,15 @@ export class TxMonitor {
isResolved = true
resolve(attemptWithReceipt)
}
this.transactionManager.rpcLivenessMonitor.trackSuccess()
return ok(attemptWithReceipt)
}
if (receiptResult.error instanceof TransactionReceiptNotFoundError) {
this.transactionManager.rpcLivenessMonitor.trackSuccess()
return ok(null)
}

this.transactionManager.rpcLivenessMonitor.trackError()
return err(receiptResult.error)
},
)
Expand Down
Loading