diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala index 41ae7870..f7208f21 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala @@ -3,21 +3,19 @@ package xyz.kd5ujc.metagraph_l0 import cats.effect.Async import cats.syntax.all._ -import io.constellationnetwork.currency.dataApplication.{DataApplicationValidationError, L0NodeContext} +import io.constellationnetwork.currency.dataApplication.DataApplicationValidationError import io.constellationnetwork.ext.http4s.error.RefinedRequestApplicationDecoder import io.constellationnetwork.metagraph_sdk.MetagraphPublicRoutes -import io.constellationnetwork.metagraph_sdk.lifecycle.CheckpointService +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CommittedReader import io.constellationnetwork.metagraph_sdk.std.Checkpoint import io.constellationnetwork.metagraph_sdk.std.JsonBinaryHasher.HasherOps -import io.constellationnetwork.metagraph_sdk.syntax.all.L0ContextOps import io.constellationnetwork.security.signature.Signed import xyz.kd5ujc.buildinfo.BuildInfo import xyz.kd5ujc.metagraph_l0.webhooks.{SubscribeRequest, SubscribeResponse, SubscriberRegistry} +import xyz.kd5ujc.schema.CalculatedState import xyz.kd5ujc.schema.Updates.OttochainMessage -import xyz.kd5ujc.schema.fiber.FiberLogEntry.{EventReceipt, OracleInvocation} -import xyz.kd5ujc.schema.fiber.{AuditRenderer, FiberStatus} -import xyz.kd5ujc.schema.{CalculatedState, OnChain} +import xyz.kd5ujc.schema.fiber.FiberStatus import io.circe.Json import io.circe.syntax.EncoderOps @@ -25,11 +23,18 @@ import org.http4s.circe.CirceEntityCodec.{circeEntityDecoder, circeEntityEncoder import org.http4s.server.Router import org.http4s.{HttpRoutes, QueryParamDecoder, Response, Status} +/** + * Custom L0 routes backed by the committed cell: every calculated-state read goes through the + * [[CommittedReader]] handed out by `CommittedApp.makeL0` (one atomic cell read per request), so + * responses are always consistent with the served `/committed/...` roots and proofs. + * + * Routes that need the latest SIGNED snapshot (on-chain state / logs) live in + * [[ML0SnapshotStateRoutes]] -- makeL0's `extraRoutes` does not receive the `L0NodeContext` + * (flagged as a metakit follow-up in `ML0Service`). + */ class ML0CustomRoutes[F[_]: Async]( - checkpointService: CheckpointService[F, CalculatedState], + reader: CommittedReader[F, CalculatedState], subscriberRegistry: SubscriberRegistry[F] -)(implicit - context: L0NodeContext[F] ) extends MetagraphPublicRoutes[F] { implicit val fiberStatusDecoder: QueryParamDecoder[FiberStatus] = @@ -39,6 +44,9 @@ class ML0CustomRoutes[F[_]: Async]( object StatusQueryParam extends OptionalQueryParamDecoderMatcher[FiberStatus]("status") + private def calculatedState: F[Checkpoint[CalculatedState]] = + reader.committed.map(c => Checkpoint(c.ordinal, c.state)) + private val v1Routes: HttpRoutes[F] = HttpRoutes.of[F] { // Version endpoint for monitoring integration @@ -63,16 +71,13 @@ class ML0CustomRoutes[F[_]: Async]( } } - case GET -> Root / "onchain" => - context.getOnChainState[OnChain].toResponse - case GET -> Root / "checkpoint" => - checkpointService.get + calculatedState .map(_.asRight[DataApplicationValidationError]) .toResponse case GET -> Root / "state-machines" :? StatusQueryParam(statusOpt) => - checkpointService.get.map { case Checkpoint(_, state) => + calculatedState.map { case Checkpoint(_, state) => statusOpt .fold(state.stateMachines) { status => state.stateMachines.filter { case (_, fiber) => fiber.status == status } @@ -81,56 +86,24 @@ class ML0CustomRoutes[F[_]: Async]( }.toResponse case GET -> Root / "state-machines" / UUIDVar(fiberId) => - checkpointService.get.map { case Checkpoint(_, state) => + calculatedState.map { case Checkpoint(_, state) => state.stateMachines.get(fiberId).asRight[DataApplicationValidationError] }.toResponse case GET -> Root / "oracles" :? StatusQueryParam(statusOpt) => - checkpointService.get.map { case Checkpoint(_, state) => + calculatedState.map { case Checkpoint(_, state) => statusOpt .fold(state.scripts) { status => - state.scripts.filter { case (_, oracle) => oracle.status == status } + state.scripts.filter { case (_, script) => script.status == status } } .asRight[DataApplicationValidationError] }.toResponse - case GET -> Root / "oracles" / UUIDVar(oracleId) => - checkpointService.get.map { case Checkpoint(_, state) => - state.scripts.get(oracleId).asRight[DataApplicationValidationError] - }.toResponse - - case GET -> Root / "state-machines" / UUIDVar(fiberId) / "events" => - context - .getOnChainState[OnChain] - .map(_.map { onChain => - onChain.latestLogs - .getOrElse(fiberId, List.empty) - .collect { case r: EventReceipt => r } - }) - .toResponse - - case GET -> Root / "state-machines" / UUIDVar(fiberId) / "audit" => - checkpointService.get.flatMap { checkpoint => - context - .getOnChainState[OnChain] - .map(_.map { onChain => - AuditRenderer.renderAll( - onChain.latestLogs.getOrElse(fiberId, List.empty), - checkpoint.state.reverseNames.toMap - ) - }) + case GET -> Root / "oracles" / UUIDVar(scriptId) => + calculatedState.map { case Checkpoint(_, state) => + state.scripts.get(scriptId).asRight[DataApplicationValidationError] }.toResponse - case GET -> Root / "oracles" / UUIDVar(oracleId) / "invocations" => - context - .getOnChainState[OnChain] - .map(_.map { onChain => - onChain.latestLogs - .getOrElse(oracleId, List.empty) - .collect { case i: OracleInvocation => i } - }) - .toResponse - // ========================================================================= // Webhook Management Endpoints // ========================================================================= diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala index ac1ea341..fd0412a9 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala @@ -3,22 +3,25 @@ package xyz.kd5ujc.metagraph_l0 import cats.data.{NonEmptyList, Validated} import cats.effect.Async import cats.syntax.all._ -import cats.{Applicative, Parallel} +import cats.{Monad, Parallel} -import scala.collection.immutable.SortedMap +import scala.collection.immutable.{SortedMap, SortedSet} import io.constellationnetwork.currency.dataApplication._ -import io.constellationnetwork.currency.dataApplication.dataApplication.DataApplicationValidationErrorOr +import io.constellationnetwork.currency.dataApplication.dataApplication.{ + DataApplicationBlock, + DataApplicationValidationErrorOr +} import io.constellationnetwork.currency.schema.currency.CurrencyIncrementalSnapshot -import io.constellationnetwork.metagraph_sdk.MetagraphCommonService +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.{CatalogJournal, CommittedApp} import io.constellationnetwork.metagraph_sdk.lifecycle.{CheckpointService, CombinerService, ValidationService} import io.constellationnetwork.metagraph_sdk.std.Checkpoint -import io.constellationnetwork.metagraph_sdk.std.JsonBinaryHasher.HasherOps import io.constellationnetwork.metagraph_sdk.syntax.all.CurrencyIncrementalSnapshotOps -import io.constellationnetwork.schema.SnapshotOrdinal +import io.constellationnetwork.schema.artifact.TokenUnlock +import io.constellationnetwork.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo, SnapshotOrdinal} import io.constellationnetwork.security.hash.Hash import io.constellationnetwork.security.signature.Signed -import io.constellationnetwork.security.{Hashed, SecurityProvider} +import io.constellationnetwork.security.{Hashed, Hasher, SecurityProvider} import xyz.kd5ujc.metagraph_l0.webhooks.{NotificationStats, SubscriberRegistry, WebhookDispatcher} import xyz.kd5ujc.schema.Updates.OttochainMessage @@ -37,7 +40,9 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger object ML0Service { /** - * Create the ML0 service with webhook support + * Create the ML0 service: metakit's `CommittedApp.makeL0` (two-tier state-root commitment over + * the `CommittedView[CalculatedState]` projection, on-chain breadcrumb, `/committed/...` routes) + * plus ottochain's webhook/subscriber dispatch layered on top. * * @param httpClient HTTP client for webhook delivery * @param metagraphId The metagraph token identifier (for webhook notifications) @@ -45,10 +50,19 @@ object ML0Service { def make[F[+_]: Async: Files: Parallel: SecurityProvider]( httpClient: Option[Client[F]] = None, metagraphId: String = "DAG3KNyfeKUTuWpMMhormWgWSYMD1pDGB2uaWqxG", - genesisPath: Option[String] = None + genesisPath: Option[String] = None, + // Local catalog journal (acquired as a Resource in Main and threaded in): lets a seeded committed + // cell re-hydrate from its own persisted catalog so `combine`/`advanceWork` can resolve the parent + // breadcrumb. Without it a seeded cell stays unhydrated and the metagraph cannot advance. + journal: Option[CatalogJournal[F]] = None ): F[BaseDataApplicationL0Service[F]] = for { implicit0(logger: SelfAwareStructuredLogger[F]) <- Slf4jLogger.create[F] genesisState <- GenesisLoader.load[F](genesisPath) + + // The authoritative calculated state lives in the committed cell owned by + // CommittedApp.makeL0; this checkpoint is only the notification-side cache + // (rejection ordinals, consensus stats), seeded from the loaded genesis and + // refreshed on setCalculatedState exactly like the pre-committed implementation. checkpointService <- CheckpointService.make[F, CalculatedState](genesisState.calculated) subscriberRegistry <- SubscriberRegistry.make[F] combiner <- Combiner.make[F]().pure[F] @@ -59,123 +73,243 @@ object ML0Service { WebhookDispatcher.make[F](client, subscriberRegistry, metagraphId) } - dataApplicationL0Service <- makeBaseApplicationL0Service( + committedService <- CommittedApp.makeL0[F, OttochainMessage, OnChain, CalculatedState]( genesisState, - checkpointService, - combiner, - validator, - subscriberRegistry, - webhookDispatcher - ).pure[F] - } yield dataApplicationL0Service - - private def makeBaseApplicationL0Service[F[+_]: Async]( - genesisState: DataState[OnChain, CalculatedState], - checkpointService: CheckpointService[F, CalculatedState], - combiner: CombinerService[F, OttochainMessage, OnChain, CalculatedState], - validator: ValidationService[F, OttochainMessage, OnChain, CalculatedState], - subscriberRegistry: SubscriberRegistry[F], - webhookDispatcher: Option[WebhookDispatcher[F]] - )(implicit logger: SelfAwareStructuredLogger[F]): BaseDataApplicationL0Service[F] = - BaseDataApplicationL0Service[F, OttochainMessage, OnChain, CalculatedState]( - new MetagraphCommonService[F, OttochainMessage, OnChain, CalculatedState, L0NodeContext[F]] - with DataApplicationL0Service[F, OttochainMessage, OnChain, CalculatedState] { - - override def genesis: DataState[OnChain, CalculatedState] = genesisState - - override def onSnapshotConsensusResult(snapshot: Hashed[CurrencyIncrementalSnapshot])(implicit - A: Applicative[F] - ): F[Unit] = - (for { - signedUpdates <- snapshot.signed.value.getSignedUpdates[OttochainMessage] - _ <- logger.info(s"Got ${signedUpdates.size} updates for ordinal: ${snapshot.ordinal.value}") - - // Dispatch webhooks if dispatcher is configured - _ <- webhookDispatcher match { - case Some(dispatcher) => - // Get current state for notification stats - checkpointService.get.flatMap { case Checkpoint(_, state) => - val stats = NotificationStats( - updatesProcessed = signedUpdates.size, - stateMachinesActive = state.stateMachines.count { case (_, fiber) => - fiber.status == FiberStatus.Active - }, - scriptsActive = state.scripts.count { case (_, script) => - script.status == FiberStatus.Active - } - ) - - // Fire-and-forget: start webhook dispatch but don't wait for it - Async[F].start(dispatcher.dispatch(snapshot, stats)).void - } - - case None => - Async[F].unit - } - } yield ()).handleErrorWith(logger.error(_)("Error during onSnapshotConsensusResult")) - - override def validateData( - state: DataState[OnChain, CalculatedState], - updates: NonEmptyList[Signed[OttochainMessage]] - )(implicit context: L0NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = - for { - // Get current ordinal for rejection tracking - ordinal <- checkpointService.get.map(_.ordinal) - - // Validate each update individually to track per-update rejections - results <- updates.toList.traverse { signedUpdate => - validator.validateSignedUpdate(state, signedUpdate).map(signedUpdate -> _) - } - - // Dispatch rejections (fire-and-forget) for failed validations - _ <- webhookDispatcher match { - case Some(dispatcher) => - results.collect { case (signedUpdate, Validated.Invalid(errors)) => - Async[F].start(dispatcher.dispatchRejection(ordinal, signedUpdate, errors)).void - }.sequence_ - case None => - Async[F].unit - } - - // Return combined result (all errors accumulated) - } yield results.map(_._2).combineAll - - override def combine( - state: DataState[OnChain, CalculatedState], - updates: List[Signed[OttochainMessage]] - )(implicit context: L0NodeContext[F]): F[DataState[OnChain, CalculatedState]] = - for { - // Order updates with a TOTAL order so every node folds the identical sequence. The message-level - // ordering (creates before transitions; per-fiber by sequence number) is only PARTIAL — same-name - // registry ops and duplicate (fiber, sequence) ops tie. Break ties by the signed update's content - // digest so, on a tie, the surviving op is identical across nodes (no fork) while the loser is - // recorded as a RejectionReceipt rather than aborting the batch. - keyed <- updates.traverse(u => u.computeDigest.map(h => u -> h.value)) - // Reuse the models ordering (OttochainMessage.signedOrdering) as the PRIMARY key and only append - // the content digest as a tiebreaker — the digest needs the Hasher effect, so it can't live inside - // the pure models Ordering. This is the partial models order completed to a total one. - totalOrdering = Ordering.Tuple2(OttochainMessage.signedOrdering, Ordering.String) - sortedUpdates = keyed.sorted(totalOrdering).map(_._1) - result <- combiner.foldLeft( - state.focus(_.onChain.latestLogs).replace(SortedMap.empty), - sortedUpdates - ) - } yield result - - override def getCalculatedState(implicit - context: L0NodeContext[F] - ): F[(SnapshotOrdinal, CalculatedState)] = - checkpointService.get.map { case Checkpoint(ordinal, state) => ordinal -> state } - - override def setCalculatedState(ordinal: SnapshotOrdinal, state: CalculatedState)(implicit - context: L0NodeContext[F] - ): F[Boolean] = checkpointService.set(Checkpoint(ordinal, state)) - - override def hashCalculatedState(state: CalculatedState)(implicit context: L0NodeContext[F]): F[Hash] = - state.computeDigest - - override def routes(implicit context: L0NodeContext[F]): HttpRoutes[F] = - new ML0CustomRoutes[F](checkpointService, subscriberRegistry).public - } + orderedCombiner(combiner), + rejectionNotifyingValidator(validator, checkpointService, webhookDispatcher), + extraRoutes = Some(reader => new ML0CustomRoutes[F](reader, subscriberRegistry).public), + journal = journal ) + } yield withConsensusHooks(committedService, checkpointService, webhookDispatcher) + + /** + * Sorts each batch with the canonical `OttochainMessage` ordering (creates before transitions, + * transitions per fiber in sequence-number order) and resets the per-snapshot `latestLogs` + * before folding -- the exact behavior of the previous hand-rolled `combine` override, expressed + * as the dev combiner handed to `CommittedApp.makeL0`. + */ + private def orderedCombiner[F[_]: Monad]( + inner: CombinerService[F, OttochainMessage, OnChain, CalculatedState] + ): CombinerService[F, OttochainMessage, OnChain, CalculatedState] = + new CombinerService[F, OttochainMessage, OnChain, CalculatedState] { + + override def foldLeft( + previous: DataState[OnChain, CalculatedState], + batch: List[Signed[OttochainMessage]] + )(implicit ctx: L0NodeContext[F]): F[DataState[OnChain, CalculatedState]] = + // OttochainMessage.signedOrdering is a TOTAL order (signature tiebreak in models), so a plain sort + // makes every node fold the identical sequence -- no per-combiner digest tiebreak needed. + inner.foldLeft( + previous.focus(_.onChain.latestLogs).replace(SortedMap.empty), + batch.sorted(OttochainMessage.signedOrdering) + ) + + override def insert( + previous: DataState[OnChain, CalculatedState], + update: Signed[OttochainMessage] + )(implicit ctx: L0NodeContext[F]): F[DataState[OnChain, CalculatedState]] = + inner.insert(previous, update) + } + + /** + * Per-update validation with fire-and-forget rejection webhooks, expressed as the dev validator + * handed to `CommittedApp.makeL0` (whose `validateData` delegates here with the unwrapped + * state). Result accumulation is identical to the previous hand-rolled override. + */ + private def rejectionNotifyingValidator[F[+_]: Async: Parallel]( + inner: ValidationService[F, OttochainMessage, OnChain, CalculatedState], + checkpointService: CheckpointService[F, CalculatedState], + webhookDispatcher: Option[WebhookDispatcher[F]] + ): ValidationService[F, OttochainMessage, OnChain, CalculatedState] = + new ValidationService[F, OttochainMessage, OnChain, CalculatedState] { + + override def validateUpdate( + update: OttochainMessage + )(implicit ctx: L1NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = + inner.validateUpdate(update) + + override def validateSignedUpdate( + current: DataState[OnChain, CalculatedState], + signedUpdate: Signed[OttochainMessage] + )(implicit context: L0NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = + inner.validateSignedUpdate(current, signedUpdate) + + override def validateData( + current: DataState[OnChain, CalculatedState], + batch: NonEmptyList[Signed[OttochainMessage]] + )(implicit ctx: L0NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = + for { + // Current ordinal for rejection tracking (notification metadata only) + ordinal <- checkpointService.get.map(_.ordinal) + + // Validate each update individually to track per-update rejections + results <- batch.toList.traverse { signedUpdate => + inner.validateSignedUpdate(current, signedUpdate).map(signedUpdate -> _) + } + + // Dispatch rejections (fire-and-forget) for failed validations + _ <- webhookDispatcher match { + case Some(dispatcher) => + results.collect { case (signedUpdate, Validated.Invalid(errors)) => + Async[F].start(dispatcher.dispatchRejection(ordinal, signedUpdate, errors)).void + }.sequence_ + case None => + Async[F].unit + } + + // Return combined result (all errors accumulated) + } yield results.map(_._2).combineAll + } + + /** + * Delegating wrapper adding the consensus-result webhook dispatch and the snapshot-backed + * custom routes on top of the service assembled by `CommittedApp.makeL0`. + * + * FLAGGED metakit follow-up: `CommittedApp.makeL0` currently exposes no + * `onSnapshotConsensusResult` hook and its `extraRoutes` function receives only the + * `CommittedReader` (not the `L0NodeContext`), so (a) webhook dispatch and (b) routes that read + * the latest signed snapshot have to be layered here. Once makeL0 grows an + * `onConsensusResult` argument and context-aware `extraRoutes`, this wrapper disappears. + */ + private def withConsensusHooks[F[+_]: Async]( + underlying: BaseDataApplicationL0Service[F], + checkpointService: CheckpointService[F, CalculatedState], + webhookDispatcher: Option[WebhookDispatcher[F]] + )(implicit logger: SelfAwareStructuredLogger[F]): BaseDataApplicationL0Service[F] = + new BaseDataApplicationL0Service[F] { + + // getSignedUpdates decodes the snapshot's blocks via the service's own update codecs + implicit private val dataUpdateEncoder: io.circe.Encoder[DataUpdate] = underlying.dataEncoder + implicit private val dataUpdateDecoder: io.circe.Decoder[DataUpdate] = underlying.dataDecoder + + override def genesis: DataState.Base = underlying.genesis + + override def onSnapshotConsensusResult(snapshot: Hashed[CurrencyIncrementalSnapshot]): F[Unit] = + (for { + signedUpdates <- snapshot.signed.value.getSignedUpdates[OttochainMessage] + _ <- logger.info(s"Got ${signedUpdates.size} updates for ordinal: ${snapshot.ordinal.value}") + + // Dispatch webhooks if dispatcher is configured + _ <- webhookDispatcher match { + case Some(dispatcher) => + // Get current state for notification stats + checkpointService.get.flatMap { case Checkpoint(_, state) => + val stats = NotificationStats( + updatesProcessed = signedUpdates.size, + stateMachinesActive = state.stateMachines.count { case (_, fiber) => + fiber.status == FiberStatus.Active + }, + scriptsActive = state.scripts.count { case (_, script) => + script.status == FiberStatus.Active + } + ) + + // Fire-and-forget: start webhook dispatch but don't wait for it + Async[F].start(dispatcher.dispatch(snapshot, stats)).void + } + + case None => + Async[F].unit + } + } yield ()).handleErrorWith(logger.error(_)("Error during onSnapshotConsensusResult")) >> + underlying.onSnapshotConsensusResult(snapshot) + + override def setCalculatedState(ordinal: SnapshotOrdinal, state: DataCalculatedState)(implicit + context: L0NodeContext[F] + ): F[Boolean] = + (state match { + case cs: CalculatedState => checkpointService.set(Checkpoint(ordinal, cs)).void + case _ => Async[F].unit + }) >> underlying.setCalculatedState(ordinal, state) + + override def routes(implicit context: L0NodeContext[F]): HttpRoutes[F] = + underlying.routes <+> new ML0SnapshotStateRoutes[F].public + + // ---- everything below is pure delegation ---- + + override def onGlobalSnapshotPull( + snapshot: Hashed[GlobalIncrementalSnapshot], + context: GlobalSnapshotInfo + ): F[Unit] = + underlying.onGlobalSnapshotPull(snapshot, context) + + override def validateData(state: DataState.Base, updates: NonEmptyList[Signed[DataUpdate]])(implicit + context: L0NodeContext[F] + ): F[DataApplicationValidationErrorOr[Unit]] = + underlying.validateData(state, updates) + + override def combine(state: DataState.Base, updates: List[Signed[DataUpdate]])(implicit + context: L0NodeContext[F] + ): F[DataState.Base] = + underlying.combine(state, updates) + + override def getCalculatedState(implicit context: L0NodeContext[F]): F[(SnapshotOrdinal, DataCalculatedState)] = + underlying.getCalculatedState + + override def hashCalculatedState(state: DataCalculatedState)(implicit context: L0NodeContext[F]): F[Hash] = + underlying.hashCalculatedState(state) + + override def getTokenUnlocks(state: DataState[DataOnChainState, DataCalculatedState])(implicit + context: L0NodeContext[F], + async: Async[F], + hasher: Hasher[F] + ): F[SortedSet[TokenUnlock]] = + underlying.getTokenUnlocks(state)(context, async, hasher) + + override def serializeState(state: DataOnChainState): F[Array[Byte]] = underlying.serializeState(state) + + override def deserializeState(bytes: Array[Byte]): F[Either[Throwable, DataOnChainState]] = + underlying.deserializeState(bytes) + override def serializeUpdate(update: DataUpdate): F[Array[Byte]] = underlying.serializeUpdate(update) + + override def deserializeUpdate(bytes: Array[Byte]): F[Either[Throwable, DataUpdate]] = + underlying.deserializeUpdate(bytes) + + override def serializeBlock(block: Signed[DataApplicationBlock]): F[Array[Byte]] = + underlying.serializeBlock(block) + + override def deserializeBlock(bytes: Array[Byte]): F[Either[Throwable, Signed[DataApplicationBlock]]] = + underlying.deserializeBlock(bytes) + + override def serializeCalculatedState(state: DataCalculatedState): F[Array[Byte]] = + underlying.serializeCalculatedState(state) + + override def deserializeCalculatedState(bytes: Array[Byte]): F[Either[Throwable, DataCalculatedState]] = + underlying.deserializeCalculatedState(bytes) + + override def dataEncoder: io.circe.Encoder[DataUpdate] = underlying.dataEncoder + override def dataDecoder: io.circe.Decoder[DataUpdate] = underlying.dataDecoder + + override def signedDataEntityEncoder: org.http4s.EntityEncoder[F, Signed[DataUpdate]] = + underlying.signedDataEntityEncoder + + override def signedDataEntityDecoder: org.http4s.EntityDecoder[F, Signed[DataUpdate]] = + underlying.signedDataEntityDecoder + + override def calculatedStateEncoder: io.circe.Encoder[DataCalculatedState] = underlying.calculatedStateEncoder + override def calculatedStateDecoder: io.circe.Decoder[DataCalculatedState] = underlying.calculatedStateDecoder + + override def hashDataUpdate: Option[DataUpdate => F[Hash]] = underlying.hashDataUpdate + + override def validateFee( + gsOrdinal: SnapshotOrdinal + )(dataUpdate: Signed[DataUpdate], maybeFeeTransaction: Option[Signed[FeeTransaction]])(implicit + context: L0NodeContext[F], + A: cats.Applicative[F] + ): F[DataApplicationValidationErrorOr[Unit]] = + underlying.validateFee(gsOrdinal)(dataUpdate, maybeFeeTransaction) + + override def extractFees(ds: Seq[Signed[DataUpdate]])(implicit + context: L0NodeContext[F], + A: cats.Applicative[F] + ): F[Seq[Signed[FeeTransaction]]] = + underlying.extractFees(ds) + + override def extractFees(ds: Seq[Signed[DataUpdate]])(implicit + A: cats.Applicative[F] + ): F[Seq[Signed[FeeTransaction]]] = + underlying.extractFees(ds)(A) + + override def routesPrefix: io.constellationnetwork.routes.internal.ExternalUrlPrefix = underlying.routesPrefix + } } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0SnapshotStateRoutes.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0SnapshotStateRoutes.scala new file mode 100644 index 00000000..2edaade0 --- /dev/null +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0SnapshotStateRoutes.scala @@ -0,0 +1,61 @@ +package xyz.kd5ujc.metagraph_l0 + +import cats.effect.Async +import cats.syntax.all._ + +import io.constellationnetwork.currency.dataApplication.L0NodeContext +import io.constellationnetwork.metagraph_sdk.MetagraphPublicRoutes +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CommittedOnChain +import io.constellationnetwork.metagraph_sdk.syntax.all.L0ContextOps + +import xyz.kd5ujc.schema.OnChain +import xyz.kd5ujc.schema.fiber.FiberLogEntry.{EventReceipt, OracleInvocation} + +import io.circe.{Decoder, Encoder} +import org.http4s.HttpRoutes +import org.http4s.server.Router + +/** + * The L0 routes that read the latest SIGNED snapshot's on-chain state from the + * [[L0NodeContext]] (vs the committed-cell-backed reads in [[ML0CustomRoutes]]). + * + * The registered on-chain type is `CommittedOnChain[OnChain]` (the breadcrumb wrapper added by + * `CommittedApp.makeL0`), so the snapshot bytes are decoded as the wrapper and unwrapped here -- + * the response shapes are unchanged from the pre-committed implementation. The breadcrumb itself + * is served by `GET /committed/root`. + */ +class ML0SnapshotStateRoutes[F[_]: Async](implicit context: L0NodeContext[F]) extends MetagraphPublicRoutes[F] { + + implicit private val onChainEncoder: Encoder[CommittedOnChain[OnChain]] = CommittedOnChain.encoder[OnChain] + implicit private val onChainDecoder: Decoder[CommittedOnChain[OnChain]] = CommittedOnChain.decoder[OnChain] + + private def latestOnChain = context.getOnChainState[CommittedOnChain[OnChain]].map(_.map(_.inner)) + + private val v1Routes: HttpRoutes[F] = HttpRoutes.of[F] { + + case GET -> Root / "onchain" => + latestOnChain.toResponse + + case GET -> Root / "state-machines" / UUIDVar(fiberId) / "events" => + latestOnChain + .map(_.map { onChain => + onChain.latestLogs + .getOrElse(fiberId, List.empty) + .collect { case r: EventReceipt => r } + }) + .toResponse + + case GET -> Root / "oracles" / UUIDVar(scriptId) / "invocations" => + latestOnChain + .map(_.map { onChain => + onChain.latestLogs + .getOrElse(scriptId, List.empty) + .collect { case i: OracleInvocation => i } + }) + .toResponse + } + + protected val routes: HttpRoutes[F] = Router( + "/v1" -> v1Routes + ) +} diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala index 2f7d11ee..16209e2e 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala @@ -1,14 +1,15 @@ package xyz.kd5ujc.metagraph_l0 +import java.nio.file.Paths import java.util.UUID -import cats.effect.std.Supervisor import cats.effect.{IO, Resource} import cats.syntax.all._ import io.constellationnetwork.currency.dataApplication._ import io.constellationnetwork.currency.l0.CurrencyL0App import io.constellationnetwork.ext.cats.effect.ResourceIO +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CatalogJournal import io.constellationnetwork.schema.cluster.ClusterId import io.constellationnetwork.schema.semver.{MetagraphVersion, TessellationVersion} import io.constellationnetwork.security.SecurityProvider @@ -34,7 +35,6 @@ object Main override def dataApplication: Option[Resource[IO, BaseDataApplicationL0Service[IO]]] = (for { config <- ApplicationConfigOps.readDefault[IO, ML0AppConfig].asResource implicit0(logger: SelfAwareStructuredLogger[IO]) <- Slf4jLogger.create[IO].asResource - implicit0(supervisor: Supervisor[IO]) <- Supervisor[IO] implicit0(sp: SecurityProvider[IO]) <- SecurityProvider.forAsync[IO] _ <- loadKeyPair[IO](config).asResource @@ -43,11 +43,17 @@ object Main EmberClientBuilder.default[IO].build.map(Some(_)) } + // Node-local, write-through journal of the committed catalog (level-1 sealed roots + hot epoch), + // persisted under the working dir. A seeded/restarted committed cell hydrates from this so + // `combine`/`advanceWork` can resolve the parent breadcrumb instead of stalling unhydrated. + journal <- CatalogJournal.levelDb[IO](Paths.get("committed-catalog")) + l0Service <- ML0Service .make[IO]( httpClient = httpClient, metagraphId = config.webhook.metagraphId.getOrElse("DAG3KNyfeKUTuWpMMhormWgWSYMD1pDGB2uaWqxG"), - genesisPath = config.genesis.path + genesisPath = config.genesis.path, + journal = Some(journal) ) .asResource } yield l0Service).some diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala index ad392c91..13e4a091 100644 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala @@ -6,6 +6,7 @@ import pureconfig.ConfigReader import pureconfig.generic.semiauto.deriveReader object ML0AppConfigOps { + implicit val webhookConfigReader: ConfigReader[ML0AppConfig.WebhookConfig] = deriveReader implicit val genesisConfigReader: ConfigReader[ML0AppConfig.GenesisConfig] = deriveReader implicit val applicationConfigReader: ConfigReader[ML0AppConfig] = deriveReader diff --git a/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/CommittedAdapterSuite.scala b/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/CommittedAdapterSuite.scala new file mode 100644 index 00000000..ba0f49d6 --- /dev/null +++ b/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/CommittedAdapterSuite.scala @@ -0,0 +1,211 @@ +package xyz.kd5ujc.metagraph_l0 + +import cats.effect.IO +import cats.effect.std.UUIDGen +import cats.syntax.all._ + +import scala.collection.immutable.SortedMap + +import io.constellationnetwork.currency.dataApplication.{DataOnChainState, DataState, L0NodeContext} +import io.constellationnetwork.metagraph_sdk.json_logic._ +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.{ + CommitKey, + CommittedBreadcrumb, + CommittedCommitment, + CommittedOnChain, + CommittedView +} +import io.constellationnetwork.security.SecurityProvider +import io.constellationnetwork.security.signature.Signed + +import xyz.kd5ujc.schema.fiber._ +import xyz.kd5ujc.schema.{CalculatedState, OnChain, Updates} +import xyz.kd5ujc.shared_data.lifecycle.Combiner +import xyz.kd5ujc.shared_test.Participant._ +import xyz.kd5ujc.shared_test.TestFixture + +import io.circe.Json +import io.circe.parser.parse +import org.http4s.circe.CirceEntityCodec.circeEntityDecoder +import org.http4s.implicits._ +import org.http4s.{Method, Request, Status} +import weaver.SimpleIOSuite + +/** + * The CommittedState adapter: + * + * - `CommittedView[CalculatedState]` projects fibers under `fiber/` and scripts under + * `script/`, deterministically, with the delta override agreeing with the default + * structural diff; + * - a fiber transition yields a minimal delta whose trie application reproduces the full + * rebuild (the invariant `CommittedState.setCommitted` asserts on every transition); + * - the assembled L0 service emits the on-chain breadcrumb from `combine`; + * - the `/committed/...` and committed-cell-backed custom routes respond. + */ +object CommittedAdapterSuite extends SimpleIOSuite { + + private val view: CommittedView[CalculatedState] = CommittedView[CalculatedState] + + private def simpleMachine: StateMachineDefinition = { + val draft = StateId("draft") + val active = StateId("active") + StateMachineDefinition( + states = Map( + draft -> State(draft, isFinal = false), + active -> State(active, isFinal = true) + ), + initialState = draft, + transitions = List( + Transition( + from = draft, + to = active, + eventName = "activate", + guard = ConstExpression(BoolValue(true)), + effect = ConstExpression(MapValue(Map("activated" -> BoolValue(true)))) + ) + ) + ) + } + + private def breadcrumbOf(onChain: DataOnChainState): Option[CommittedBreadcrumb] = + onChain match { + case c: CommittedOnChain[_] => c.breadcrumb.some + case _ => none + } + + test("projection: fibers -> fiber/, scripts -> script/, deterministic enumeration") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + combiner <- Combiner.make[IO]().pure[IO] + + fiberA <- UUIDGen.randomUUID[IO] + fiberB <- UUIDGen.randomUUID[IO] + scriptId <- UUIDGen.randomUUID[IO] + + createA = Updates.CreateStateMachine(fiberA, simpleMachine, MapValue(Map.empty)) + createB = Updates.CreateStateMachine(fiberB, simpleMachine, MapValue(Map.empty)) + scriptProgram <- IO.fromEither(parse("""{"result": "success"}""").flatMap(_.as[JsonLogicExpression])) + createScript = Updates.CreateScript(scriptId, scriptProgram, None, AccessControlPolicy.Public) + + proofA <- fixture.registry.generateProofs(createA, Set(Alice, Bob)) + proofB <- fixture.registry.generateProofs(createB, Set(Alice, Bob)) + proofS <- fixture.registry.generateProofs(createScript, Set(Alice)) + + state0 = DataState(OnChain.genesis, CalculatedState.genesis) + state1 <- combiner.insert(state0, Signed(createA, proofA)) + state2 <- combiner.insert(state1, Signed(createB, proofB)) + state3 <- combiner.insert(state2, Signed(createScript, proofS)) + + entries = view.entries(state3.calculated) + + // The same records assembled in a different in-memory order project identically + reassembled = CalculatedState( + SortedMap.from(state3.calculated.stateMachines.toList.reverse), + SortedMap.from(state3.calculated.scripts.toList.reverse) + ) + } yield expect.same( + entries.keySet.map(_.value), + Set(s"fiber/$fiberA", s"fiber/$fiberB", s"script/$scriptId") + ) and + expect(entries.keys.forall(k => CommitKey.from(k.value).isRight)) and + expect(entries.keys.map(_.namespace).toSet == Set("fiber", "script")) and + expect.same(view.entries(reassembled).toList, entries.toList) and + expect(view.delta(state3.calculated, reassembled).isEmpty) + } + } + + test("fiber transition: minimal delta, delta-applied trie == rebuilt trie, MPT root changes") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + combiner <- Combiner.make[IO]().pure[IO] + fiberId <- UUIDGen.randomUUID[IO] + + create = Updates.CreateStateMachine(fiberId, simpleMachine, MapValue(Map.empty)) + createProof <- fixture.registry.generateProofs(create, Set(Alice, Bob)) + created <- combiner.insert(DataState(OnChain.genesis, CalculatedState.genesis), Signed(create, createProof)) + + transition = Updates.TransitionStateMachine(fiberId, "activate", MapValue(Map.empty), FiberOrdinal.MinValue) + transitionProof <- fixture.registry.generateProofs(transition, Set(Alice, Bob)) + transitioned <- combiner.insert(created, Signed(transition, transitionProof)) + + prevEntries = view.entries(created.calculated) + nextEntries = view.entries(transitioned.calculated) + delta = view.delta(created.calculated, transitioned.calculated) + + // The overridden delta must agree with the default structural diff over entries + expectedUpserts = nextEntries.filter { case (k, v) => !prevEntries.get(k).contains(v) } + expectedRemoves = prevEntries.keySet.diff(nextEntries.keySet) + + prevTrie <- CommittedCommitment.buildTrie[IO](prevEntries) + nextTrie <- CommittedCommitment.buildTrie[IO](nextEntries) + appliedTrie <- CommittedCommitment.applyDelta[IO](prevTrie, delta) + } yield expect( + transitioned.calculated.stateMachines.get(fiberId).map(_.currentState).contains(StateId("active")) + ) and + expect.same(delta.upserts.keySet.map(_.value), Set(s"fiber/$fiberId")) and + expect(delta.removes.isEmpty) and + expect.same(delta.upserts.toList, expectedUpserts.toList) and + expect(delta.removes == expectedRemoves) and + expect(prevTrie.rootNode.digest != nextTrie.rootNode.digest) and + expect(appliedTrie.rootNode.digest == nextTrie.rootNode.digest) + } + } + + test("service combine: on-chain breadcrumb advances ordinal and commits the new roots") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + service <- ML0Service.make[IO]() + fiberId <- UUIDGen.randomUUID[IO] + + genesisBc = breadcrumbOf(service.genesis.onChain) + + create = Updates.CreateStateMachine(fiberId, simpleMachine, MapValue(Map.empty)) + proof <- fixture.registry.generateProofs(create, Set(Alice, Bob)) + + combined <- service.combine(service.genesis, List(Signed(create, proof))) + combinedBc = breadcrumbOf(combined.onChain) + + // A second (empty) batch chains off the first breadcrumb: state unchanged -> same + // mptRoot, but the catalog accrues the previous root -> new catalogRoot. + combinedAgain <- service.combine(combined, List.empty) + secondBc = breadcrumbOf(combinedAgain.onChain) + } yield expect(genesisBc.map(_.ordinal.value.value).contains(0L)) and + expect(combinedBc.map(_.ordinal.value.value).contains(1L)) and + expect(secondBc.map(_.ordinal.value.value).contains(2L)) and + expect((genesisBc, combinedBc).mapN(_.roots.mptRoot != _.roots.mptRoot).contains(true)) and + expect((genesisBc, combinedBc).mapN(_.roots.catalogRoot != _.roots.catalogRoot).contains(true)) and + expect((combinedBc, secondBc).mapN(_.roots.mptRoot == _.roots.mptRoot).contains(true)) and + expect((combinedBc, secondBc).mapN(_.roots.catalogRoot != _.roots.catalogRoot).contains(true)) + } + } + + test("routes: /committed/root and the committed-cell-backed custom routes respond") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + service <- ML0Service.make[IO]() + app = service.routes.orNotFound + + rootResp <- app.run(Request[IO](Method.GET, uri"/committed/root")) + rootJson <- rootResp.as[Json] + cursor = rootJson.hcursor + + checkpointResp <- app.run(Request[IO](Method.GET, uri"/v1/checkpoint")) + stateMachinesResp <- app.run(Request[IO](Method.GET, uri"/v1/state-machines")) + } yield expect(rootResp.status == Status.Ok) and + expect(cursor.get[Long]("ordinal") == Right(0L)) and + expect(cursor.get[Boolean]("hydrated") == Right(true)) and + expect(cursor.get[String]("mptRoot").isRight) and + expect(cursor.get[String]("calculatedStateHash").isRight) and + expect(checkpointResp.status == Status.Ok) and + expect(stateMachinesResp.status == Status.Ok) + } + } +} diff --git a/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala b/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala index b634e11d..3873df21 100755 --- a/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala +++ b/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala @@ -2,15 +2,18 @@ package xyz.kd5ujc.schema import java.util.UUID -import scala.collection.immutable.SortedMap +import scala.collection.immutable.{SortedMap, SortedSet} import io.constellationnetwork.currency.dataApplication.DataCalculatedState +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.{CommitDelta, CommitKey, CommittedView} import xyz.kd5ujc.schema.CodecConfiguration._ import xyz.kd5ujc.schema.registry.{RegistryEntry, RegistryName} import derevo.circe.magnolia.{customizableDecoder, customizableEncoder} import derevo.derive +import io.circe.Json +import io.circe.syntax.EncoderOps @derive(customizableEncoder, customizableDecoder) case class CalculatedState( @@ -25,4 +28,57 @@ object CalculatedState { val genesis: CalculatedState = CalculatedState(SortedMap.empty, SortedMap.empty, SortedMap.empty, SortedMap.empty) + + /** + * Projection into the committed state dictionary (metakit `lifecycle/committed`). + * + * Namespaces (registered in metakit's `docs/committed-namespaces.md`): + * - `fiber/` -- state-machine fiber records (`UUID.toString` is lowercase hyphenated, + * which is valid `CommitKey` segment grammar) + * - `script/` -- script records + * - `registry/` + `reverse/` -- the registry + reverse-name maps ARE now part of + * `CalculatedState` (versionable-contracts merge) but are NOT YET projected + * into the committed root. FOLLOW-UP: extend this view to cover them (needs a + * RegistryName-keyed projection + a check that CommitKey grammar permits the + * dotted `labels.tld` form) so registry state is included in the commitment. + * + * Values are the records' canonical circe projections; the metakit committed layer + * canonicalizes (RFC 8785) and hashes them when building the MPT, so byte-determinism does not + * depend on field ordering here. `SortedMap` (required by [[CommittedView]]) makes the + * enumeration order canonical for deltas/snapshots. + */ + implicit val committedView: CommittedView[CalculatedState] = new CommittedView[CalculatedState] { + + private def fiberKey(id: UUID): CommitKey = CommitKey.unsafe(s"fiber/$id") + private def scriptKey(id: UUID): CommitKey = CommitKey.unsafe(s"script/$id") + + def entries(s: CalculatedState): SortedMap[CommitKey, Json] = + SortedMap.from( + s.stateMachines.iterator.map { case (id, r) => fiberKey(id) -> r.asJson } ++ + s.scripts.iterator.map { case (id, r) => scriptKey(id) -> r.asJson } + ) + + /** + * Delta fast-path: diff the record maps by case-class equality first and only serialize the + * records that actually changed -- equal records always project to equal Json, so this is + * exactly the default `entries`-level structural diff, minus the redundant encoding work. + */ + override def delta(prev: CalculatedState, next: CalculatedState): CommitDelta = { + def changed[R](p: SortedMap[UUID, R], n: SortedMap[UUID, R], key: UUID => CommitKey)( + encode: R => Json + ): (Iterator[(CommitKey, Json)], Iterator[CommitKey]) = + ( + n.iterator.collect { case (id, r) if !p.get(id).contains(r) => key(id) -> encode(r) }, + p.keysIterator.filterNot(n.contains).map(key) + ) + + val (fiberUpserts, fiberRemoves) = changed(prev.stateMachines, next.stateMachines, fiberKey)(_.asJson) + val (scriptUpserts, scriptRemoves) = changed(prev.scripts, next.scripts, scriptKey)(_.asJson) + + CommitDelta( + SortedMap.from(fiberUpserts ++ scriptUpserts), + SortedSet.from(fiberRemoves ++ scriptRemoves) + ) + } + } } diff --git a/modules/models/src/main/scala/xyz/kd5ujc/schema/Updates.scala b/modules/models/src/main/scala/xyz/kd5ujc/schema/Updates.scala index 3879184e..b078b480 100755 --- a/modules/models/src/main/scala/xyz/kd5ujc/schema/Updates.scala +++ b/modules/models/src/main/scala/xyz/kd5ujc/schema/Updates.scala @@ -202,10 +202,17 @@ object Updates { } /** - * Ordering for Signed[OttochainMessage] - delegates to message ordering. + * TOTAL ordering for Signed[OttochainMessage]. The message-level `ordering` above is the PRIMARY key + * (creates before transitions; per-fiber by sequence number) but is only PARTIAL -- same-name registry + * ops and duplicate (fiberId, sequence) ops produce equal message keys. Complete it to a TOTAL order by + * tiebreaking on the signature(s): they are part of the signed update (identical on every node), are made + * over the canonical message bytes, and therefore distinguish any two distinct updates. This is pure (no + * Hasher needed), so the combiner can simply `batch.sorted(signedOrdering)` and every node folds the + * identical sequence -- the surviving op on a tie is deterministic; the loser is recorded as a + * RejectionReceipt rather than aborting the batch. */ implicit val signedOrdering: Ordering[Signed[OttochainMessage]] = - Ordering.by(_.value) + Ordering.by(s => (s.value, s.proofs.toNonEmptyList.toList.map(_.signature.value.value).mkString(","))) implicit val messageEncoder: Encoder[OttochainMessage] = { case u: Updates.CreateStateMachine => Json.obj(u.messageName -> u.asJson)