diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsSvAppTimeBasedIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsSvAppTimeBasedIntegrationTest.scala index 431ac250ad..a33cb9512d 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsSvAppTimeBasedIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsSvAppTimeBasedIntegrationTest.scala @@ -234,8 +234,12 @@ class TrafficBasedRewardsSvAppTimeBasedIntegrationTest val round = oldestOpenRound doTransfer(bobParty) // Need to advance by two rounds, see note below about last_archived_round - advanceRoundsToNextRoundOpening - advanceRoundsToNextRoundOpening + // Note: we can't use advanceRoundsToNextRoundOpening here, as it blocks + // on summarizing and issuing round to complete, and here the + // summarizing round will block until the sv2 provides the round totals + // via bft read. + advanceTimeAndWaitForRoundOpening + advanceTimeAndWaitForRoundOpening val (calculateRewardsCid, rootHash) = clue( @@ -267,6 +271,10 @@ class TrafficBasedRewardsSvAppTimeBasedIntegrationTest .listConfirmations(startProcessingAction) .futureValue should have size 2 } + sv1Backend.appState.dsoStore + .listOldestSummarizingMiningRounds() + .futureValue + .map(_.payload.round.number) should contain(round) } // This is trying to simulate AppActivityRecordMetaT's userVersion bump diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala index 13258f6642..d34798bba7 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala @@ -172,26 +172,26 @@ abstract class TrafficBasedRewardsTimeBasedIntegrationTestBase // 3 initial advances to get open rounds with staggered opensAt for (round <- 1 to 3) { - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(round.toLong) } val id0 = settleTrade(aliceParty, bobParty, venueParty) grantFeaturedAppRight(splitwellWalletClient) - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(4) val id1 = settleTrade(aliceParty, bobParty, venueParty) grantFeaturedAppRight(aliceWalletClient) - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(5) settleTrade(aliceParty, bobParty, venueParty) settleTrade(aliceParty, bobParty, venueParty) - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(6) val id3 = settleTrade(aliceParty, bobParty, venueParty) @@ -199,7 +199,7 @@ abstract class TrafficBasedRewardsTimeBasedIntegrationTestBase settleTrade(aliceParty, bobParty, venueParty) settleTrade(aliceParty, bobParty, venueParty) - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(7) val id4 = settleTrade(aliceParty, bobParty, venueParty) @@ -210,12 +210,12 @@ abstract class TrafficBasedRewardsTimeBasedIntegrationTestBase val (aliceCreateId, svExpireId) = aliceCreateAndSvExpireInstruction(aliceParty, bobParty) - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(8) // No activity for round 8 - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(9) // Do only one DvP; this would not generate enough activity to reward the parties. @@ -241,7 +241,7 @@ abstract class TrafficBasedRewardsTimeBasedIntegrationTestBase _ => sv1ScanBackend.lookupFeaturedAppRight(venueParty) shouldBe None, ) - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(10) // Do five in a round to check nested BatchOfBatches processing @@ -251,7 +251,7 @@ abstract class TrafficBasedRewardsTimeBasedIntegrationTestBase settleTrade(aliceParty, bobParty, venueParty) settleTrade(aliceParty, bobParty, venueParty) - advanceRoundsToNextRoundOpening + advanceTimeAndWaitForRoundOpening assertOldestOpenRound(11) val id7 = settleTrade(aliceParty, bobParty, venueParty) diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TimeTestUtil.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TimeTestUtil.scala index 09ed7bd183..cff8bf51a9 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TimeTestUtil.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TimeTestUtil.scala @@ -107,16 +107,47 @@ trait TimeTestUtil extends TestCommon { } } } + advanceTimeAndWaitForRoundAutomation( + durationToNextRoundOpening, + synchronizeExternalPartyConfigStates, + ) + } + + /** The amount of time to advance in order to reach the next mining round opening. */ + private def durationToNextRoundOpening(implicit + env: SpliceTestConsoleEnvironment + ): Duration = { import math.Ordering.Implicits.* val now = sv1Backend.participantClient.ledger_api.time.get().toInstant val (openRounds, _) = sv1ScanBackend.getOpenAndIssuingMiningRounds() - val advanceWith = openRounds + openRounds .filter(round => now < round.contract.payload.opensAt) .map(_.contract.payload.opensAt) .minOption .map(minFutureOpen => Duration.between(now, minFutureOpen.plusSeconds(10))) .getOrElse(tickDurationWithBuffer) - advanceTimeAndWaitForRoundAutomation(advanceWith, synchronizeExternalPartyConfigStates) + } + + /** Advance time, but only waits for the next round to open and does not wait + * for the summarizing and issuing round automation to catch up. + */ + @nowarn("msg=match may not be exhaustive") + def advanceTimeAndWaitForRoundOpening(implicit env: SpliceTestConsoleEnvironment): Unit = { + val advanceWith = durationToNextRoundOpening + val (previousOpenRounds, _) = sv1ScanBackend.getOpenAndIssuingMiningRounds() + val Seq(lowestOpen, middleOpen, highestOpen) = + previousOpenRounds.map(_.contract.payload.round.number) + + actAndCheck()("advancing time", advanceTime(advanceWith))( + s"waiting for open round automation (should create OpenMiningRound ${highestOpen + 1})", + _ => { + val (newOpenRounds, _) = sv1ScanBackend.getOpenAndIssuingMiningRounds() + val Seq(newLowestOpen, newMiddleOpen, newHighestOpen) = + newOpenRounds.map(_.contract.payload.round.number) + (newLowestOpen, newMiddleOpen, newHighestOpen) shouldBe + (lowestOpen + 1, middleOpen + 1, highestOpen + 1) + }, + ) } def advanceTimeAndUpdateExternalPartyConfigStates(implicit env: SpliceTestConsoleEnvironment) = { diff --git a/apps/scan/src/main/openapi/scan.yaml b/apps/scan/src/main/openapi/scan.yaml index 98bfb59152..d92f5639f0 100644 --- a/apps/scan/src/main/openapi/scan.yaml +++ b/apps/scan/src/main/openapi/scan.yaml @@ -4208,6 +4208,10 @@ components: - total_app_activity_weight - active_parties_count - activity_records_count + - total_app_reward_minting_allowance + - total_app_reward_thresholded + - total_app_reward_unclaimed + - rewarded_app_provider_parties_count properties: status: type: string @@ -4223,6 +4227,18 @@ components: activity_records_count: type: integer format: int64 + total_app_reward_minting_allowance: + type: string + description: The total of all minting allowances granted to app providers in this round. + total_app_reward_thresholded: + type: string + description: Total amount of minting allowances that fell below the configured app reward threshold and was thus burned. + total_app_reward_unclaimed: + type: string + description: Total amount of app rewards which could not be attributed to app providers in this round because of limit on app rewards per activity (aka the app rewards cap). + rewarded_app_provider_parties_count: + type: integer + format: int64 RewardAccountingActivityTotalsUndetermined: type: object diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala index 7b380fe36c..f0f072b7f8 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala @@ -38,6 +38,7 @@ import org.lfdecentralizedtrust.splice.http.HttpClient import org.lfdecentralizedtrust.splice.http.v0.definitions.{ AnsEntry, GetDsoInfoResponse, + GetRewardAccountingActivityTotalsResponse, GetRewardAccountingBatchResponse, GetRewardAccountingRootHashResponse, HoldingsSummaryRequestV1, @@ -45,6 +46,8 @@ import org.lfdecentralizedtrust.splice.http.v0.definitions.{ HoldingsSummaryResponseV1, LookupTransferCommandStatusResponse, MigrationSchedule, + RewardAccountingActivityTotalsOk, + RewardAccountingActivityTotalsUndetermined, RewardAccountingRootHashOk, RewardAccountingRootHashUndetermined, } @@ -845,6 +848,54 @@ class BftScanConnection( ): Future[NonNegativeInt] = bftCall(_.getActivePhysicalSynchronizerSerial(), "getActivePhysicalSynchronizerSerial") + /** This is special because in addition to 'Ok' we can receive + * 'Undetermined' - This might indicate that scan is yet to process activity totals for this round + * 'CannotProvide' - Indicates that scan does not have required app-activity data to provide a response + * + * So simple equality comparison on responses is not possible, and we treat + * the two non-Ok responses as a "no response" by throwing IgnoreResponse so + * that this does not cause grouping in executeCall. + * + * And if no response could be obtained via bft we respond with 'Undetermined' + */ + override def getRewardAccountingActivityTotals(roundNumber: Long)(implicit + ec: ExecutionContext, + tc: TraceContext, + ): Future[GetRewardAccountingActivityTotalsResponse] = { + val undetermined = + GetRewardAccountingActivityTotalsResponse( + RewardAccountingActivityTotalsUndetermined(status = "Undetermined") + ) + val callConfig = BftCallConfig.default(scanList.scanConnections) + if (!callConfig.enoughAvailableScans) Future.successful(undetermined) + else + bftCall[RewardAccountingActivityTotalsOk]( + call = scan => + scan.getRewardAccountingActivityTotals(roundNumber).flatMap { + case GetRewardAccountingActivityTotalsResponse.members + .RewardAccountingActivityTotalsOk(ok) => + Future.successful(ok) + case _: GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsUndetermined | + _: GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsCannotProvide => + Future.failed(BftScanConnection.IgnoreResponse(scan.url)) + }, + endpoint = "getRewardAccountingActivityTotals", + callConfig = callConfig, + consensusLogConfig = BftScanConnection.ConsensusLogConfig( + disagreementLogLevel = Level.WARN, + onlyLogDisagreementsInSuccessResponse = true, + agreementLogLevel = Some(Level.INFO), + ), + ) + .transform(tryTotals => + Success( + tryTotals.toOption.fold(undetermined)(ok => + GetRewardAccountingActivityTotalsResponse(ok) + ) + ) + ) + } + /** This is special because in addition to 'Ok' we can receive * 'Undetermined' - This might indicate that scan is yet to process root hash for this round * 'CannotProvide' - Indicates that scan does not have required app-activity data to provide a response diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/ScanConnection.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/ScanConnection.scala index 44b27db93e..f1fcb0ebd2 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/ScanConnection.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/ScanConnection.scala @@ -25,6 +25,7 @@ import org.lfdecentralizedtrust.splice.environment.* import org.lfdecentralizedtrust.splice.http.HttpClient import org.lfdecentralizedtrust.splice.http.v0.definitions.{ GetDsoInfoResponse, + GetRewardAccountingActivityTotalsResponse, GetRewardAccountingBatchResponse, GetRewardAccountingRootHashResponse, HoldingsSummaryResponse, @@ -336,6 +337,11 @@ trait ScanConnection tc: TraceContext, ): Future[NonNegativeInt] + def getRewardAccountingActivityTotals(roundNumber: Long)(implicit + ec: ExecutionContext, + tc: TraceContext, + ): Future[GetRewardAccountingActivityTotalsResponse] + def getRewardAccountingRootHash(roundNumber: Long)(implicit ec: ExecutionContext, tc: TraceContext, diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala index 5f5a445ed0..56037aa066 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala @@ -32,6 +32,7 @@ import org.lfdecentralizedtrust.splice.environment.{ } import org.lfdecentralizedtrust.splice.http.HttpClient import org.lfdecentralizedtrust.splice.http.v0.definitions.{ + GetRewardAccountingActivityTotalsResponse, GetRewardAccountingBatchResponse, GetRewardAccountingRootHashResponse, HoldingsSummaryRequestV1, @@ -821,6 +822,15 @@ class SingleScanConnection private[client] ( HttpScanAppClient.GetActivePhysicalSynchronizerSerial(), ) + override def getRewardAccountingActivityTotals(roundNumber: Long)(implicit + ec: ExecutionContext, + tc: TraceContext, + ): Future[GetRewardAccountingActivityTotalsResponse] = + runHttpCmd( + config.adminApi.url, + HttpScanAppClient.GetRewardAccountingActivityTotals(roundNumber), + ) + override def getRewardAccountingRootHash(roundNumber: Long)(implicit ec: ExecutionContext, tc: TraceContext, diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala index 51aaaabd19..3f19ba4510 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala @@ -2705,54 +2705,55 @@ class HttpScanHandler( ScanResource.GetRewardAccountingActivityTotalsResponse ] = { implicit val tc = extracted + val undetermined = ScanResource.GetRewardAccountingActivityTotalsResponse.OK( + definitions.GetRewardAccountingActivityTotalsResponse( + definitions.RewardAccountingActivityTotalsUndetermined(status = "Undetermined") + ) + ) + val cannotProvide = ScanResource.GetRewardAccountingActivityTotalsResponse.OK( + definitions.GetRewardAccountingActivityTotalsResponse( + definitions.RewardAccountingActivityTotalsCannotProvide(status = "CannotProvide") + ) + ) withSpan(s"$workflowId.getRewardAccountingActivityTotals") { _ => _ => (appRewardsStoreO, appActivityStoreO) match { case (Some(appRewardsStore), Some(appActivityStore)) => appRewardsStore.getAppActivityRoundTotalByRound(roundNumber).flatMap { - case Some(roundTotal) => - Future.successful( - ScanResource.GetRewardAccountingActivityTotalsResponse.OK( - definitions.GetRewardAccountingActivityTotalsResponse( - definitions.RewardAccountingActivityTotalsOk( - status = "Ok", - roundNumber = roundTotal.roundNumber, - totalAppActivityWeight = roundTotal.totalRoundAppActivityWeight, - activePartiesCount = roundTotal.activeAppProviderPartiesCount, - activityRecordsCount = roundTotal.activityRecordsCount, - ) - ) - ) - ) - case None => - appActivityStore.earliestRoundWithCompleteAppActivity().map { - case Some(earliest) if roundNumber < earliest => + case Some(activityTotal) => + appRewardsStore.getAppRewardRoundTotalByRound(roundNumber).map { + case Some(rewardTotal) => ScanResource.GetRewardAccountingActivityTotalsResponse.OK( definitions.GetRewardAccountingActivityTotalsResponse( - definitions.RewardAccountingActivityTotalsCannotProvide( - status = "CannotProvide" + definitions.RewardAccountingActivityTotalsOk( + status = "Ok", + roundNumber = activityTotal.roundNumber, + totalAppActivityWeight = activityTotal.totalRoundAppActivityWeight, + activePartiesCount = activityTotal.activeAppProviderPartiesCount, + activityRecordsCount = activityTotal.activityRecordsCount, + totalAppRewardMintingAllowance = + rewardTotal.totalAppRewardMintingAllowance.toString, + totalAppRewardThresholded = rewardTotal.totalAppRewardThresholded.toString, + totalAppRewardUnclaimed = rewardTotal.totalAppRewardUnclaimed.toString, + rewardedAppProviderPartiesCount = + rewardTotal.rewardedAppProviderPartiesCount, ) ) ) + case None => + // We should never hit this, as both activity totals and round + // totals are added in a single DB Tx + undetermined + } + case None => + appActivityStore.earliestRoundWithCompleteAppActivity().map { + case Some(earliest) if roundNumber < earliest => + cannotProvide case _ => - ScanResource.GetRewardAccountingActivityTotalsResponse.OK( - definitions.GetRewardAccountingActivityTotalsResponse( - definitions.RewardAccountingActivityTotalsUndetermined( - status = "Undetermined" - ) - ) - ) + undetermined } } case _ => - Future.successful( - ScanResource.GetRewardAccountingActivityTotalsResponse.OK( - definitions.GetRewardAccountingActivityTotalsResponse( - definitions.RewardAccountingActivityTotalsCannotProvide( - status = "CannotProvide" - ) - ) - ) - ) + Future.successful(cannotProvide) } } } @@ -2763,6 +2764,16 @@ class HttpScanHandler( ScanResource.GetRewardAccountingRootHashResponse ] = { implicit val tc = extracted + val undetermined = ScanResource.GetRewardAccountingRootHashResponse.OK( + definitions.GetRewardAccountingRootHashResponse( + definitions.RewardAccountingRootHashUndetermined(status = "Undetermined") + ) + ) + val cannotProvide = ScanResource.GetRewardAccountingRootHashResponse.OK( + definitions.GetRewardAccountingRootHashResponse( + definitions.RewardAccountingRootHashCannotProvide(status = "CannotProvide") + ) + ) withSpan(s"$workflowId.getRewardAccountingRootHash") { _ => _ => (appRewardsStoreO, appActivityStoreO) match { case (Some(appRewardsStore), Some(appActivityStore)) => @@ -2782,33 +2793,13 @@ class HttpScanHandler( case None => appActivityStore.earliestRoundWithCompleteAppActivity().map { case Some(earliest) if roundNumber < earliest => - ScanResource.GetRewardAccountingRootHashResponse.OK( - definitions.GetRewardAccountingRootHashResponse( - definitions.RewardAccountingRootHashCannotProvide( - status = "CannotProvide" - ) - ) - ) + cannotProvide case _ => - ScanResource.GetRewardAccountingRootHashResponse.OK( - definitions.GetRewardAccountingRootHashResponse( - definitions.RewardAccountingRootHashUndetermined( - status = "Undetermined" - ) - ) - ) + undetermined } } case _ => - Future.successful( - ScanResource.GetRewardAccountingRootHashResponse.OK( - definitions.GetRewardAccountingRootHashResponse( - definitions.RewardAccountingRootHashCannotProvide( - status = "CannotProvide" - ) - ) - ) - ) + Future.successful(cannotProvide) } } } diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala index c1ac557b2c..4fa69eb0f2 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala @@ -34,8 +34,12 @@ import org.lfdecentralizedtrust.splice.environment.{ } import org.lfdecentralizedtrust.splice.http.v0.definitions.{ ErrorResponse, + GetRewardAccountingActivityTotalsResponse, GetRewardAccountingBatchResponse, GetRewardAccountingRootHashResponse, + RewardAccountingActivityTotalsCannotProvide, + RewardAccountingActivityTotalsOk, + RewardAccountingActivityTotalsUndetermined, RewardAccountingBatchOfBatches, RewardAccountingRootHashCannotProvide, RewardAccountingRootHashOk, @@ -277,6 +281,54 @@ class BftScanConnectionTest ) ) ) + private def activityTotalsOk( + round: Long, + totalAppActivityWeight: Long, + activePartiesCount: Long, + activityRecordsCount: Long, + ): GetRewardAccountingActivityTotalsResponse = + GetRewardAccountingActivityTotalsResponse( + RewardAccountingActivityTotalsOk( + status = "Ok", + roundNumber = round, + totalAppActivityWeight = totalAppActivityWeight, + activePartiesCount = activePartiesCount, + activityRecordsCount = activityRecordsCount, + totalAppRewardMintingAllowance = "0", + totalAppRewardThresholded = "0", + totalAppRewardUnclaimed = "0", + rewardedAppProviderPartiesCount = 0L, + ) + ) + def makeMockReturnActivityTotalsOk( + mock: SingleScanConnection, + round: Long, + totalAppActivityWeight: Long, + activePartiesCount: Long, + activityRecordsCount: Long, + ): Unit = + when(mock.getRewardAccountingActivityTotals(round)) + .thenReturn( + Future.successful( + activityTotalsOk(round, totalAppActivityWeight, activePartiesCount, activityRecordsCount) + ) + ) + def makeMockReturnActivityTotalsUndetermined(mock: SingleScanConnection, round: Long): Unit = + when(mock.getRewardAccountingActivityTotals(round)).thenReturn( + Future.successful( + GetRewardAccountingActivityTotalsResponse( + RewardAccountingActivityTotalsUndetermined(status = "Undetermined") + ) + ) + ) + def makeMockReturnActivityTotalsCannotProvide(mock: SingleScanConnection, round: Long): Unit = + when(mock.getRewardAccountingActivityTotals(round)).thenReturn( + Future.successful( + GetRewardAccountingActivityTotalsResponse( + RewardAccountingActivityTotalsCannotProvide(status = "CannotProvide") + ) + ) + ) private val rewardAccountingBatchResponse: GetRewardAccountingBatchResponse = GetRewardAccountingBatchResponse( RewardAccountingBatchOfBatches(batchType = "BatchOfBatches", childHashes = Vector("aa", "bb")) @@ -1291,6 +1343,125 @@ class BftScanConnectionTest } } + "BftScanConnection.getRewardAccountingActivityTotals" should { + + // n=4 scans -> default BFT threshold requiredNumScanThreshold(4) = f+1 = 2. + "reaches consensus when f+1 scans agree on the same totals" in { + val round = 42L + val connections = getMockedConnections(n = 4) + makeMockReturnActivityTotalsOk(connections(0), round, 100L, 10L, 5L) + when(connections(1).getRewardAccountingActivityTotals(round)) + .thenReturn( + Future.failed(notFoundFailure), + Future.successful(activityTotalsOk(round, 100L, 10L, 5L)), + ) + makeMockReturnActivityTotalsUndetermined(connections(2), round) + makeMockFail(connections(3), notFoundFailure) + val bft = getBft(connections) + + // With n=4, we query only two connections randomly, and even with + // retries it can sometimes fail. This eventually is here to avoid flakyness. + loggerFactory + .assertEventuallyLogsSeq(SuppressionRule.LevelAndAbove(Level.INFO))( + { + eventually() { + inside(bft.getRewardAccountingActivityTotals(round).futureValue) { + case GetRewardAccountingActivityTotalsResponse.members + .RewardAccountingActivityTotalsOk(ok) => + ok.roundNumber should be(round) + ok.totalAppActivityWeight should be(100L) + ok.activePartiesCount should be(10L) + ok.activityRecordsCount should be(5L) + } + } + Future.unit + }, + logs => + logs.exists(l => + l.level == Level.INFO && l.message.contains("Reached consensus from") + ) should be(true), + ) + .map(_ => succeed) + } + + "returns Undetermined when no quorum agrees on the totals" in { + val round = 42L + val connections = getMockedConnections(n = 4) + connections.zipWithIndex.foreach { case (c, i) => + makeMockReturnActivityTotalsOk(c, round, 100L + i, 10L + i, 5L + i) + } + val bft = getBft(connections) + + for { + resp <- bft.getRewardAccountingActivityTotals(round) + } yield inside(resp) { + case _: GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsUndetermined => + succeed + } + } + + "never treats agreement on CannotProvide as consensus" in { + val round = 42L + val connections = getMockedConnections(n = 4) + connections.foreach(makeMockReturnActivityTotalsCannotProvide(_, round)) + val bft = getBft(connections) + + for { + resp <- bft.getRewardAccountingActivityTotals(round) + } yield inside(resp) { + case _: GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsUndetermined => + succeed + } + } + + "never treats agreement on Undetermined as consensus" in { + val round = 42L + val connections = getMockedConnections(n = 4) + connections.foreach(makeMockReturnActivityTotalsUndetermined(_, round)) + val bft = getBft(connections) + + for { + resp <- bft.getRewardAccountingActivityTotals(round) + } yield inside(resp) { + case _: GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsUndetermined => + succeed + } + } + + "returns Undetermined when there are no peer scans" in { + val bft = getBft(Seq.empty) + + for { + resp <- bft.getRewardAccountingActivityTotals(1L) + } yield inside(resp) { + case _: GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsUndetermined => + succeed + } + } + + "logs disagreements at WARN level" in { + val round = 42L + val connections = getMockedConnections(n = 4) + makeMockReturnActivityTotalsOk(connections(0), round, 100L, 10L, 5L) + makeMockReturnActivityTotalsOk(connections(1), round, 100L, 10L, 5L) + makeMockReturnActivityTotalsOk(connections(2), round, 200L, 20L, 9L) + makeMockReturnActivityTotalsOk(connections(3), round, 200L, 20L, 9L) + val bft = getBft(connections) + + loggerFactory + .assertEventuallyLogsSeq(SuppressionRule.Level(Level.WARN))( + bft.getRewardAccountingActivityTotals(round), + logs => + logs.exists(log => + log.level == Level.WARN && log.message.contains( + "disagreed with consensus" + ) + ) should be(true), + ) + .map(_ => succeed) + } + } + "BftScanConnection.getRewardAccountingBatch" should { "returns None when no scan has the batch" in { diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/RewardProcessingMetrics.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/RewardProcessingMetrics.scala index 8c5b0bc79b..7cfb6f8800 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/RewardProcessingMetrics.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/RewardProcessingMetrics.scala @@ -57,4 +57,15 @@ class RewardProcessingMetrics(metricsFactory: LabeledMetricsFactory)( qualification = Traffic, ) )(metricsContext) + + val summarizingRoundTotalsBftReads: Meter = + metricsFactory.meter( + MetricInfo( + name = prefix :+ "summarizing_mining_round" :+ "totals_bft_reads", + summary = "Count of BFT reads of the reward-accounting totals", + description = + "This metric counts the BFT reads of the reward-accounting totals performed by the SummarizingMiningRound trigger, i.e., the cases where this SV's own Scan could not provide the totals and it had to be obtained via a BFT read against peer Scans.", + qualification = Traffic, + ) + )(metricsContext) } diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala index 4c38181424..35a7ce994b 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala @@ -489,6 +489,8 @@ class SvDsoAutomationService( triggerContext, dsoStore, connection(SpliceLedgerConnectionPriority.Medium), + () => getOrCreateOwnScanConnection(), + () => getOrCreatePeerScanConnection(), ) ) registerTrigger( diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/confirmation/SummarizingMiningRoundTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/confirmation/SummarizingMiningRoundTrigger.scala index 77eae3c1d7..bbdb4cd33a 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/confirmation/SummarizingMiningRoundTrigger.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/confirmation/SummarizingMiningRoundTrigger.scala @@ -11,6 +11,7 @@ import org.lfdecentralizedtrust.splice.automation.{ TriggerContext, } import org.lfdecentralizedtrust.splice.codegen.java.splice +import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletconfig.RewardVersion import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletrules.AmuletRules_MiningRound_StartIssuing import org.lfdecentralizedtrust.splice.codegen.java.splice.issuance.OpenMiningRoundSummary import org.lfdecentralizedtrust.splice.codegen.java.splice.round.SummarizingMiningRound @@ -18,14 +19,23 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.ActionRequir import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.actionrequiringconfirmation.ARC_AmuletRules import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.amuletrules_actionrequiringconfirmation.CRARC_MiningRound_StartIssuing import org.lfdecentralizedtrust.splice.environment.SpliceLedgerConnection +import org.lfdecentralizedtrust.splice.http.v0.definitions +import org.lfdecentralizedtrust.splice.http.v0.definitions.GetRewardAccountingActivityTotalsResponse.members.{ + RewardAccountingActivityTotalsCannotProvide, + RewardAccountingActivityTotalsOk, + RewardAccountingActivityTotalsUndetermined, +} +import org.lfdecentralizedtrust.splice.scan.admin.api.client.{BftScanConnection, ScanConnection} import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.QueryResult -import org.lfdecentralizedtrust.splice.sv.store.SvDsoStore +import org.lfdecentralizedtrust.splice.sv.automation.RewardProcessingMetrics +import org.lfdecentralizedtrust.splice.sv.store.{AppRewardCouponsSum, SvDsoStore} import org.lfdecentralizedtrust.splice.util.AssignedContract import org.lfdecentralizedtrust.splice.util.PrettyInstances.* +import com.daml.metrics.api.MetricsContext import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} import com.digitalasset.canton.topology.SynchronizerId import com.digitalasset.canton.tracing.TraceContext -import com.digitalasset.canton.util.MonadUtil +import io.grpc.Status import io.opentelemetry.api.trace.Tracer import java.util.Optional @@ -39,6 +49,8 @@ class SummarizingMiningRoundTrigger( override protected val context: TriggerContext, store: SvDsoStore, connection: SpliceLedgerConnection, + scanConnectionF: () => Future[ScanConnection], + bftScanConnectionF: () => Future[BftScanConnection], )(implicit ec: ExecutionContext, mat: Materializer, @@ -49,6 +61,8 @@ class SummarizingMiningRoundTrigger( private val svParty = store.key.svParty private val dsoParty = store.key.dsoParty + private val rewardMetrics = + new RewardProcessingMetrics(context.metricsFactory)(MetricsContext.Empty) private def amuletRulesStartIssuingAction( miningRoundCid: SummarizingMiningRound.ContractId, @@ -65,42 +79,24 @@ class SummarizingMiningRoundTrigger( override def retrieveTasks()(implicit tc: TraceContext): Future[Seq[Task]] = for { summarizingRounds <- store.listOldestSummarizingMiningRounds() - tasks <- MonadUtil - .sequentialTraverse(summarizingRounds) { round => - for { - rewards <- queryRewards( - round.payload.round.number, - round.domain, - round.payload.issuanceConfig, - ) - action = amuletRulesStartIssuingAction( - round.contractId, - rewards.summary, - ) - queryResult <- store.lookupConfirmationByActionWithOffset(svParty, action) - } yield queryResult.value match { - case None => - Some( - Task( - round, - rewards, - ) - ) - case Some(_) => None - } - } - .map(_.flatten) - } yield tasks + confirmedCids <- listConfirmedMiningRoundStartIssuingCids() + } yield summarizingRounds + .filterNot(round => confirmedCids.contains(round.contractId)) + .map(Task(_)) override def completeTask( task: Task )(implicit tc: TraceContext): Future[TaskOutcome] = { val round = task.summarizingRound.contract.payload.round.number for { + rewards <- queryRewards( + task.summarizingRound.payload, + task.summarizingRound.domain, + ) dsoRules <- store.getDsoRules() action = amuletRulesStartIssuingAction( task.summarizingRound.contractId, - task.rewards.summary, + rewards.summary, ) queryResult <- store.lookupConfirmationByActionWithOffset(svParty, action) cmd = dsoRules.exercise( @@ -153,20 +149,31 @@ class SummarizingMiningRoundTrigger( * for a SummarizingMiningRound. */ private def queryRewards( - round: Long, + payload: splice.round.SummarizingMiningRound, domain: SynchronizerId, - issuanceConfig: splice.issuance.IssuanceConfig, )(implicit ec: ExecutionContext, traceContext: TraceContext, ): Future[RoundRewards] = { + val round = payload.round.number + val issuanceConfig = payload.issuanceConfig val faucetCapIsZero = issuanceConfig.optValidatorFaucetCap.toScala .exists(_.compareTo(java.math.BigDecimal.ZERO) <= 0) for { - appRewardCoupons <- store.sumAppRewardCouponsOnDomain( - round, - domain, - ) + appRewardCoupons <- + if (useTrafficBasedAppRewards(payload)) { + fetchRewardAccountingTotals(round).map { totals => + // The total featured app rewards (in CC) is the sum of the minting + // allowance and the thresholded amount reported by Scan. + val appRewardsInCc = + (BigDecimal(totals.totalAppRewardMintingAllowance) + + BigDecimal(totals.totalAppRewardThresholded)) + .setScale(10, BigDecimal.RoundingMode.HALF_EVEN) + AppRewardCouponsSum(featured = appRewardsInCc, unfeatured = BigDecimal(0)) + } + } else { + store.sumAppRewardCouponsOnDomain(round, domain) + } validatorRewardCoupons <- store.sumValidatorRewardCouponsOnDomain( round, domain, @@ -193,6 +200,64 @@ class SummarizingMiningRoundTrigger( ) } } + + private def listConfirmedMiningRoundStartIssuingCids()(implicit + tc: TraceContext + ): Future[Set[SummarizingMiningRound.ContractId]] = + store.listConfirmationsByConfirmer(svParty).map { confirmations => + confirmations.iterator.flatMap { c => + c.payload.action match { + case arc: ARC_AmuletRules => + arc.amuletRulesAction match { + case crarc: CRARC_MiningRound_StartIssuing => + Some(crarc.amuletRules_MiningRound_StartIssuingValue.miningRoundCid) + case _ => None + } + case _ => None + } + }.toSet + } + + private def useTrafficBasedAppRewards( + payload: splice.round.SummarizingMiningRound + ): Boolean = + payload.rewardConfig.toScala.exists( + _.mintingVersion == RewardVersion.REWARDVERSION_TRAFFICBASEDAPPREWARDS + ) + + private def fetchRewardAccountingTotals( + round: Long + )(implicit tc: TraceContext): Future[definitions.RewardAccountingActivityTotalsOk] = { + def totalsUnavailable(reason: String): Nothing = + throw Status.FAILED_PRECONDITION + .withDescription(s"For round $round: $reason") + .asRuntimeException() + + def bftReadTotals: Future[definitions.RewardAccountingActivityTotalsOk] = { + rewardMetrics.summarizingRoundTotalsBftReads.mark() + for { + bftScan <- bftScanConnectionF() + response <- bftScan.getRewardAccountingActivityTotals(round) + } yield response match { + case RewardAccountingActivityTotalsOk(ok) => + logger.info(s"Obtained the reward accounting totals for round $round via BFT read.") + ok + case _ => totalsUnavailable("could not obtain reward accounting totals via BFT read.") + } + } + + for { + ownScan <- scanConnectionF() + response <- ownScan.getRewardAccountingActivityTotals(round) + totals <- response match { + case RewardAccountingActivityTotalsOk(ok) => + Future.successful(ok) + case RewardAccountingActivityTotalsUndetermined(_) => + totalsUnavailable("our own Scan has not yet computed the reward accounting totals.") + case RewardAccountingActivityTotalsCannotProvide(_) => bftReadTotals + } + } yield totals + } } object SummarizingMiningRoundTrigger { @@ -227,13 +292,11 @@ object SummarizingMiningRoundTrigger { summarizingRound: AssignedContract[ splice.round.SummarizingMiningRound.ContractId, splice.round.SummarizingMiningRound, - ], - rewards: RoundRewards, + ] ) extends PrettyPrinting { override def pretty: Pretty[this.type] = prettyOfClass( - param("summarizingRound", _.summarizingRound), - param("rewards", _.rewards), + param("summarizingRound", _.summarizingRound) ) } }