From 877811d9dcb82e674fe9b3cc106b8a28c139cedf Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Rodriguez Date: Thu, 5 Dec 2024 16:28:43 +0100 Subject: [PATCH 1/4] feat(txm): flush mechanism --- .../transaction-manager/lib/Transaction.ts | 11 +++ .../lib/TransactionRepository.ts | 69 +++++++------------ .../lib/TransactionSubmitter.ts | 2 +- packages/transaction-manager/lib/TxMonitor.ts | 2 +- packages/transaction-manager/lib/db/types.ts | 1 + .../migrations/Migration20241205143000.js | 8 +++ 6 files changed, 48 insertions(+), 45 deletions(-) create mode 100644 packages/transaction-manager/migrations/Migration20241205143000.js diff --git a/packages/transaction-manager/lib/Transaction.ts b/packages/transaction-manager/lib/Transaction.ts index 015a552d08..7cc4c97db0 100644 --- a/packages/transaction-manager/lib/Transaction.ts +++ b/packages/transaction-manager/lib/Transaction.ts @@ -103,6 +103,8 @@ export class Transaction { updatedAt: Date + flushedAt?: Date + /** * Stores additional information for the transaction. * Enables originators to provide extra details, such as gas limits, which can be leveraged by customizable services. @@ -122,6 +124,7 @@ export class Transaction { attempts, createdAt, updatedAt, + flushedAt, metadata, }: TransactionConstructorConfig & { from: Address @@ -131,6 +134,7 @@ export class Transaction { attempts?: Attempt[] createdAt?: Date updatedAt?: Date + flushedAt?: Date }) { this.intentId = intentId ?? createUUID() this.from = from @@ -145,6 +149,7 @@ export class Transaction { this.createdAt = createdAt ?? new Date() this.updatedAt = updatedAt ?? new Date() this.metadata = metadata ?? {} + this.flushedAt = flushedAt } addAttempt(attempt: Attempt): void { @@ -186,6 +191,10 @@ export class Transaction { return this.attempts[this.attempts.length - 1] } + setFlushedAt(when: Date | undefined): void { + this.flushedAt = when + } + toDbRow(): Insertable { return { intentId: this.intentId, @@ -201,6 +210,7 @@ export class Transaction { metadata: this.metadata ? JSON.stringify(this.metadata, bigIntReplacer) : undefined, createdAt: this.createdAt.getTime(), updatedAt: this.updatedAt.getTime(), + flushedAt: this.flushedAt?.getTime(), } } @@ -212,6 +222,7 @@ export class Transaction { metadata: row.metadata ? JSON.parse(row.metadata, bigIntReviver) : undefined, createdAt: new Date(row.createdAt), updatedAt: new Date(row.updatedAt), + flushedAt: row.flushedAt ? new Date(row.flushedAt) : undefined, }) } } diff --git a/packages/transaction-manager/lib/TransactionRepository.ts b/packages/transaction-manager/lib/TransactionRepository.ts index 8f9e5bbc10..21e9e2a5ab 100644 --- a/packages/transaction-manager/lib/TransactionRepository.ts +++ b/packages/transaction-manager/lib/TransactionRepository.ts @@ -59,59 +59,42 @@ export class TransactionRepository { } async saveTransactions(transactions: Transaction[]): Promise> { - const result = await ResultAsync.fromPromise( - db - .insertInto("transaction") - .values(transactions.map((t) => t.toDbRow())) - .execute(), - unknownToError, - ) - - if (result.isOk()) { - for (const transaction of transactions) { - if (NotFinalizedStatuses.includes(transaction.status)) { - this.notFinalizedTransactions.push(transaction) - } - } - } - return result.map(() => undefined) - } - - async updateTransaction(transaction: Transaction): Promise> { - const result = await ResultAsync.fromPromise( - db - .updateTable("transaction") - .set(transaction.toDbRow()) - .where("intentId", "=", transaction.intentId) - .execute(), - unknownToError, + const transactionsToFlush = transactions.filter( + (t) => !t.flushedAt || t.flushedAt.getTime() <= t.updatedAt.getTime(), ) - this.notFinalizedTransactions = this.notFinalizedTransactions.filter((transaction) => - NotFinalizedStatuses.includes(transaction.status), - ) - - return result.map(() => undefined) - } + const oldFlushedAt = transactionsToFlush.map((t) => t.flushedAt) - async updateTransactions(transactions: Transaction[]): Promise> { const result = await ResultAsync.fromPromise( db.transaction().execute(async (dbTransaction) => { - const promises = transactions.map((t) => - dbTransaction - .updateTable("transaction") - .set(t.toDbRow()) - .where("intentId", "=", t.intentId) - .execute(), - ) + const promises = transactionsToFlush.map((t, index) => { + t.setFlushedAt(new Date()) + if (oldFlushedAt[index] === undefined) { + db.insertInto("transaction") + .values(transactions.map((t) => t.toDbRow())) + .execute() + } else { + dbTransaction + .updateTable("transaction") + .set(t.toDbRow()) + .where("intentId", "=", t.intentId) + .execute() + } + }) await Promise.all(promises) }), unknownToError, ) - this.notFinalizedTransactions = this.notFinalizedTransactions.filter((transaction) => - NotFinalizedStatuses.includes(transaction.status), - ) + if (result.isOk()) { + this.notFinalizedTransactions = this.notFinalizedTransactions.filter((transaction) => + NotFinalizedStatuses.includes(transaction.status), + ) + } else { + for (let i = 0; i < transactions.length; i++) { + transactions[i].setFlushedAt(oldFlushedAt[i]) + } + } return result } diff --git a/packages/transaction-manager/lib/TransactionSubmitter.ts b/packages/transaction-manager/lib/TransactionSubmitter.ts index 9950741e4b..8974121d96 100644 --- a/packages/transaction-manager/lib/TransactionSubmitter.ts +++ b/packages/transaction-manager/lib/TransactionSubmitter.ts @@ -119,7 +119,7 @@ export class TransactionSubmitter { gas: transactionRequest.gas, }) - const updateResult = await this.txmgr.transactionRepository.updateTransaction(transaction) + const updateResult = await this.txmgr.transactionRepository.saveTransactions([transaction]) if (updateResult.isErr()) { transaction.removeAttempt(hash) diff --git a/packages/transaction-manager/lib/TxMonitor.ts b/packages/transaction-manager/lib/TxMonitor.ts index c6de658312..0d68ea205f 100644 --- a/packages/transaction-manager/lib/TxMonitor.ts +++ b/packages/transaction-manager/lib/TxMonitor.ts @@ -150,7 +150,7 @@ export class TxMonitor { await Promise.all(promises) const result = await ResultAsync.fromPromise( - this.transactionManager.transactionRepository.updateTransactions(transactions), + this.transactionManager.transactionRepository.saveTransactions(transactions), unknownToError, ) diff --git a/packages/transaction-manager/lib/db/types.ts b/packages/transaction-manager/lib/db/types.ts index 5c485d2227..2699106430 100644 --- a/packages/transaction-manager/lib/db/types.ts +++ b/packages/transaction-manager/lib/db/types.ts @@ -16,6 +16,7 @@ export interface TransactionTable { metadata: string | undefined createdAt: number updatedAt: number + flushedAt: number | undefined } export interface Database { diff --git a/packages/transaction-manager/migrations/Migration20241205143000.js b/packages/transaction-manager/migrations/Migration20241205143000.js new file mode 100644 index 0000000000..13a427b6c5 --- /dev/null +++ b/packages/transaction-manager/migrations/Migration20241205143000.js @@ -0,0 +1,8 @@ +/* + SQLite does not have native time types. The SQL interface allows arbitrary type names including "DATE" and "DATETIME", + but this is invalid in this API, and results in "NUMERIC" affinity instead of "INTEGER" affinity, + which is the one we want here +*/ +export async function up(db) { + await db.schema.alterTable("transaction").addColumn("flushedAt", "integer").execute() +} From 7825c0f369fa0360eda7434487758a559d9e4819 Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Rodriguez Date: Thu, 5 Dec 2024 18:21:33 +0100 Subject: [PATCH 2/4] chore(txm): flushedAt changed for a boolean --- packages/randomness-service/src/index.ts | 1 + .../transaction-manager/lib/Transaction.ts | 51 ++++++++++++++----- .../lib/TransactionRepository.ts | 37 ++++++++++---- packages/transaction-manager/lib/db/types.ts | 1 - .../migrations/Migration20241205143000.js | 8 --- 5 files changed, 66 insertions(+), 32 deletions(-) delete mode 100644 packages/transaction-manager/migrations/Migration20241205143000.js diff --git a/packages/randomness-service/src/index.ts b/packages/randomness-service/src/index.ts index 497fc717f8..7b253beb93 100644 --- a/packages/randomness-service/src/index.ts +++ b/packages/randomness-service/src/index.ts @@ -23,6 +23,7 @@ class RandomnessService { chain: anvil, abis: abis, gasEstimator: new CustomGasEstimator(), + rpcAllowDebug: true, }) this.commitmentTransactionFactory = new CommitmentTransactionFactory( this.txm, diff --git a/packages/transaction-manager/lib/Transaction.ts b/packages/transaction-manager/lib/Transaction.ts index 7cc4c97db0..b1dc8eb2fd 100644 --- a/packages/transaction-manager/lib/Transaction.ts +++ b/packages/transaction-manager/lib/Transaction.ts @@ -99,12 +99,16 @@ export class Transaction { readonly attempts: Attempt[] + // Whether the transaction has been updated and needs to be flushed to the database. This field is not persisted in the database. + pendingFlush: boolean + + // Whether the transaction has not been persisted in the database yet. This field is not persisted in the database. + notPersisted: boolean + createdAt: Date updatedAt: Date - flushedAt?: Date - /** * Stores additional information for the transaction. * Enables originators to provide extra details, such as gas limits, which can be leveraged by customizable services. @@ -124,7 +128,8 @@ export class Transaction { attempts, createdAt, updatedAt, - flushedAt, + pendingFlush, + notPersisted, metadata, }: TransactionConstructorConfig & { from: Address @@ -134,7 +139,8 @@ export class Transaction { attempts?: Attempt[] createdAt?: Date updatedAt?: Date - flushedAt?: Date + pendingFlush?: boolean + notPersisted?: boolean }) { this.intentId = intentId ?? createUUID() this.from = from @@ -149,12 +155,13 @@ export class Transaction { this.createdAt = createdAt ?? new Date() this.updatedAt = updatedAt ?? new Date() this.metadata = metadata ?? {} - this.flushedAt = flushedAt + this.pendingFlush = pendingFlush === undefined ? true : pendingFlush + this.notPersisted = notPersisted === undefined ? true : notPersisted } addAttempt(attempt: Attempt): void { this.attempts.push(attempt) - this.updatedAt = new Date() + this.markUpdated() } removeAttempt(hash: Hash): void { @@ -162,7 +169,7 @@ export class Transaction { if (index > -1) { this.attempts.splice(index, 1) } - this.updatedAt = new Date() + this.markUpdated() } getInAirAttempts(): Attempt[] { @@ -177,7 +184,7 @@ export class Transaction { changeStatus(status: TransactionStatus): void { this.status = status - this.updatedAt = new Date() + this.markUpdated() eventBus.emit(Topics.TransactionStatusChanged, { transaction: this, }) @@ -191,8 +198,28 @@ export class Transaction { return this.attempts[this.attempts.length - 1] } - setFlushedAt(when: Date | undefined): void { - this.flushedAt = when + notifyFlush(): void { + this.pendingFlush = false + + if (this.notPersisted) { + this.notPersisted = false + } + } + + notifyFlushFailed(): void { + this.pendingFlush = true + } + + notifyNotPersisted(): void { + this.notPersisted = true + } + + private markUpdated(): void { + this.updatedAt = new Date() + + if (this.pendingFlush === false) { + this.pendingFlush = true + } } toDbRow(): Insertable { @@ -210,7 +237,6 @@ export class Transaction { metadata: this.metadata ? JSON.stringify(this.metadata, bigIntReplacer) : undefined, createdAt: this.createdAt.getTime(), updatedAt: this.updatedAt.getTime(), - flushedAt: this.flushedAt?.getTime(), } } @@ -222,7 +248,8 @@ export class Transaction { metadata: row.metadata ? JSON.parse(row.metadata, bigIntReviver) : undefined, createdAt: new Date(row.createdAt), updatedAt: new Date(row.updatedAt), - flushedAt: row.flushedAt ? new Date(row.flushedAt) : undefined, + notPersisted: false, + pendingFlush: false, }) } } diff --git a/packages/transaction-manager/lib/TransactionRepository.ts b/packages/transaction-manager/lib/TransactionRepository.ts index 21e9e2a5ab..bb2bc74a72 100644 --- a/packages/transaction-manager/lib/TransactionRepository.ts +++ b/packages/transaction-manager/lib/TransactionRepository.ts @@ -59,20 +59,24 @@ export class TransactionRepository { } async saveTransactions(transactions: Transaction[]): Promise> { - const transactionsToFlush = transactions.filter( - (t) => !t.flushedAt || t.flushedAt.getTime() <= t.updatedAt.getTime(), + const transactionsToFlush = transactions.filter((t) => t.pendingFlush) + + const notPersistedTransactions = transactions.filter((t) => t.notPersisted) + + this.notFinalizedTransactions.push( + ...notPersistedTransactions.filter((t) => !this.notFinalizedTransactions.includes(t)), ) - const oldFlushedAt = transactionsToFlush.map((t) => t.flushedAt) + console.log( + "Diff", + transactions.filter((t) => !transactionsToFlush.includes(t)), + ) const result = await ResultAsync.fromPromise( db.transaction().execute(async (dbTransaction) => { - const promises = transactionsToFlush.map((t, index) => { - t.setFlushedAt(new Date()) - if (oldFlushedAt[index] === undefined) { - db.insertInto("transaction") - .values(transactions.map((t) => t.toDbRow())) - .execute() + const promises = transactionsToFlush.map((t) => { + if (t.notPersisted) { + dbTransaction.insertInto("transaction").values(t.toDbRow()).execute() } else { dbTransaction .updateTable("transaction") @@ -80,6 +84,7 @@ export class TransactionRepository { .where("intentId", "=", t.intentId) .execute() } + t.notifyFlush() }) await Promise.all(promises) }), @@ -91,8 +96,18 @@ export class TransactionRepository { NotFinalizedStatuses.includes(transaction.status), ) } else { - for (let i = 0; i < transactions.length; i++) { - transactions[i].setFlushedAt(oldFlushedAt[i]) + for (let i = 0; i < transactionsToFlush.length; i++) { + transactionsToFlush[i].notifyFlushFailed() + + if (notPersistedTransactions.includes(transactionsToFlush[i])) { + transactionsToFlush[i].notifyNotPersisted() + const index = this.notFinalizedTransactions.findIndex( + (t) => t.intentId === transactionsToFlush[i].intentId, + ) + if (index !== -1) { + this.notFinalizedTransactions.splice(index, 1) + } + } } } diff --git a/packages/transaction-manager/lib/db/types.ts b/packages/transaction-manager/lib/db/types.ts index 2699106430..5c485d2227 100644 --- a/packages/transaction-manager/lib/db/types.ts +++ b/packages/transaction-manager/lib/db/types.ts @@ -16,7 +16,6 @@ export interface TransactionTable { metadata: string | undefined createdAt: number updatedAt: number - flushedAt: number | undefined } export interface Database { diff --git a/packages/transaction-manager/migrations/Migration20241205143000.js b/packages/transaction-manager/migrations/Migration20241205143000.js deleted file mode 100644 index 13a427b6c5..0000000000 --- a/packages/transaction-manager/migrations/Migration20241205143000.js +++ /dev/null @@ -1,8 +0,0 @@ -/* - SQLite does not have native time types. The SQL interface allows arbitrary type names including "DATE" and "DATETIME", - but this is invalid in this API, and results in "NUMERIC" affinity instead of "INTEGER" affinity, - which is the one we want here -*/ -export async function up(db) { - await db.schema.alterTable("transaction").addColumn("flushedAt", "integer").execute() -} From 4e61e60ee3a367d85a0ca9ec4e795e450c81dabf Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Rodriguez Date: Fri, 6 Dec 2024 11:46:43 +0100 Subject: [PATCH 3/4] fix(txm): notifyFlush after db transaction success --- .../lib/TransactionRepository.ts | 26 ++----------------- 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/packages/transaction-manager/lib/TransactionRepository.ts b/packages/transaction-manager/lib/TransactionRepository.ts index bb2bc74a72..10193c312e 100644 --- a/packages/transaction-manager/lib/TransactionRepository.ts +++ b/packages/transaction-manager/lib/TransactionRepository.ts @@ -63,15 +63,6 @@ export class TransactionRepository { const notPersistedTransactions = transactions.filter((t) => t.notPersisted) - this.notFinalizedTransactions.push( - ...notPersistedTransactions.filter((t) => !this.notFinalizedTransactions.includes(t)), - ) - - console.log( - "Diff", - transactions.filter((t) => !transactionsToFlush.includes(t)), - ) - const result = await ResultAsync.fromPromise( db.transaction().execute(async (dbTransaction) => { const promises = transactionsToFlush.map((t) => { @@ -84,7 +75,6 @@ export class TransactionRepository { .where("intentId", "=", t.intentId) .execute() } - t.notifyFlush() }) await Promise.all(promises) }), @@ -92,23 +82,11 @@ export class TransactionRepository { ) if (result.isOk()) { + this.notFinalizedTransactions.push(...notPersistedTransactions) this.notFinalizedTransactions = this.notFinalizedTransactions.filter((transaction) => NotFinalizedStatuses.includes(transaction.status), ) - } else { - for (let i = 0; i < transactionsToFlush.length; i++) { - transactionsToFlush[i].notifyFlushFailed() - - if (notPersistedTransactions.includes(transactionsToFlush[i])) { - transactionsToFlush[i].notifyNotPersisted() - const index = this.notFinalizedTransactions.findIndex( - (t) => t.intentId === transactionsToFlush[i].intentId, - ) - if (index !== -1) { - this.notFinalizedTransactions.splice(index, 1) - } - } - } + transactions.forEach((t) => t.notifyFlush()) } return result From 6d6cd5c76090b443ec8cd7212f3126da22c65549 Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Rodriguez Date: Fri, 20 Dec 2024 12:22:09 +0100 Subject: [PATCH 4/4] chore(txm): pr review --- .../transaction-manager/lib/Transaction.ts | 26 +++++++------------ .../lib/TransactionRepository.ts | 2 +- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/packages/transaction-manager/lib/Transaction.ts b/packages/transaction-manager/lib/Transaction.ts index b1dc8eb2fd..67ca68c35f 100644 --- a/packages/transaction-manager/lib/Transaction.ts +++ b/packages/transaction-manager/lib/Transaction.ts @@ -99,10 +99,12 @@ export class Transaction { readonly attempts: Attempt[] - // Whether the transaction has been updated and needs to be flushed to the database. This field is not persisted in the database. + // Whether the transaction has been updated and needs to be flushed to the database. + // This field is not persisted in the database. pendingFlush: boolean - // Whether the transaction has not been persisted in the database yet. This field is not persisted in the database. + // This is true if the transaction has never been persisted to the database yet. + // This field is not persisted in the database. notPersisted: boolean createdAt: Date @@ -116,25 +118,25 @@ export class Transaction { readonly metadata: Record constructor({ - intentId, - from, - chainId, address, functionName, contractName, args, deadline, + metadata, + intentId, + from, + chainId, status, attempts, createdAt, updatedAt, pendingFlush, notPersisted, - metadata, }: TransactionConstructorConfig & { + intentId?: UUID from: Address chainId: number - intentId?: UUID status?: TransactionStatus attempts?: Attempt[] createdAt?: Date @@ -198,7 +200,7 @@ export class Transaction { return this.attempts[this.attempts.length - 1] } - notifyFlush(): void { + markFlushed(): void { this.pendingFlush = false if (this.notPersisted) { @@ -206,14 +208,6 @@ export class Transaction { } } - notifyFlushFailed(): void { - this.pendingFlush = true - } - - notifyNotPersisted(): void { - this.notPersisted = true - } - private markUpdated(): void { this.updatedAt = new Date() diff --git a/packages/transaction-manager/lib/TransactionRepository.ts b/packages/transaction-manager/lib/TransactionRepository.ts index 10193c312e..3953d30d39 100644 --- a/packages/transaction-manager/lib/TransactionRepository.ts +++ b/packages/transaction-manager/lib/TransactionRepository.ts @@ -86,7 +86,7 @@ export class TransactionRepository { this.notFinalizedTransactions = this.notFinalizedTransactions.filter((transaction) => NotFinalizedStatuses.includes(transaction.status), ) - transactions.forEach((t) => t.notifyFlush()) + transactions.forEach((t) => t.markFlushed()) } return result