From b89ef745f9ba24d545726d8353c5021da0264b2a Mon Sep 17 00:00:00 2001 From: Gabriel Claramunt Date: Thu, 1 May 2025 13:21:08 -0300 Subject: [PATCH 1/2] fix legacy balance calc --- .../mapper/CurrencySnapshotMapper.scala | 12 +++++++++--- .../mapper/GlobalSnapshotMapper.scala | 2 +- .../snapshotstreaming/mapper/SnapshotMapper.scala | 8 ++++---- .../mapper/CurrencySnapshotMapperSuite.scala | 10 +++++----- .../mapper/GlobalSnapshotMapperSuite.scala | 8 ++++---- 5 files changed, 23 insertions(+), 17 deletions(-) diff --git a/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala index e9c19198..4c0a54ce 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala @@ -110,8 +110,14 @@ object CurrencySnapshotMapper { transactions <- fullMapper .mapTransactions(full, timestamp, txHasher, hasher) .map(_.map(CurrencyData(identifierStr, _))) + prevBalances = aggLastBalances.get(identifier) + filteredBalances = fullMapper.balanceDiff( + full, + prevBalances, + full.info.balances + ) balances = fullMapper - .mapBalances(full, full.info.balances, timestamp) + .mapBalances(full, filteredBalances, timestamp) .map(CurrencyData(identifierStr, _)) } yield ( aggCurrencySnap :+ snapshot, @@ -124,7 +130,7 @@ object CurrencySnapshotMapper { aggSpendExpirations, aggTokenLocks, aggTokenUnlocks, - aggLastBalances + (identifier -> full.info.balances) + aggLastBalances + (identifier -> filteredBalances) ) case Right((incremental, info, binary)) => @@ -151,7 +157,7 @@ object CurrencySnapshotMapper { filteredBalances = incrementalMapper.balanceDiff( incremental, prevBalances, - info + info.balances ) balances = incrementalMapper .mapBalances(incremental, filteredBalances, timestamp) diff --git a/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala index 5ed1923a..d6a3c968 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala @@ -57,7 +57,7 @@ abstract class GlobalSnapshotMapper[F[_]: Async] extends SnapshotMapper[F, Globa filteredBalances = balanceDiff( globalSnapshot.signed.value, maybePrevSnapshotInfo.map(prev => prev.balances), - snapshotInfo + snapshotInfo.balances ) balances = mapBalances(globalSnapshot, filteredBalances, timestamp) diff --git a/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala index 33bc4048..93a6fbef 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala @@ -122,11 +122,11 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { def balanceDiff( snapshot: S, prevBalances: Option[SortedMap[Address, Balance]], - info: SnapshotInfo[_], + newBalances: SortedMap[Address, Balance], ): SortedMap[Address, Balance] = prevBalances match { case Some(prev) => - val changed = info.balances.filterNot { case (address, balance) => + val changed = newBalances.filterNot { case (address, balance) => prev.get(address).exists(_ === balance) } @@ -136,14 +136,14 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { */ val explicitlyZeroed = { val srcTransactions = extractSnapshotReferredAddresses(snapshot).source - (srcTransactions -- info.balances.keys) + (srcTransactions -- newBalances.keys) .map(address => address -> Balance.empty) .toSortedMap } changed ++ explicitlyZeroed case None => - info.balances + newBalances } def extractSnapshotReferredAddresses(snapshot: S): SnapshotReferredAddresses diff --git a/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapperSuite.scala b/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapperSuite.scala index 527fd35e..d8e9ceb3 100644 --- a/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapperSuite.scala +++ b/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/CurrencySnapshotMapperSuite.scala @@ -98,7 +98,7 @@ object CurrencySnapshotMapperSuite extends MutableIOSuite { result = CurrencyIncrementalSnapshotMapper .make() - .balanceDiff(snapshot, initialBalances.some, emptyCurrencySnapshotInfo) + .balanceDiff(snapshot, initialBalances.some, emptyCurrencySnapshotInfo.balances) } yield expect.all( initialBalances(address1) === Balance(1000L), initialBalances(address2) === Balance(1000L), @@ -144,7 +144,7 @@ object CurrencySnapshotMapperSuite extends MutableIOSuite { result = CurrencyIncrementalSnapshotMapper .make() - .balanceDiff(snapshot, initialBalances.some, updatedInfo) + .balanceDiff(snapshot, initialBalances.some, updatedInfo.balances) } yield expect.same( result, updatedBalances - address1 - address2 @@ -182,7 +182,7 @@ object CurrencySnapshotMapperSuite extends MutableIOSuite { result = CurrencyIncrementalSnapshotMapper .make() - .balanceDiff(snapshot, initialBalances.some, updatedInfo) + .balanceDiff(snapshot, initialBalances.some, updatedInfo.balances) } yield expect.same( result, updatedBalances - address1 - address2 @@ -226,7 +226,7 @@ object CurrencySnapshotMapperSuite extends MutableIOSuite { result = CurrencyIncrementalSnapshotMapper .make() - .balanceDiff(snapshot, initialBalances.some, updatedInfo) + .balanceDiff(snapshot, initialBalances.some, updatedInfo.balances) } yield expect.same( result, updatedBalances - address4 @@ -262,7 +262,7 @@ object CurrencySnapshotMapperSuite extends MutableIOSuite { rewards = rewards ) - result = CurrencyIncrementalSnapshotMapper.make().balanceDiff(snapshot, initialBalances.some, updatedInfo) + result = CurrencyIncrementalSnapshotMapper.make().balanceDiff(snapshot, initialBalances.some, updatedInfo.balances) } yield expect.same( result, updatedBalances - address3 - address4 diff --git a/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapperSuite.scala b/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapperSuite.scala index 39359b55..e99aca1b 100644 --- a/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapperSuite.scala +++ b/src/test/scala/org/constellation/snapshotstreaming/opensearch/mapper/GlobalSnapshotMapperSuite.scala @@ -115,7 +115,7 @@ object GlobalSnapshotMapperSuite extends MutableIOSuite { result = GlobalSnapshotMapper .make() - .balanceDiff(snapshot, initialBalances.some, GlobalSnapshotInfo.empty) + .balanceDiff(snapshot, initialBalances.some, GlobalSnapshotInfo.empty.balances) } yield expect.all( initialBalances(address1) === Balance(1000L), initialBalances(address2) === Balance(1000L), @@ -176,7 +176,7 @@ object GlobalSnapshotMapperSuite extends MutableIOSuite { result = GlobalSnapshotMapper .make() - .balanceDiff(snapshot, initialBalances.some, updatedInfo) + .balanceDiff(snapshot, initialBalances.some, updatedInfo.balances) } yield expect.same( result, updatedBalances - address1 - address2 @@ -235,7 +235,7 @@ object GlobalSnapshotMapperSuite extends MutableIOSuite { result = GlobalSnapshotMapper .make() - .balanceDiff(snapshot, initialBalances.some, updatedInfo) + .balanceDiff(snapshot, initialBalances.some, updatedInfo.balances) } yield expect.same( result, updatedBalances - address4 @@ -287,7 +287,7 @@ object GlobalSnapshotMapperSuite extends MutableIOSuite { rewards = rewards ) - result = GlobalSnapshotMapper.make().balanceDiff(snapshot, initialBalances.some, updatedInfo) + result = GlobalSnapshotMapper.make().balanceDiff(snapshot, initialBalances.some, updatedInfo.balances) } yield expect.same( result, updatedBalances - address3 - address4 From 44ec5cb78053beabef0eb2031e48e4be0b47fc4d Mon Sep 17 00:00:00 2001 From: Gabriel Claramunt Date: Mon, 12 May 2025 12:11:55 -0300 Subject: [PATCH 2/2] skip unchanged balances generation --- .../snapshotstreaming/db/SnapshotDAO.scala | 11 +- .../snapshotstreaming/db/package.scala | 15 ++- .../CurrencyIncrementalSnapshotMapper.scala | 118 +++++++++++------- .../mapper/CurrencySnapshotMapper.scala | 112 ++++++++++++----- .../mapper/GlobalSnapshotMapper.scala | 29 ++--- .../mapper/SnapshotMapper.scala | 89 ++++++------- .../snapshotstreaming/schema/schema.scala | 8 +- 7 files changed, 233 insertions(+), 149 deletions(-) diff --git a/src/main/scala/org/constellation/snapshotstreaming/db/SnapshotDAO.scala b/src/main/scala/org/constellation/snapshotstreaming/db/SnapshotDAO.scala index 7a7377be..66161739 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/db/SnapshotDAO.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/db/SnapshotDAO.scala @@ -24,6 +24,7 @@ import org.constellation.snapshotstreaming.schema.{ } import io.constellationnetwork.security.signature.signature.SignatureProof import org.constellation.snapshotstreaming.schema.TokenLocks.{TokenLock, TokenUnlock} +import org.typelevel.log4cats.slf4j.Slf4jLogger import skunk._ import skunk.codec.all._ import skunk.implicits._ @@ -654,8 +655,10 @@ object SnapshotDAO { def make[F[_]: Async](pool: Resource[F, Session[F]]): SnapshotDAO[F] = new SnapshotDAO[F] { + private implicit val logger = Slf4jLogger.getLogger[F] + def insertGlobalData(snapshot: GlobalData, mgSnaphotsCount: Int): F[Unit] = - pool.use { session => + retryF(pool.use { session => session.transaction.use { xa => val gsHash = snapshot.snapshot.hash val blockParents = snapshot.blocks.toList.flatMap(b => b.parent.map((b.hash, _))) @@ -697,10 +700,10 @@ object SnapshotDAO { _ <- xa.commit } yield () } - } + }) def insertMetagraphData(globalSnapshotHash: String, mgSnapshot: MetagraphData): F[Unit] = - pool.use { session => + retryF(pool.use { session => session.transaction.use { xa => val blockParents = mgSnapshot.blocks.flatMap { currencyData => currencyData.data.parent.map(parent => (currencyData.identifier, currencyData.data.hash, parent)) @@ -742,7 +745,7 @@ object SnapshotDAO { _ <- xa.commit } yield () } - } + }) } diff --git a/src/main/scala/org/constellation/snapshotstreaming/db/package.scala b/src/main/scala/org/constellation/snapshotstreaming/db/package.scala index 9f8d107e..d2d86bfc 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/db/package.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/db/package.scala @@ -1,11 +1,13 @@ package org.constellation.snapshotstreaming -import cats.Applicative +import cats.{Applicative, MonadError} import cats.effect.std.Console import cats.effect.{Resource, Temporal} import cats.syntax.all._ import fs2.io.net.Network +import org.typelevel.log4cats.Logger import org.typelevel.otel4s.trace.Tracer +import skunk.exception.EofException import skunk.{PreparedCommand, Session} package object db { @@ -26,4 +28,15 @@ package object db { def executeCmd[T, F[_]: Applicative](cmd: PreparedCommand[F, T])(entities: Seq[T]): F[Unit] = entities.traverse(e => cmd.execute(e)).whenA(entities.nonEmpty) + def retryF[F[_]: Logger, A](effect: F[A], maxRetries: Int = 5)(implicit + F: MonadError[F, Throwable], + timer: Temporal[F] + ): F[A] = + effect.handleErrorWith { case err: EofException => + if (maxRetries > 0) + Logger[F].warn(s"Error $err from db, retrying... (${maxRetries})") *> retryF(effect, maxRetries - 1) + else + F.raiseError(err) + } + } diff --git a/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencyIncrementalSnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencyIncrementalSnapshotMapper.scala index 4871a9d3..efa5e23a 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencyIncrementalSnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencyIncrementalSnapshotMapper.scala @@ -17,33 +17,48 @@ import io.constellationnetwork.security.signature.Signed import io.constellationnetwork.security.{Hashed, Hasher} import io.constellationnetwork.statechannel.StateChannelSnapshotBinary import org.constellation.snapshotstreaming.schema.AllowSpends.{AllowSpend, SpendTransaction} -import org.constellation.snapshotstreaming.schema.{CurrencySnapshot, FeeTransaction, RewardTransaction, TransactionReference} +import org.constellation.snapshotstreaming.schema.{ + CurrencySnapshot, + FeeTransaction, + RewardTransaction, + TransactionReference +} import java.time.LocalDateTime import scala.collection.immutable.SortedSet -abstract class CurrencyIncrementalSnapshotMapper[F[_]: Async] - extends SnapshotMapper[F, CurrencyIncrementalSnapshot] { +abstract class CurrencyIncrementalSnapshotMapper[F[_]: Async] extends SnapshotMapper[F, CurrencyIncrementalSnapshot] { def mapSnapshot( - snapshot: Hashed[CurrencyIncrementalSnapshot], - binary: Signed[StateChannelSnapshotBinary], - info: CurrencySnapshotInfo, - timestamp: LocalDateTime, - hasher: Hasher[F] - ): F[CurrencySnapshot] + snapshot: Hashed[CurrencyIncrementalSnapshot], + binary: Signed[StateChannelSnapshotBinary], + info: CurrencySnapshotInfo, + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[CurrencySnapshot] def mapFeeTransactions( - snapshot: Hashed[CurrencyIncrementalSnapshot], - timestamp: LocalDateTime, - hasher: Hasher[F] - ): F[List[FeeTransaction]] + snapshot: Hashed[CurrencyIncrementalSnapshot], + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[List[FeeTransaction]] - def mapAllowSpends(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[AllowSpend]] + def mapAllowSpends( + snapshot: Hashed[CurrencyIncrementalSnapshot], + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[List[AllowSpend]] - def mapTokenLocks(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[TokenLock]] + def mapTokenLocks( + snapshot: Hashed[CurrencyIncrementalSnapshot], + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[List[TokenLock]] - def mapArtifacts(snapshot: Hashed[CurrencyIncrementalSnapshot], hasher: Hasher[F]): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])] + def mapArtifacts( + snapshot: Hashed[CurrencyIncrementalSnapshot], + hasher: Hasher[F] + ): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])] } @@ -64,10 +79,10 @@ object CurrencyIncrementalSnapshotMapper { } def mapFeeTransactions( - snapshot: Hashed[CurrencyIncrementalSnapshot], - timestamp: LocalDateTime, - hasher: Hasher[F] - ): F[List[FeeTransaction]] = { + snapshot: Hashed[CurrencyIncrementalSnapshot], + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[List[FeeTransaction]] = { implicit val hs: Hasher[F] = hasher snapshot.feeTransactions.toList.flatTraverse( _.toList.traverse(mapFeeTransaction(snapshot.hash.value, snapshot.ordinal.value, timestamp)) @@ -75,12 +90,12 @@ object CurrencyIncrementalSnapshotMapper { } def mapSnapshot( - snapshot: Hashed[CurrencyIncrementalSnapshot], - binary: Signed[StateChannelSnapshotBinary], - info: CurrencySnapshotInfo, - timestamp: LocalDateTime, - hasher: Hasher[F] - ): F[CurrencySnapshot] = for { + snapshot: Hashed[CurrencyIncrementalSnapshot], + binary: Signed[StateChannelSnapshotBinary], + info: CurrencySnapshotInfo, + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[CurrencySnapshot] = for { blocksHashes <- snapshot.blocks.unsorted.map(_.block).map(hashBlock(_, hasher)).toList.sequence sizeInKb <- SizeCalculator.kilobytes(binary) } yield CurrencySnapshot( @@ -143,10 +158,14 @@ object CurrencyIncrementalSnapshotMapper { ) } - def mapAllowSpends(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[AllowSpend]] = { + def mapAllowSpends( + snapshot: Hashed[CurrencyIncrementalSnapshot], + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[List[AllowSpend]] = { implicit val hs: Hasher[F] = hasher snapshot.allowSpendBlocks.toList.flatTraverse( - _.toList.flatTraverse( alb => alb.transactions.toList.traverse(mapAllowSpend(snapshot.hash, alb.roundId))) + _.toList.flatTraverse(alb => alb.transactions.toList.traverse(mapAllowSpend(snapshot.hash, alb.roundId))) ) } @@ -166,17 +185,21 @@ object CurrencyIncrementalSnapshotMapper { ) } - def mapTokenLocks(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[TokenLock]] = { + def mapTokenLocks( + snapshot: Hashed[CurrencyIncrementalSnapshot], + timestamp: LocalDateTime, + hasher: Hasher[F] + ): F[List[TokenLock]] = { implicit val hs: Hasher[F] = hasher snapshot.tokenLockBlocks.toList.flatTraverse( - _.toList.flatTraverse( tlb => tlb.tokenLocks.toList.traverse(mapTokenLock(snapshot.hash, tlb.roundId))) + _.toList.flatTraverse(tlb => tlb.tokenLocks.toList.traverse(mapTokenLock(snapshot.hash, tlb.roundId))) ) } def mapSpendTx(snapshotHash: Hash)( spendTx: artifact.SpendTransaction - )(implicit hasher: Hasher[F]): F[SpendTransaction] = hasher.hash(spendTx).map { - hash => SpendTransaction( + )(implicit hasher: Hasher[F]): F[SpendTransaction] = hasher.hash(spendTx).map { hash => + SpendTransaction( hash.value, spendTx.source.value, spendTx.destination.value, @@ -186,47 +209,50 @@ object CurrencyIncrementalSnapshotMapper { ) } - def mapTokenUnlock( snapshotHash: Hash, tokenUnlock: artifact.TokenUnlock ) - (implicit hasher: Hasher[F]): F[TokenUnlock] = hasher.hash(tokenUnlock).map { - hash => TokenUnlock( + def mapTokenUnlock(snapshotHash: Hash, tokenUnlock: artifact.TokenUnlock)(implicit + hasher: Hasher[F] + ): F[TokenUnlock] = hasher.hash(tokenUnlock).map { hash => + TokenUnlock( snapshotHash.value, hash.value, tokenUnlock.tokenLockRef.value, tokenUnlock.amount.value, - tokenUnlock.source.value, + tokenUnlock.source.value ) } - def mapExpiration(snapshotHash: Hash, expiry: artifact.AllowSpendExpiration) - (implicit hasher: Hasher[F]): F[AllowSpendExpiration] = hasher.hash(expiry).map { - hash => AllowSpendExpiration( + def mapExpiration(snapshotHash: Hash, expiry: artifact.AllowSpendExpiration)(implicit + hasher: Hasher[F] + ): F[AllowSpendExpiration] = hasher.hash(expiry).map { hash => + AllowSpendExpiration( snapshotHash.value, hash.value, - expiry.allowSpendRef.value, + expiry.allowSpendRef.value ) } - - def mapArtifacts(snapshot: Hashed[CurrencyIncrementalSnapshot], hasher: Hasher[F]): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])] = { + def mapArtifacts( + snapshot: Hashed[CurrencyIncrementalSnapshot], + hasher: Hasher[F] + ): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])] = { implicit val hs: Hasher[F] = hasher val events = snapshot.artifacts.toList.flatten val spendTxs = events.flatTraverse { case artifact.SpendAction(spendTransactions) => spendTransactions.toList.traverse(mapSpendTx(snapshot.hash)) - case _ => List.empty[SpendTransaction].pure + case _ => List.empty[SpendTransaction].pure } val tokenUnlocks = events.flatTraverse { case tu: artifact.TokenUnlock => mapTokenUnlock(snapshot.hash, tu).map(List(_)) - case _ => List.empty[TokenUnlock].pure + case _ => List.empty[TokenUnlock].pure } val expirations = events.flatTraverse { case exp: artifact.AllowSpendExpiration => mapExpiration(snapshot.hash, exp).map(List(_)) - case _ => List.empty[AllowSpendExpiration].pure + case _ => List.empty[AllowSpendExpiration].pure } (spendTxs, tokenUnlocks, expirations).tupled } - } } diff --git a/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala index 4c0a54ce..bed56382 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/mapper/CurrencySnapshotMapper.scala @@ -10,12 +10,19 @@ import org.constellation.snapshotstreaming.SnapshotProcessor.GlobalSnapshotWithS import org.constellation.snapshotstreaming.schema.AllowSpends.{AllowSpend, AllowSpendExpiration, SpendTransaction} import org.constellation.snapshotstreaming.schema.TokenLocks.{TokenLock, TokenUnlock} import org.constellation.snapshotstreaming.schema.schema.{MetagraphData, toIncremental} -import org.constellation.snapshotstreaming.schema.{AddressBalance, Block, CurrencyData, FeeTransaction, Snapshot, Transaction, CurrencySnapshot => OSCurrencySnapshot} +import org.constellation.snapshotstreaming.schema.{ + AddressBalance, + Block, + CurrencyData, + CurrencySnapshot => OSCurrencySnapshot, + FeeTransaction, + Snapshot, + Transaction +} import java.time.LocalDateTime import scala.collection.immutable.SortedMap - trait CurrencySnapshotMapper[F[_]] { def mapCurrencySnapshots( @@ -42,18 +49,18 @@ object CurrencySnapshotMapper { new CurrencySnapshotMapper[F] { type Acc = ( - Seq[CurrencyData[OSCurrencySnapshot]], - Seq[CurrencyData[Block]], - Seq[CurrencyData[Transaction]], - Seq[CurrencyData[FeeTransaction]], - Seq[CurrencyData[AddressBalance]], - Seq[CurrencyData[AllowSpend]], - Seq[CurrencyData[SpendTransaction]], - Seq[CurrencyData[AllowSpendExpiration]], - Seq[CurrencyData[TokenLock]], - Seq[CurrencyData[TokenUnlock]], - Map[Address, SortedMap[Address, Balance]], - ) + Seq[CurrencyData[OSCurrencySnapshot]], + Seq[CurrencyData[Block]], + Seq[CurrencyData[Transaction]], + Seq[CurrencyData[FeeTransaction]], + Seq[CurrencyData[AddressBalance]], + Seq[CurrencyData[AllowSpend]], + Seq[CurrencyData[SpendTransaction]], + Seq[CurrencyData[AllowSpendExpiration]], + Seq[CurrencyData[TokenLock]], + Seq[CurrencyData[TokenUnlock]], + Map[Address, Map[Address, Balance]] + ) type CurrencySnapshotMapperResult = MetagraphData @@ -92,11 +99,23 @@ object CurrencySnapshotMapper { currencySnapshots.toList.flatMap { case (i, s) => s.toList.map((i, _)) } .foldLeftM[F, Acc](initialAcc) { case ( - (aggCurrencySnap, aggBlocks, aggTxs, aggFeeTxs, aggBalances, aggAllowSpends, aggSpendTxs, aggSpendExpirations, aggTokenLocks, aggTokenUnlocks, aggLastBalances), - (identifier, fullOrIncremental) - ) => + ( + aggCurrencySnap, + aggBlocks, + aggTxs, + aggFeeTxs, + aggChangedBalances, + aggAllowSpends, + aggSpendTxs, + aggSpendExpirations, + aggTokenLocks, + aggTokenUnlocks, + aggLastBalances + ), + (identifier, fullOrIncremental) + ) => val identifierStr = identifier.value.value - def toCurrency[A](a:A) = CurrencyData(identifierStr, a) + def toCurrency[A](a: A) = CurrencyData(identifierStr, a) fullOrIncremental match { case Left(full) => @@ -111,26 +130,26 @@ object CurrencySnapshotMapper { .mapTransactions(full, timestamp, txHasher, hasher) .map(_.map(CurrencyData(identifierStr, _))) prevBalances = aggLastBalances.get(identifier) - filteredBalances = fullMapper.balanceDiff( + onlyUpdatedBalances = fullMapper.balanceDiff( full, prevBalances, full.info.balances ) - balances = fullMapper - .mapBalances(full, filteredBalances, timestamp) + changedAddressBalances = fullMapper + .mapBalances(full, onlyUpdatedBalances, timestamp) .map(CurrencyData(identifierStr, _)) } yield ( aggCurrencySnap :+ snapshot, aggBlocks ++ blocks, aggTxs ++ transactions, aggFeeTxs, - aggBalances ++ balances, + aggChangedBalances ++ changedAddressBalances, aggAllowSpends, aggSpendTxs, aggSpendExpirations, aggTokenLocks, aggTokenUnlocks, - aggLastBalances + (identifier -> filteredBalances) + aggLastBalances + (identifier -> full.info.balances) ) case Right((incremental, info, binary)) => @@ -154,34 +173,67 @@ object CurrencySnapshotMapper { tokenLocks <- incrementalMapper .mapTokenLocks(incremental, timestamp, hasher) prevBalances = aggLastBalances.get(identifier) - filteredBalances = incrementalMapper.balanceDiff( + // in theory, we won't need this + onlyUpdatedBalances = incrementalMapper.balanceDiff( incremental, prevBalances, info.balances ) - balances = incrementalMapper - .mapBalances(incremental, filteredBalances, timestamp) + changedAddressBalances = incrementalMapper + .mapBalances(incremental, onlyUpdatedBalances, timestamp) .map(CurrencyData(identifierStr, _)) } yield ( aggCurrencySnap :+ snapshot, aggBlocks ++ blocks, aggTxs ++ transactions, aggFeeTxs ++ feeTransactions, - aggBalances ++ balances, + aggChangedBalances ++ changedAddressBalances, aggAllowSpends ++ allowSpends.map(toCurrency), aggSpendTxs ++ spendsTx.map(toCurrency), aggSpendExpirations ++ spendExpirations.map(toCurrency), aggTokenLocks ++ tokenLocks.map(toCurrency), aggTokenUnlocks ++ tokenUnlocks.map(toCurrency), - aggLastBalances + (identifier -> filteredBalances) + updateBalanceMap(identifier, aggLastBalances, onlyUpdatedBalances) ) } } - .map { case (aggCurrencySnap, aggBlocks, aggTxs, aggFeeTxs, aggBalances, aggAllowSpends, aggSpendTxs, aggSpendExpirations, aggTokenLocks, aggTokenUnlocks, _) => - MetagraphData(aggCurrencySnap, aggBlocks, aggTxs, aggFeeTxs, aggBalances, aggAllowSpends, aggSpendTxs, aggSpendExpirations, aggTokenLocks, aggTokenUnlocks) + .map { + case ( + aggCurrencySnap, + aggBlocks, + aggTxs, + aggFeeTxs, + changedBalances, + aggAllowSpends, + aggSpendTxs, + aggSpendExpirations, + aggTokenLocks, + aggTokenUnlocks, + newBalanceState + ) => + MetagraphData( + aggCurrencySnap, + aggBlocks, + aggTxs, + aggFeeTxs, + changedBalances, + aggAllowSpends, + aggSpendTxs, + aggSpendExpirations, + aggTokenLocks, + aggTokenUnlocks, + newBalanceState + ) } } } + def updateBalanceMap( + identifier: Address, + previous: Map[Address, Map[Address, Balance]], + changedBalances: Map[Address, Balance] + ) = + previous.updatedWith(identifier)(_.map(_ ++ changedBalances).orElse(changedBalances.some)) + } diff --git a/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala index d6a3c968..d43bf70c 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/mapper/GlobalSnapshotMapper.scala @@ -4,18 +4,11 @@ import cats.effect.Async import cats.syntax.all._ import eu.timepit.refined.auto._ import io.constellationnetwork.schema.address.Address +import io.constellationnetwork.schema.balance.Balance import io.constellationnetwork.schema.delegatedStake.{DelegatedStakeRecord, PendingDelegatedStakeWithdrawal} import io.constellationnetwork.schema.peer.PeerId import io.constellationnetwork.schema.round.RoundId -import io.constellationnetwork.schema.{ - GlobalIncrementalSnapshot, - GlobalSnapshotInfo, - SnapshotOrdinal, - artifact, - swap, - tokenLock, - transaction -} +import io.constellationnetwork.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo, SnapshotOrdinal, artifact, swap, tokenLock, transaction} import io.constellationnetwork.security.hash.Hash import io.constellationnetwork.security.signature.Signed import io.constellationnetwork.security.{Hashed, Hasher} @@ -23,17 +16,7 @@ import org.constellation.snapshotstreaming.SnapshotProcessor.GlobalSnapshotWithS import org.constellation.snapshotstreaming.schema.AllowSpends.{AllowSpend, AllowSpendExpiration, SpendTransaction} import org.constellation.snapshotstreaming.schema.TokenLocks.{TokenLock, TokenUnlock} import org.constellation.snapshotstreaming.schema.schema.GlobalData -import org.constellation.snapshotstreaming.schema.{ - DelegatedStakingBalanceChanges, - DelegatedStakingCreate, - DelegatedStakingReward, - DelegatedStakingWithdraw, - RewardTransaction, - Snapshot, - StakingEventCreate, - StakingEventWithdraw, - TransactionReference -} +import org.constellation.snapshotstreaming.schema.{DelegatedStakingBalanceChanges, DelegatedStakingCreate, DelegatedStakingReward, DelegatedStakingWithdraw, RewardTransaction, Snapshot, StakingEventCreate, StakingEventWithdraw, TransactionReference} import java.time.LocalDateTime import scala.collection.immutable.{SortedMap, SortedSet} @@ -48,12 +31,13 @@ abstract class GlobalSnapshotMapper[F[_]: Async] extends SnapshotMapper[F, Globa txHasher: Hasher[F], hasher: Hasher[F] ): F[GlobalData] = { - val GlobalSnapshotWithState(globalSnapshot, maybePrevSnapshotInfo, snapshotInfo, _, ts) = + val GlobalSnapshotWithState(globalSnapshot, maybePrevSnapshotInfo, snapshotInfo, currencySnapshots, timestamp) = globalSnapshotWithState for { snapshot <- mapSnapshot(globalSnapshot, timestamp, hasher) blocks <- mapBlocks(globalSnapshot, timestamp, txHasher, hasher) transactions <- mapTransactions(globalSnapshot, timestamp, txHasher, hasher) + prevBalances = maybePrevSnapshotInfo.map(prev => prev.balances).getOrElse(Map[Address, Balance]()) filteredBalances = balanceDiff( globalSnapshot.signed.value, maybePrevSnapshotInfo.map(prev => prev.balances), @@ -103,7 +87,8 @@ abstract class GlobalSnapshotMapper[F[_]: Async] extends SnapshotMapper[F, Globa stakingRewards, stakingBalances, spendTransactions, - allowSpendExpirations + allowSpendExpirations, + prevBalances ++ filteredBalances ) } diff --git a/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala b/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala index 93a6fbef..b772b190 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/mapper/SnapshotMapper.scala @@ -3,19 +3,18 @@ package org.constellation.snapshotstreaming.mapper import cats.effect.Async import cats.syntax.all._ import eu.timepit.refined.auto._ -import io.constellationnetwork.currency.schema.currency.{CurrencySnapshot => OriginalCurrencySnapshot} import io.constellationnetwork.schema.address.Address import io.constellationnetwork.schema.balance.Balance -import io.constellationnetwork.schema.snapshot.{SnapshotInfo, Snapshot => OriginalSnapshot} +import io.constellationnetwork.schema.snapshot.{Snapshot => OriginalSnapshot} import io.constellationnetwork.schema.transaction.{RewardTransaction => OriginalRewardTransaction, Transaction => OriginalTransaction, TransactionReference => OriginalTransactionReference} import io.constellationnetwork.schema.{Block => OriginalBlock} -import io.constellationnetwork.security.{Hashed, Hasher} import io.constellationnetwork.security.signature.Signed +import io.constellationnetwork.security.{Hashed, Hasher} import io.constellationnetwork.syntax.sortedCollection._ import org.constellation.snapshotstreaming.schema.{AddressBalance, Block, BlockReference, Transaction, TransactionReference} import java.time.LocalDateTime -import scala.collection.immutable.{SortedMap, SortedSet} +import scala.collection.immutable.{HashMap, SortedSet} case class SnapshotReferredAddresses(source: Set[Address], destination: Set[Address]) @@ -33,23 +32,24 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { transaction.toHashed.map(_.hash.value) } - def mapBlocks(snapshot: Hashed[S], timestamp: LocalDateTime, txHasher: Hasher[F], hasher: Hasher[F]): F[Seq[Block]] = for { - blocks <- snapshot.blocks.unsorted - .map(_.block) - .map(mapBlock(snapshot.hash.value, snapshot.ordinal.value.value, timestamp, txHasher, hasher)) - .toList - .sequence - } yield blocks + def mapBlocks(snapshot: Hashed[S], timestamp: LocalDateTime, txHasher: Hasher[F], hasher: Hasher[F]): F[Seq[Block]] = + for { + blocks <- snapshot.blocks.unsorted + .map(_.block) + .map(mapBlock(snapshot.hash.value, snapshot.ordinal.value.value, timestamp, txHasher, hasher)) + .toList + .sequence + } yield blocks private def mapBlock( - snapshotHash: String, - snapshotOrdinal: Long, - timestamp: LocalDateTime, - txHasher: Hasher[F], - hasher: Hasher[F] - )( - block: Signed[OriginalBlock] - ): F[Block] = + snapshotHash: String, + snapshotOrdinal: Long, + timestamp: LocalDateTime, + txHasher: Hasher[F], + hasher: Hasher[F] + )( + block: Signed[OriginalBlock] + ): F[Block] = for { blockHash <- hashBlock(block, hasher) transactionsHashes <- block.value.transactions.toSortedSet.unsorted @@ -75,14 +75,14 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { } yield transactions.flatten private def mapTransactionsFromBlock( - snapshotHash: String, - snapshotOrdinal: Long, - timestamp: LocalDateTime, - txHasher: Hasher[F], - hasher: Hasher[F] - )( - block: Signed[OriginalBlock] - ) = for { + snapshotHash: String, + snapshotOrdinal: Long, + timestamp: LocalDateTime, + txHasher: Hasher[F], + hasher: Hasher[F] + )( + block: Signed[OriginalBlock] + ) = for { blockHash <- hashBlock(block, hasher) transactions <- block.transactions.toSortedSet.unsorted .map(mapTransaction(blockHash, snapshotHash, snapshotOrdinal, timestamp, txHasher)) @@ -91,14 +91,14 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { } yield transactions private def mapTransaction( - blockHash: String, - snapshotHash: String, - snapshotOrdinal: Long, - timestamp: LocalDateTime, - txHasher: Hasher[F] - )( - transaction: Signed[OriginalTransaction] - ): F[Transaction] = for { + blockHash: String, + snapshotHash: String, + snapshotOrdinal: Long, + timestamp: LocalDateTime, + txHasher: Hasher[F] + )( + transaction: Signed[OriginalTransaction] + ): F[Transaction] = for { transactionHash <- hashTransaction(transaction, txHasher) } yield Transaction( hash = transactionHash, @@ -120,14 +120,15 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { TransactionReference(nodeRef.hash.value, nodeRef.ordinal.value) def balanceDiff( - snapshot: S, - prevBalances: Option[SortedMap[Address, Balance]], - newBalances: SortedMap[Address, Balance], - ): SortedMap[Address, Balance] = + snapshot: S, + prevBalances: Option[Map[Address, Balance]], + newBalances: Map[Address, Balance] + ): Map[Address, Balance] = prevBalances match { case Some(prev) => + val optimizedPrev = HashMap.from(prev) val changed = newBalances.filterNot { case (address, balance) => - prev.get(address).exists(_ === balance) + optimizedPrev.get(address).exists(_ === balance) } /* NOTE: SnapshotInfo calculation optimization gets rid of addresses that have empty balances. @@ -149,10 +150,10 @@ abstract class SnapshotMapper[F[_]: Async, S <: OriginalSnapshot] { def extractSnapshotReferredAddresses(snapshot: S): SnapshotReferredAddresses def mapBalances( - globalSnapshot: Hashed[S], - balances: SortedMap[Address, Balance], - timestamp: LocalDateTime - ): Seq[AddressBalance] = + globalSnapshot: Hashed[S], + balances: Map[Address, Balance], + timestamp: LocalDateTime + ): Seq[AddressBalance] = balances.toSeq.map { case (address, balance) => AddressBalance( address = address.value.value, diff --git a/src/main/scala/org/constellation/snapshotstreaming/schema/schema.scala b/src/main/scala/org/constellation/snapshotstreaming/schema/schema.scala index b3b4cdbc..39a1019a 100644 --- a/src/main/scala/org/constellation/snapshotstreaming/schema/schema.scala +++ b/src/main/scala/org/constellation/snapshotstreaming/schema/schema.scala @@ -4,6 +4,8 @@ import io.circe.Encoder import io.constellationnetwork.security.signature.signature.SignatureProof import AllowSpends.{AllowSpend, AllowSpendExpiration, SpendTransaction} import org.constellation.snapshotstreaming.schema.TokenLocks.{TokenLock, TokenUnlock} +import io.constellationnetwork.schema.address.Address +import io.constellationnetwork.schema.balance.Balance import java.util.Date @@ -23,7 +25,8 @@ object schema { delegatedStakingRewards: Seq[DelegatedStakingReward], delegatedStakingBalanceChanges: Seq[DelegatedStakingBalanceChanges], spendTransactions: Seq[SpendTransaction], - allowSpendExpirations: Seq[AllowSpendExpiration] + allowSpendExpirations: Seq[AllowSpendExpiration], + balanceState: Map[Address, Balance] ) case class MetagraphData( @@ -36,7 +39,8 @@ object schema { spendTransactions: Seq[CurrencyData[SpendTransaction]], allowSpendExpirations: Seq[CurrencyData[AllowSpendExpiration]], tokenLocks: Seq[CurrencyData[TokenLock]] = Seq.empty, - tokenUnlocks: Seq[CurrencyData[TokenUnlock]] = Seq.empty + tokenUnlocks: Seq[CurrencyData[TokenUnlock]] = Seq.empty, + balanceState: Map[Address, Map[Address, Balance]] ) def toIncremental(snapshot: Snapshot): CurrencySnapshot =