Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.constellation.snapshotstreaming.schema.{
}
import io.constellationnetwork.security.signature.signature.SignatureProof
import org.constellation.snapshotstreaming.schema.TokenLocks.{TokenLock, TokenUnlock}
import org.typelevel.log4cats.slf4j.Slf4jLogger
import skunk._
import skunk.codec.all._
import skunk.implicits._
Expand Down Expand Up @@ -654,8 +655,10 @@ object SnapshotDAO {

def make[F[_]: Async](pool: Resource[F, Session[F]]): SnapshotDAO[F] = new SnapshotDAO[F] {

private implicit val logger = Slf4jLogger.getLogger[F]

def insertGlobalData(snapshot: GlobalData, mgSnaphotsCount: Int): F[Unit] =
pool.use { session =>
retryF(pool.use { session =>
session.transaction.use { xa =>
val gsHash = snapshot.snapshot.hash
val blockParents = snapshot.blocks.toList.flatMap(b => b.parent.map((b.hash, _)))
Expand Down Expand Up @@ -697,10 +700,10 @@ object SnapshotDAO {
_ <- xa.commit
} yield ()
}
}
})

def insertMetagraphData(globalSnapshotHash: String, mgSnapshot: MetagraphData): F[Unit] =
pool.use { session =>
retryF(pool.use { session =>
session.transaction.use { xa =>
val blockParents = mgSnapshot.blocks.flatMap { currencyData =>
currencyData.data.parent.map(parent => (currencyData.identifier, currencyData.data.hash, parent))
Expand Down Expand Up @@ -742,7 +745,7 @@ object SnapshotDAO {
_ <- xa.commit
} yield ()
}
}
})

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.constellation.snapshotstreaming

import cats.Applicative
import cats.{Applicative, MonadError}
import cats.effect.std.Console
import cats.effect.{Resource, Temporal}
import cats.syntax.all._
import fs2.io.net.Network
import org.typelevel.log4cats.Logger
import org.typelevel.otel4s.trace.Tracer
import skunk.exception.EofException
import skunk.{PreparedCommand, Session}

package object db {
Expand All @@ -26,4 +28,15 @@ package object db {
def executeCmd[T, F[_]: Applicative](cmd: PreparedCommand[F, T])(entities: Seq[T]): F[Unit] =
entities.traverse(e => cmd.execute(e)).whenA(entities.nonEmpty)

def retryF[F[_]: Logger, A](effect: F[A], maxRetries: Int = 5)(implicit
F: MonadError[F, Throwable],
timer: Temporal[F]
): F[A] =
effect.handleErrorWith { case err: EofException =>
if (maxRetries > 0)
Logger[F].warn(s"Error $err from db, retrying... (${maxRetries})") *> retryF(effect, maxRetries - 1)
else
F.raiseError(err)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,48 @@ import io.constellationnetwork.security.signature.Signed
import io.constellationnetwork.security.{Hashed, Hasher}
import io.constellationnetwork.statechannel.StateChannelSnapshotBinary
import org.constellation.snapshotstreaming.schema.AllowSpends.{AllowSpend, SpendTransaction}
import org.constellation.snapshotstreaming.schema.{CurrencySnapshot, FeeTransaction, RewardTransaction, TransactionReference}
import org.constellation.snapshotstreaming.schema.{
CurrencySnapshot,
FeeTransaction,
RewardTransaction,
TransactionReference
}

import java.time.LocalDateTime
import scala.collection.immutable.SortedSet

abstract class CurrencyIncrementalSnapshotMapper[F[_]: Async]
extends SnapshotMapper[F, CurrencyIncrementalSnapshot] {
abstract class CurrencyIncrementalSnapshotMapper[F[_]: Async] extends SnapshotMapper[F, CurrencyIncrementalSnapshot] {

def mapSnapshot(
snapshot: Hashed[CurrencyIncrementalSnapshot],
binary: Signed[StateChannelSnapshotBinary],
info: CurrencySnapshotInfo,
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[CurrencySnapshot]
snapshot: Hashed[CurrencyIncrementalSnapshot],
binary: Signed[StateChannelSnapshotBinary],
info: CurrencySnapshotInfo,
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[CurrencySnapshot]

def mapFeeTransactions(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[FeeTransaction]]
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[FeeTransaction]]

def mapAllowSpends(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[AllowSpend]]
def mapAllowSpends(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[AllowSpend]]

def mapTokenLocks(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[TokenLock]]
def mapTokenLocks(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[TokenLock]]

def mapArtifacts(snapshot: Hashed[CurrencyIncrementalSnapshot], hasher: Hasher[F]): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])]
def mapArtifacts(
snapshot: Hashed[CurrencyIncrementalSnapshot],
hasher: Hasher[F]
): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])]

}

Expand All @@ -64,23 +79,23 @@ object CurrencyIncrementalSnapshotMapper {
}

def mapFeeTransactions(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[FeeTransaction]] = {
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[FeeTransaction]] = {
implicit val hs: Hasher[F] = hasher
snapshot.feeTransactions.toList.flatTraverse(
_.toList.traverse(mapFeeTransaction(snapshot.hash.value, snapshot.ordinal.value, timestamp))
)
}

def mapSnapshot(
snapshot: Hashed[CurrencyIncrementalSnapshot],
binary: Signed[StateChannelSnapshotBinary],
info: CurrencySnapshotInfo,
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[CurrencySnapshot] = for {
snapshot: Hashed[CurrencyIncrementalSnapshot],
binary: Signed[StateChannelSnapshotBinary],
info: CurrencySnapshotInfo,
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[CurrencySnapshot] = for {
blocksHashes <- snapshot.blocks.unsorted.map(_.block).map(hashBlock(_, hasher)).toList.sequence
sizeInKb <- SizeCalculator.kilobytes(binary)
} yield CurrencySnapshot(
Expand Down Expand Up @@ -143,10 +158,14 @@ object CurrencyIncrementalSnapshotMapper {
)
}

def mapAllowSpends(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[AllowSpend]] = {
def mapAllowSpends(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[AllowSpend]] = {
implicit val hs: Hasher[F] = hasher
snapshot.allowSpendBlocks.toList.flatTraverse(
_.toList.flatTraverse( alb => alb.transactions.toList.traverse(mapAllowSpend(snapshot.hash, alb.roundId)))
_.toList.flatTraverse(alb => alb.transactions.toList.traverse(mapAllowSpend(snapshot.hash, alb.roundId)))
)
}

Expand All @@ -166,17 +185,21 @@ object CurrencyIncrementalSnapshotMapper {
)
}

def mapTokenLocks(snapshot: Hashed[CurrencyIncrementalSnapshot], timestamp: LocalDateTime, hasher: Hasher[F]): F[List[TokenLock]] = {
def mapTokenLocks(
snapshot: Hashed[CurrencyIncrementalSnapshot],
timestamp: LocalDateTime,
hasher: Hasher[F]
): F[List[TokenLock]] = {
implicit val hs: Hasher[F] = hasher
snapshot.tokenLockBlocks.toList.flatTraverse(
_.toList.flatTraverse( tlb => tlb.tokenLocks.toList.traverse(mapTokenLock(snapshot.hash, tlb.roundId)))
_.toList.flatTraverse(tlb => tlb.tokenLocks.toList.traverse(mapTokenLock(snapshot.hash, tlb.roundId)))
)
}

def mapSpendTx(snapshotHash: Hash)(
spendTx: artifact.SpendTransaction
)(implicit hasher: Hasher[F]): F[SpendTransaction] = hasher.hash(spendTx).map {
hash => SpendTransaction(
)(implicit hasher: Hasher[F]): F[SpendTransaction] = hasher.hash(spendTx).map { hash =>
SpendTransaction(
hash.value,
spendTx.source.value,
spendTx.destination.value,
Expand All @@ -186,47 +209,50 @@ object CurrencyIncrementalSnapshotMapper {
)
}

def mapTokenUnlock( snapshotHash: Hash, tokenUnlock: artifact.TokenUnlock )
(implicit hasher: Hasher[F]): F[TokenUnlock] = hasher.hash(tokenUnlock).map {
hash => TokenUnlock(
def mapTokenUnlock(snapshotHash: Hash, tokenUnlock: artifact.TokenUnlock)(implicit
hasher: Hasher[F]
): F[TokenUnlock] = hasher.hash(tokenUnlock).map { hash =>
TokenUnlock(
snapshotHash.value,
hash.value,
tokenUnlock.tokenLockRef.value,
tokenUnlock.amount.value,
tokenUnlock.source.value,
tokenUnlock.source.value
)
}

def mapExpiration(snapshotHash: Hash, expiry: artifact.AllowSpendExpiration)
(implicit hasher: Hasher[F]): F[AllowSpendExpiration] = hasher.hash(expiry).map {
hash => AllowSpendExpiration(
def mapExpiration(snapshotHash: Hash, expiry: artifact.AllowSpendExpiration)(implicit
hasher: Hasher[F]
): F[AllowSpendExpiration] = hasher.hash(expiry).map { hash =>
AllowSpendExpiration(
snapshotHash.value,
hash.value,
expiry.allowSpendRef.value,
expiry.allowSpendRef.value
)
}


def mapArtifacts(snapshot: Hashed[CurrencyIncrementalSnapshot], hasher: Hasher[F]): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])] = {
def mapArtifacts(
snapshot: Hashed[CurrencyIncrementalSnapshot],
hasher: Hasher[F]
): F[(List[SpendTransaction], List[TokenUnlock], List[AllowSpendExpiration])] = {
implicit val hs: Hasher[F] = hasher
val events = snapshot.artifacts.toList.flatten
val spendTxs = events.flatTraverse {
case artifact.SpendAction(spendTransactions) => spendTransactions.toList.traverse(mapSpendTx(snapshot.hash))
case _ => List.empty[SpendTransaction].pure
case _ => List.empty[SpendTransaction].pure
}
val tokenUnlocks = events.flatTraverse {
case tu: artifact.TokenUnlock => mapTokenUnlock(snapshot.hash, tu).map(List(_))
case _ => List.empty[TokenUnlock].pure
case _ => List.empty[TokenUnlock].pure
}
val expirations = events.flatTraverse {
case exp: artifact.AllowSpendExpiration => mapExpiration(snapshot.hash, exp).map(List(_))
case _ => List.empty[AllowSpendExpiration].pure
case _ => List.empty[AllowSpendExpiration].pure
}

(spendTxs, tokenUnlocks, expirations).tupled
}


}

}
Loading