From d197065bb04255a6cdd4f9ba300d62a78fb1b5a2 Mon Sep 17 00:00:00 2001 From: OttoBot Date: Wed, 10 Jun 2026 20:06:42 -0500 Subject: [PATCH 1/6] feat(models): add committed-view projection of calculated state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Project the calculated state into metakit's committed state dictionary (lifecycle/committed): state-machine fibers under fiber/, scripts under script/ (registry/ is reserved but not yet emitted). Values are the records' canonical circe projections; SortedMap keeps the enumeration order canonical for deltas and snapshots. The delta override diffs the record maps by case-class equality first and only serializes records that actually changed — structurally identical to the default entries-level diff, minus the redundant encoding work. Co-Authored-By: Claude Opus 4.8 --- .../xyz/kd5ujc/schema/CalculatedState.scala | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala b/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala index 7bcdd500..aca60c80 100755 --- a/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala +++ b/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala @@ -2,14 +2,17 @@ package xyz.kd5ujc.schema import java.util.UUID -import scala.collection.immutable.SortedMap +import scala.collection.immutable.{SortedMap, SortedSet} import io.constellationnetwork.currency.dataApplication.DataCalculatedState +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.{CommitDelta, CommitKey, CommittedView} import xyz.kd5ujc.schema.CodecConfiguration._ import derevo.circe.magnolia.{customizableDecoder, customizableEncoder} import derevo.derive +import io.circe.Json +import io.circe.syntax.EncoderOps @derive(customizableEncoder, customizableDecoder) case class CalculatedState( @@ -19,4 +22,54 @@ case class CalculatedState( object CalculatedState { val genesis: CalculatedState = CalculatedState(SortedMap.empty, SortedMap.empty) + + /** + * Projection into the committed state dictionary (metakit `lifecycle/committed`). + * + * Namespaces (registered in metakit's `docs/committed-namespaces.md`): + * - `fiber/` -- state-machine fiber records (`UUID.toString` is lowercase hyphenated, + * which is valid `CommitKey` segment grammar) + * - `script/` -- script records + * - `registry/` -- reserved by ottochain for the registry domain; not yet part of + * `CalculatedState`, so no entries are emitted under it today + * + * Values are the records' canonical circe projections; the metakit committed layer + * canonicalizes (RFC 8785) and hashes them when building the MPT, so byte-determinism does not + * depend on field ordering here. `SortedMap` (required by [[CommittedView]]) makes the + * enumeration order canonical for deltas/snapshots. + */ + implicit val committedView: CommittedView[CalculatedState] = new CommittedView[CalculatedState] { + + private def fiberKey(id: UUID): CommitKey = CommitKey.unsafe(s"fiber/$id") + private def scriptKey(id: UUID): CommitKey = CommitKey.unsafe(s"script/$id") + + def entries(s: CalculatedState): SortedMap[CommitKey, Json] = + SortedMap.from( + s.stateMachines.iterator.map { case (id, r) => fiberKey(id) -> r.asJson } ++ + s.scripts.iterator.map { case (id, r) => scriptKey(id) -> r.asJson } + ) + + /** + * Delta fast-path: diff the record maps by case-class equality first and only serialize the + * records that actually changed -- equal records always project to equal Json, so this is + * exactly the default `entries`-level structural diff, minus the redundant encoding work. + */ + override def delta(prev: CalculatedState, next: CalculatedState): CommitDelta = { + def changed[R](p: SortedMap[UUID, R], n: SortedMap[UUID, R], key: UUID => CommitKey)( + encode: R => Json + ): (Iterator[(CommitKey, Json)], Iterator[CommitKey]) = + ( + n.iterator.collect { case (id, r) if !p.get(id).contains(r) => key(id) -> encode(r) }, + p.keysIterator.filterNot(n.contains).map(key) + ) + + val (fiberUpserts, fiberRemoves) = changed(prev.stateMachines, next.stateMachines, fiberKey)(_.asJson) + val (scriptUpserts, scriptRemoves) = changed(prev.scripts, next.scripts, scriptKey)(_.asJson) + + CommitDelta( + SortedMap.from(fiberUpserts ++ scriptUpserts), + SortedSet.from(fiberRemoves ++ scriptRemoves) + ) + } + } } From 3f15e4c0f1d2b7216e9466a2266fa96a6921c1f4 Mon Sep 17 00:00:00 2001 From: OttoBot Date: Wed, 10 Jun 2026 20:07:11 -0500 Subject: [PATCH 2/6] feat(l0): assemble the ML0 service via CommittedApp.makeL0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The L0 data application is now built by metakit's CommittedApp.makeL0: two-tier state-root commitment over CommittedView[CalculatedState], the constant on-chain breadcrumb (ordinal, mptRoot, catalogRoot) wrapped around OnChain, and the /committed/... proof/replication routes — all correct by construction. Ottochain behavior is preserved by composition: - orderedCombiner: canonical OttochainMessage batch ordering + per-batch latestLogs reset, as the dev combiner handed to makeL0; - rejectionNotifyingValidator: per-update validation with fire-and-forget rejection webhooks, result accumulation unchanged; - withConsensusHooks: delegating wrapper that keeps the consensus-result webhook dispatch (and the notification-side checkpoint cache refresh on setCalculatedState) and appends the snapshot-backed routes; - ML0CustomRoutes now reads through the CommittedReader handed out by makeL0 (one atomic cell read per request, consistent with the served /committed roots); routes that need the latest SIGNED snapshot (/v1/onchain, fiber events, script invocations) moved to ML0SnapshotStateRoutes, which unwraps CommittedOnChain[OnChain] so response shapes are unchanged. FLAGGED metakit follow-up: makeL0 exposes no onSnapshotConsensusResult hook and extraRoutes does not receive the L0NodeContext, hence the wrapper; it disappears once makeL0 grows those parameters. Co-Authored-By: Claude Opus 4.8 --- .../kd5ujc/metagraph_l0/ML0CustomRoutes.scala | 63 ++- .../xyz/kd5ujc/metagraph_l0/ML0Service.scala | 371 ++++++++++++------ .../metagraph_l0/ML0SnapshotStateRoutes.scala | 61 +++ 3 files changed, 338 insertions(+), 157 deletions(-) create mode 100644 modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0SnapshotStateRoutes.scala diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala index 98a4908a..f7208f21 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0CustomRoutes.scala @@ -3,21 +3,19 @@ package xyz.kd5ujc.metagraph_l0 import cats.effect.Async import cats.syntax.all._ -import io.constellationnetwork.currency.dataApplication.{DataApplicationValidationError, L0NodeContext} +import io.constellationnetwork.currency.dataApplication.DataApplicationValidationError import io.constellationnetwork.ext.http4s.error.RefinedRequestApplicationDecoder import io.constellationnetwork.metagraph_sdk.MetagraphPublicRoutes -import io.constellationnetwork.metagraph_sdk.lifecycle.CheckpointService +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CommittedReader import io.constellationnetwork.metagraph_sdk.std.Checkpoint import io.constellationnetwork.metagraph_sdk.std.JsonBinaryHasher.HasherOps -import io.constellationnetwork.metagraph_sdk.syntax.all.L0ContextOps import io.constellationnetwork.security.signature.Signed import xyz.kd5ujc.buildinfo.BuildInfo import xyz.kd5ujc.metagraph_l0.webhooks.{SubscribeRequest, SubscribeResponse, SubscriberRegistry} +import xyz.kd5ujc.schema.CalculatedState import xyz.kd5ujc.schema.Updates.OttochainMessage -import xyz.kd5ujc.schema.fiber.FiberLogEntry.{EventReceipt, OracleInvocation} import xyz.kd5ujc.schema.fiber.FiberStatus -import xyz.kd5ujc.schema.{CalculatedState, OnChain} import io.circe.Json import io.circe.syntax.EncoderOps @@ -25,11 +23,18 @@ import org.http4s.circe.CirceEntityCodec.{circeEntityDecoder, circeEntityEncoder import org.http4s.server.Router import org.http4s.{HttpRoutes, QueryParamDecoder, Response, Status} +/** + * Custom L0 routes backed by the committed cell: every calculated-state read goes through the + * [[CommittedReader]] handed out by `CommittedApp.makeL0` (one atomic cell read per request), so + * responses are always consistent with the served `/committed/...` roots and proofs. + * + * Routes that need the latest SIGNED snapshot (on-chain state / logs) live in + * [[ML0SnapshotStateRoutes]] -- makeL0's `extraRoutes` does not receive the `L0NodeContext` + * (flagged as a metakit follow-up in `ML0Service`). + */ class ML0CustomRoutes[F[_]: Async]( - checkpointService: CheckpointService[F, CalculatedState], + reader: CommittedReader[F, CalculatedState], subscriberRegistry: SubscriberRegistry[F] -)(implicit - context: L0NodeContext[F] ) extends MetagraphPublicRoutes[F] { implicit val fiberStatusDecoder: QueryParamDecoder[FiberStatus] = @@ -39,6 +44,9 @@ class ML0CustomRoutes[F[_]: Async]( object StatusQueryParam extends OptionalQueryParamDecoderMatcher[FiberStatus]("status") + private def calculatedState: F[Checkpoint[CalculatedState]] = + reader.committed.map(c => Checkpoint(c.ordinal, c.state)) + private val v1Routes: HttpRoutes[F] = HttpRoutes.of[F] { // Version endpoint for monitoring integration @@ -63,16 +71,13 @@ class ML0CustomRoutes[F[_]: Async]( } } - case GET -> Root / "onchain" => - context.getOnChainState[OnChain].toResponse - case GET -> Root / "checkpoint" => - checkpointService.get + calculatedState .map(_.asRight[DataApplicationValidationError]) .toResponse case GET -> Root / "state-machines" :? StatusQueryParam(statusOpt) => - checkpointService.get.map { case Checkpoint(_, state) => + calculatedState.map { case Checkpoint(_, state) => statusOpt .fold(state.stateMachines) { status => state.stateMachines.filter { case (_, fiber) => fiber.status == status } @@ -81,44 +86,24 @@ class ML0CustomRoutes[F[_]: Async]( }.toResponse case GET -> Root / "state-machines" / UUIDVar(fiberId) => - checkpointService.get.map { case Checkpoint(_, state) => + calculatedState.map { case Checkpoint(_, state) => state.stateMachines.get(fiberId).asRight[DataApplicationValidationError] }.toResponse case GET -> Root / "oracles" :? StatusQueryParam(statusOpt) => - checkpointService.get.map { case Checkpoint(_, state) => + calculatedState.map { case Checkpoint(_, state) => statusOpt .fold(state.scripts) { status => - state.scripts.filter { case (_, oracle) => oracle.status == status } + state.scripts.filter { case (_, script) => script.status == status } } .asRight[DataApplicationValidationError] }.toResponse - case GET -> Root / "oracles" / UUIDVar(oracleId) => - checkpointService.get.map { case Checkpoint(_, state) => - state.scripts.get(oracleId).asRight[DataApplicationValidationError] + case GET -> Root / "oracles" / UUIDVar(scriptId) => + calculatedState.map { case Checkpoint(_, state) => + state.scripts.get(scriptId).asRight[DataApplicationValidationError] }.toResponse - case GET -> Root / "state-machines" / UUIDVar(fiberId) / "events" => - context - .getOnChainState[OnChain] - .map(_.map { onChain => - onChain.latestLogs - .getOrElse(fiberId, List.empty) - .collect { case r: EventReceipt => r } - }) - .toResponse - - case GET -> Root / "oracles" / UUIDVar(oracleId) / "invocations" => - context - .getOnChainState[OnChain] - .map(_.map { onChain => - onChain.latestLogs - .getOrElse(oracleId, List.empty) - .collect { case i: OracleInvocation => i } - }) - .toResponse - // ========================================================================= // Webhook Management Endpoints // ========================================================================= diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala index bc7c3455..98c1dd83 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala @@ -3,22 +3,25 @@ package xyz.kd5ujc.metagraph_l0 import cats.data.{NonEmptyList, Validated} import cats.effect.Async import cats.syntax.all._ -import cats.{Applicative, Parallel} +import cats.{Monad, Parallel} -import scala.collection.immutable.SortedMap +import scala.collection.immutable.{SortedMap, SortedSet} import io.constellationnetwork.currency.dataApplication._ -import io.constellationnetwork.currency.dataApplication.dataApplication.DataApplicationValidationErrorOr +import io.constellationnetwork.currency.dataApplication.dataApplication.{ + DataApplicationBlock, + DataApplicationValidationErrorOr +} import io.constellationnetwork.currency.schema.currency.CurrencyIncrementalSnapshot -import io.constellationnetwork.metagraph_sdk.MetagraphCommonService +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CommittedApp import io.constellationnetwork.metagraph_sdk.lifecycle.{CheckpointService, CombinerService, ValidationService} import io.constellationnetwork.metagraph_sdk.std.Checkpoint -import io.constellationnetwork.metagraph_sdk.std.JsonBinaryHasher.HasherOps import io.constellationnetwork.metagraph_sdk.syntax.all.CurrencyIncrementalSnapshotOps -import io.constellationnetwork.schema.SnapshotOrdinal +import io.constellationnetwork.schema.artifact.TokenUnlock +import io.constellationnetwork.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo, SnapshotOrdinal} import io.constellationnetwork.security.hash.Hash import io.constellationnetwork.security.signature.Signed -import io.constellationnetwork.security.{Hashed, SecurityProvider} +import io.constellationnetwork.security.{Hashed, Hasher, SecurityProvider} import xyz.kd5ujc.metagraph_l0.webhooks.{NotificationStats, SubscriberRegistry, WebhookDispatcher} import xyz.kd5ujc.schema.Updates.OttochainMessage @@ -35,7 +38,9 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger object ML0Service { /** - * Create the ML0 service with webhook support + * Create the ML0 service: metakit's `CommittedApp.makeL0` (two-tier state-root commitment over + * the `CommittedView[CalculatedState]` projection, on-chain breadcrumb, `/committed/...` routes) + * plus ottochain's webhook/subscriber dispatch layered on top. * * @param httpClient HTTP client for webhook delivery * @param metagraphId The metagraph token identifier (for webhook notifications) @@ -45,6 +50,11 @@ object ML0Service { metagraphId: String = "DAG3KNyfeKUTuWpMMhormWgWSYMD1pDGB2uaWqxG" ): F[BaseDataApplicationL0Service[F]] = for { implicit0(logger: SelfAwareStructuredLogger[F]) <- Slf4jLogger.create[F] + + // The authoritative calculated state lives in the committed cell owned by + // CommittedApp.makeL0; this checkpoint is only the notification-side cache + // (rejection ordinals, consensus stats), refreshed on setCalculatedState + // exactly like the pre-committed implementation. checkpointService <- CheckpointService.make[F, CalculatedState](CalculatedState.genesis) subscriberRegistry <- SubscriberRegistry.make[F] combiner <- Combiner.make[F]().pure[F] @@ -55,115 +65,240 @@ object ML0Service { WebhookDispatcher.make[F](client, subscriberRegistry, metagraphId) } - dataApplicationL0Service <- makeBaseApplicationL0Service( - checkpointService, - combiner, - validator, - subscriberRegistry, - webhookDispatcher - ).pure[F] - } yield dataApplicationL0Service - - private def makeBaseApplicationL0Service[F[+_]: Async]( - checkpointService: CheckpointService[F, CalculatedState], - combiner: CombinerService[F, OttochainMessage, OnChain, CalculatedState], - validator: ValidationService[F, OttochainMessage, OnChain, CalculatedState], - subscriberRegistry: SubscriberRegistry[F], - webhookDispatcher: Option[WebhookDispatcher[F]] - )(implicit logger: SelfAwareStructuredLogger[F]): BaseDataApplicationL0Service[F] = - BaseDataApplicationL0Service[F, OttochainMessage, OnChain, CalculatedState]( - new MetagraphCommonService[F, OttochainMessage, OnChain, CalculatedState, L0NodeContext[F]] - with DataApplicationL0Service[F, OttochainMessage, OnChain, CalculatedState] { - - override def genesis: DataState[OnChain, CalculatedState] = - DataState(OnChain.genesis, CalculatedState.genesis) - - override def onSnapshotConsensusResult(snapshot: Hashed[CurrencyIncrementalSnapshot])(implicit - A: Applicative[F] - ): F[Unit] = - (for { - signedUpdates <- snapshot.signed.value.getSignedUpdates[OttochainMessage] - _ <- logger.info(s"Got ${signedUpdates.size} updates for ordinal: ${snapshot.ordinal.value}") - - // Dispatch webhooks if dispatcher is configured - _ <- webhookDispatcher match { - case Some(dispatcher) => - // Get current state for notification stats - checkpointService.get.flatMap { case Checkpoint(_, state) => - val stats = NotificationStats( - updatesProcessed = signedUpdates.size, - stateMachinesActive = state.stateMachines.count { case (_, fiber) => - fiber.status == FiberStatus.Active - }, - scriptsActive = state.scripts.count { case (_, script) => - script.status == FiberStatus.Active - } - ) - - // Fire-and-forget: start webhook dispatch but don't wait for it - Async[F].start(dispatcher.dispatch(snapshot, stats)).void - } - - case None => - Async[F].unit - } - } yield ()).handleErrorWith(logger.error(_)("Error during onSnapshotConsensusResult")) - - override def validateData( - state: DataState[OnChain, CalculatedState], - updates: NonEmptyList[Signed[OttochainMessage]] - )(implicit context: L0NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = - for { - // Get current ordinal for rejection tracking - ordinal <- checkpointService.get.map(_.ordinal) - - // Validate each update individually to track per-update rejections - results <- updates.toList.traverse { signedUpdate => - validator.validateSignedUpdate(state, signedUpdate).map(signedUpdate -> _) - } - - // Dispatch rejections (fire-and-forget) for failed validations - _ <- webhookDispatcher match { - case Some(dispatcher) => - results.collect { case (signedUpdate, Validated.Invalid(errors)) => - Async[F].start(dispatcher.dispatchRejection(ordinal, signedUpdate, errors)).void - }.sequence_ - case None => - Async[F].unit - } - - // Return combined result (all errors accumulated) - } yield results.map(_._2).combineAll - - override def combine( - state: DataState[OnChain, CalculatedState], - updates: List[Signed[OttochainMessage]] - )(implicit context: L0NodeContext[F]): F[DataState[OnChain, CalculatedState]] = { - // Sort updates using canonical ordering from OttochainMessage companion object. - // This ensures creates are processed before transitions, and transitions for the - // same fiber are processed in sequence number order. - val sortedUpdates = updates.sorted(OttochainMessage.signedOrdering) - - combiner.foldLeft( - state.focus(_.onChain.latestLogs).replace(SortedMap.empty), - sortedUpdates - ) - } - - override def getCalculatedState(implicit - context: L0NodeContext[F] - ): F[(SnapshotOrdinal, CalculatedState)] = - checkpointService.get.map { case Checkpoint(ordinal, state) => ordinal -> state } - - override def setCalculatedState(ordinal: SnapshotOrdinal, state: CalculatedState)(implicit - context: L0NodeContext[F] - ): F[Boolean] = checkpointService.set(Checkpoint(ordinal, state)) - - override def hashCalculatedState(state: CalculatedState)(implicit context: L0NodeContext[F]): F[Hash] = - state.computeDigest - - override def routes(implicit context: L0NodeContext[F]): HttpRoutes[F] = - new ML0CustomRoutes[F](checkpointService, subscriberRegistry).public - } + committedService <- CommittedApp.makeL0[F, OttochainMessage, OnChain, CalculatedState]( + DataState(OnChain.genesis, CalculatedState.genesis), + orderedCombiner(combiner), + rejectionNotifyingValidator(validator, checkpointService, webhookDispatcher), + extraRoutes = Some(reader => new ML0CustomRoutes[F](reader, subscriberRegistry).public) ) + } yield withConsensusHooks(committedService, checkpointService, webhookDispatcher) + + /** + * Sorts each batch with the canonical `OttochainMessage` ordering (creates before transitions, + * transitions per fiber in sequence-number order) and resets the per-snapshot `latestLogs` + * before folding -- the exact behavior of the previous hand-rolled `combine` override, expressed + * as the dev combiner handed to `CommittedApp.makeL0`. + */ + private def orderedCombiner[F[_]: Monad]( + inner: CombinerService[F, OttochainMessage, OnChain, CalculatedState] + ): CombinerService[F, OttochainMessage, OnChain, CalculatedState] = + new CombinerService[F, OttochainMessage, OnChain, CalculatedState] { + + override def foldLeft( + previous: DataState[OnChain, CalculatedState], + batch: List[Signed[OttochainMessage]] + )(implicit ctx: L0NodeContext[F]): F[DataState[OnChain, CalculatedState]] = + inner.foldLeft( + previous.focus(_.onChain.latestLogs).replace(SortedMap.empty), + batch.sorted(OttochainMessage.signedOrdering) + ) + + override def insert( + previous: DataState[OnChain, CalculatedState], + update: Signed[OttochainMessage] + )(implicit ctx: L0NodeContext[F]): F[DataState[OnChain, CalculatedState]] = + inner.insert(previous, update) + } + + /** + * Per-update validation with fire-and-forget rejection webhooks, expressed as the dev validator + * handed to `CommittedApp.makeL0` (whose `validateData` delegates here with the unwrapped + * state). Result accumulation is identical to the previous hand-rolled override. + */ + private def rejectionNotifyingValidator[F[+_]: Async: Parallel]( + inner: ValidationService[F, OttochainMessage, OnChain, CalculatedState], + checkpointService: CheckpointService[F, CalculatedState], + webhookDispatcher: Option[WebhookDispatcher[F]] + ): ValidationService[F, OttochainMessage, OnChain, CalculatedState] = + new ValidationService[F, OttochainMessage, OnChain, CalculatedState] { + + override def validateUpdate( + update: OttochainMessage + )(implicit ctx: L1NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = + inner.validateUpdate(update) + + override def validateSignedUpdate( + current: DataState[OnChain, CalculatedState], + signedUpdate: Signed[OttochainMessage] + )(implicit context: L0NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = + inner.validateSignedUpdate(current, signedUpdate) + + override def validateData( + current: DataState[OnChain, CalculatedState], + batch: NonEmptyList[Signed[OttochainMessage]] + )(implicit ctx: L0NodeContext[F]): F[DataApplicationValidationErrorOr[Unit]] = + for { + // Current ordinal for rejection tracking (notification metadata only) + ordinal <- checkpointService.get.map(_.ordinal) + + // Validate each update individually to track per-update rejections + results <- batch.toList.traverse { signedUpdate => + inner.validateSignedUpdate(current, signedUpdate).map(signedUpdate -> _) + } + + // Dispatch rejections (fire-and-forget) for failed validations + _ <- webhookDispatcher match { + case Some(dispatcher) => + results.collect { case (signedUpdate, Validated.Invalid(errors)) => + Async[F].start(dispatcher.dispatchRejection(ordinal, signedUpdate, errors)).void + }.sequence_ + case None => + Async[F].unit + } + + // Return combined result (all errors accumulated) + } yield results.map(_._2).combineAll + } + + /** + * Delegating wrapper adding the consensus-result webhook dispatch and the snapshot-backed + * custom routes on top of the service assembled by `CommittedApp.makeL0`. + * + * FLAGGED metakit follow-up: `CommittedApp.makeL0` currently exposes no + * `onSnapshotConsensusResult` hook and its `extraRoutes` function receives only the + * `CommittedReader` (not the `L0NodeContext`), so (a) webhook dispatch and (b) routes that read + * the latest signed snapshot have to be layered here. Once makeL0 grows an + * `onConsensusResult` argument and context-aware `extraRoutes`, this wrapper disappears. + */ + private def withConsensusHooks[F[+_]: Async]( + underlying: BaseDataApplicationL0Service[F], + checkpointService: CheckpointService[F, CalculatedState], + webhookDispatcher: Option[WebhookDispatcher[F]] + )(implicit logger: SelfAwareStructuredLogger[F]): BaseDataApplicationL0Service[F] = + new BaseDataApplicationL0Service[F] { + + // getSignedUpdates decodes the snapshot's blocks via the service's own update codecs + implicit private val dataUpdateEncoder: io.circe.Encoder[DataUpdate] = underlying.dataEncoder + implicit private val dataUpdateDecoder: io.circe.Decoder[DataUpdate] = underlying.dataDecoder + + override def genesis: DataState.Base = underlying.genesis + + override def onSnapshotConsensusResult(snapshot: Hashed[CurrencyIncrementalSnapshot]): F[Unit] = + (for { + signedUpdates <- snapshot.signed.value.getSignedUpdates[OttochainMessage] + _ <- logger.info(s"Got ${signedUpdates.size} updates for ordinal: ${snapshot.ordinal.value}") + + // Dispatch webhooks if dispatcher is configured + _ <- webhookDispatcher match { + case Some(dispatcher) => + // Get current state for notification stats + checkpointService.get.flatMap { case Checkpoint(_, state) => + val stats = NotificationStats( + updatesProcessed = signedUpdates.size, + stateMachinesActive = state.stateMachines.count { case (_, fiber) => + fiber.status == FiberStatus.Active + }, + scriptsActive = state.scripts.count { case (_, script) => + script.status == FiberStatus.Active + } + ) + + // Fire-and-forget: start webhook dispatch but don't wait for it + Async[F].start(dispatcher.dispatch(snapshot, stats)).void + } + + case None => + Async[F].unit + } + } yield ()).handleErrorWith(logger.error(_)("Error during onSnapshotConsensusResult")) >> + underlying.onSnapshotConsensusResult(snapshot) + + override def setCalculatedState(ordinal: SnapshotOrdinal, state: DataCalculatedState)(implicit + context: L0NodeContext[F] + ): F[Boolean] = + (state match { + case cs: CalculatedState => checkpointService.set(Checkpoint(ordinal, cs)).void + case _ => Async[F].unit + }) >> underlying.setCalculatedState(ordinal, state) + + override def routes(implicit context: L0NodeContext[F]): HttpRoutes[F] = + underlying.routes <+> new ML0SnapshotStateRoutes[F].public + + // ---- everything below is pure delegation ---- + + override def onGlobalSnapshotPull( + snapshot: Hashed[GlobalIncrementalSnapshot], + context: GlobalSnapshotInfo + ): F[Unit] = + underlying.onGlobalSnapshotPull(snapshot, context) + + override def validateData(state: DataState.Base, updates: NonEmptyList[Signed[DataUpdate]])(implicit + context: L0NodeContext[F] + ): F[DataApplicationValidationErrorOr[Unit]] = + underlying.validateData(state, updates) + + override def combine(state: DataState.Base, updates: List[Signed[DataUpdate]])(implicit + context: L0NodeContext[F] + ): F[DataState.Base] = + underlying.combine(state, updates) + + override def getCalculatedState(implicit context: L0NodeContext[F]): F[(SnapshotOrdinal, DataCalculatedState)] = + underlying.getCalculatedState + + override def hashCalculatedState(state: DataCalculatedState)(implicit context: L0NodeContext[F]): F[Hash] = + underlying.hashCalculatedState(state) + + override def getTokenUnlocks(state: DataState[DataOnChainState, DataCalculatedState])(implicit + context: L0NodeContext[F], + async: Async[F], + hasher: Hasher[F] + ): F[SortedSet[TokenUnlock]] = + underlying.getTokenUnlocks(state)(context, async, hasher) + + override def serializeState(state: DataOnChainState): F[Array[Byte]] = underlying.serializeState(state) + + override def deserializeState(bytes: Array[Byte]): F[Either[Throwable, DataOnChainState]] = + underlying.deserializeState(bytes) + override def serializeUpdate(update: DataUpdate): F[Array[Byte]] = underlying.serializeUpdate(update) + + override def deserializeUpdate(bytes: Array[Byte]): F[Either[Throwable, DataUpdate]] = + underlying.deserializeUpdate(bytes) + + override def serializeBlock(block: Signed[DataApplicationBlock]): F[Array[Byte]] = + underlying.serializeBlock(block) + + override def deserializeBlock(bytes: Array[Byte]): F[Either[Throwable, Signed[DataApplicationBlock]]] = + underlying.deserializeBlock(bytes) + + override def serializeCalculatedState(state: DataCalculatedState): F[Array[Byte]] = + underlying.serializeCalculatedState(state) + + override def deserializeCalculatedState(bytes: Array[Byte]): F[Either[Throwable, DataCalculatedState]] = + underlying.deserializeCalculatedState(bytes) + + override def dataEncoder: io.circe.Encoder[DataUpdate] = underlying.dataEncoder + override def dataDecoder: io.circe.Decoder[DataUpdate] = underlying.dataDecoder + + override def signedDataEntityEncoder: org.http4s.EntityEncoder[F, Signed[DataUpdate]] = + underlying.signedDataEntityEncoder + + override def signedDataEntityDecoder: org.http4s.EntityDecoder[F, Signed[DataUpdate]] = + underlying.signedDataEntityDecoder + + override def calculatedStateEncoder: io.circe.Encoder[DataCalculatedState] = underlying.calculatedStateEncoder + override def calculatedStateDecoder: io.circe.Decoder[DataCalculatedState] = underlying.calculatedStateDecoder + + override def hashDataUpdate: Option[DataUpdate => F[Hash]] = underlying.hashDataUpdate + + override def validateFee( + gsOrdinal: SnapshotOrdinal + )(dataUpdate: Signed[DataUpdate], maybeFeeTransaction: Option[Signed[FeeTransaction]])(implicit + context: L0NodeContext[F], + A: cats.Applicative[F] + ): F[DataApplicationValidationErrorOr[Unit]] = + underlying.validateFee(gsOrdinal)(dataUpdate, maybeFeeTransaction) + + override def extractFees(ds: Seq[Signed[DataUpdate]])(implicit + context: L0NodeContext[F], + A: cats.Applicative[F] + ): F[Seq[Signed[FeeTransaction]]] = + underlying.extractFees(ds) + + override def extractFees(ds: Seq[Signed[DataUpdate]])(implicit + A: cats.Applicative[F] + ): F[Seq[Signed[FeeTransaction]]] = + underlying.extractFees(ds)(A) + + override def routesPrefix: io.constellationnetwork.routes.internal.ExternalUrlPrefix = underlying.routesPrefix + } } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0SnapshotStateRoutes.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0SnapshotStateRoutes.scala new file mode 100644 index 00000000..2edaade0 --- /dev/null +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0SnapshotStateRoutes.scala @@ -0,0 +1,61 @@ +package xyz.kd5ujc.metagraph_l0 + +import cats.effect.Async +import cats.syntax.all._ + +import io.constellationnetwork.currency.dataApplication.L0NodeContext +import io.constellationnetwork.metagraph_sdk.MetagraphPublicRoutes +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CommittedOnChain +import io.constellationnetwork.metagraph_sdk.syntax.all.L0ContextOps + +import xyz.kd5ujc.schema.OnChain +import xyz.kd5ujc.schema.fiber.FiberLogEntry.{EventReceipt, OracleInvocation} + +import io.circe.{Decoder, Encoder} +import org.http4s.HttpRoutes +import org.http4s.server.Router + +/** + * The L0 routes that read the latest SIGNED snapshot's on-chain state from the + * [[L0NodeContext]] (vs the committed-cell-backed reads in [[ML0CustomRoutes]]). + * + * The registered on-chain type is `CommittedOnChain[OnChain]` (the breadcrumb wrapper added by + * `CommittedApp.makeL0`), so the snapshot bytes are decoded as the wrapper and unwrapped here -- + * the response shapes are unchanged from the pre-committed implementation. The breadcrumb itself + * is served by `GET /committed/root`. + */ +class ML0SnapshotStateRoutes[F[_]: Async](implicit context: L0NodeContext[F]) extends MetagraphPublicRoutes[F] { + + implicit private val onChainEncoder: Encoder[CommittedOnChain[OnChain]] = CommittedOnChain.encoder[OnChain] + implicit private val onChainDecoder: Decoder[CommittedOnChain[OnChain]] = CommittedOnChain.decoder[OnChain] + + private def latestOnChain = context.getOnChainState[CommittedOnChain[OnChain]].map(_.map(_.inner)) + + private val v1Routes: HttpRoutes[F] = HttpRoutes.of[F] { + + case GET -> Root / "onchain" => + latestOnChain.toResponse + + case GET -> Root / "state-machines" / UUIDVar(fiberId) / "events" => + latestOnChain + .map(_.map { onChain => + onChain.latestLogs + .getOrElse(fiberId, List.empty) + .collect { case r: EventReceipt => r } + }) + .toResponse + + case GET -> Root / "oracles" / UUIDVar(scriptId) / "invocations" => + latestOnChain + .map(_.map { onChain => + onChain.latestLogs + .getOrElse(scriptId, List.empty) + .collect { case i: OracleInvocation => i } + }) + .toResponse + } + + protected val routes: HttpRoutes[F] = Router( + "/v1" -> v1Routes + ) +} From d0caef69c1fb07353d3f5bdbe33daf74fa8e34db Mon Sep 17 00:00:00 2001 From: OttoBot Date: Wed, 10 Jun 2026 20:07:24 -0500 Subject: [PATCH 3/6] feat(l0): committed-catalog hydration client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A node bootstrapped from snapshot download seeds its committed cell from the on-chain breadcrumb (SeededCatalog): state-dict proofs work, but catalog proofs and consensus transitions need the full epoch-rollup contents. CommittedHydrationClient closes the gap over the public routes: GET /committed/root (act only on hydrated=false), GET /committed/catalog, POST /committed/hydrate. The install is verify-gated server-side (contents must recompose to the attested catalog root), so peers are untrusted; they are tried in order, and the loop polls until the cell reports hydrated. Wired in Main behind config (all-optional, env-driven): COMMITTED_HYDRATION_SELF_URL, COMMITTED_HYDRATION_PEERS (comma-separated), COMMITTED_HYDRATION_INTERVAL (default 30s) — a supervised background task only when self-url and at least one peer are set. Co-Authored-By: Claude Opus 4.8 --- .../l0/src/main/resources/application.conf | 6 + .../scala/xyz/kd5ujc/metagraph_l0/Main.scala | 15 ++ .../metagraph_l0/app/ML0AppConfig.scala | 19 ++- .../metagraph_l0/app/ML0AppConfigOps.scala | 15 ++ .../committed/CommittedHydrationClient.scala | 136 ++++++++++++++++++ 5 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala diff --git a/modules/l0/src/main/resources/application.conf b/modules/l0/src/main/resources/application.conf index b3214a8c..49b7cfe7 100755 --- a/modules/l0/src/main/resources/application.conf +++ b/modules/l0/src/main/resources/application.conf @@ -2,3 +2,9 @@ webhook { url = ${?WEBHOOK_URL} metagraph-id = ${?CL_L0_TOKEN_IDENTIFIER} } + +hydration { + self-url = ${?COMMITTED_HYDRATION_SELF_URL} + peers = ${?COMMITTED_HYDRATION_PEERS} + interval = ${?COMMITTED_HYDRATION_INTERVAL} +} diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala index f3168db2..b4b0da93 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala @@ -16,6 +16,7 @@ import io.constellationnetwork.security.SecurityProvider import xyz.kd5ujc.buildinfo.BuildInfo import xyz.kd5ujc.metagraph_l0.app.ML0AppConfig import xyz.kd5ujc.metagraph_l0.app.ML0AppConfigOps._ +import xyz.kd5ujc.metagraph_l0.committed.CommittedHydrationClient import xyz.kd5ujc.shared_data.app._ import org.http4s.ember.client.EmberClientBuilder @@ -49,5 +50,19 @@ object Main metagraphId = config.webhook.metagraphId.getOrElse("DAG3KNyfeKUTuWpMMhormWgWSYMD1pDGB2uaWqxG") ) .asResource + + // Committed-catalog hydration: a node bootstrapped from snapshot download is breadcrumb- + // SEEDED but not hydrated; poll peers for the catalog contents until the own node accepts + // them (the /committed/hydrate endpoint is verify-gated against the attested root). + _ <- (config.hydration.selfUrl, config.hydration.peers.getOrElse(List.empty)) match { + case (Some(selfUrl), peers @ (_ :: _)) => + EmberClientBuilder.default[IO].build.evalMap { hydrationHttpClient => + val hydration = new CommittedHydrationClient[IO](hydrationHttpClient) + supervisor + .supervise(hydration.awaitHydrated(selfUrl, peers, config.hydration.effectiveInterval)) + .void + } + case _ => Resource.unit[IO] + } } yield l0Service).some } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala index 466a70a1..f35266ec 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala @@ -1,12 +1,15 @@ package xyz.kd5ujc.metagraph_l0.app +import scala.concurrent.duration.{DurationInt, FiniteDuration} + import xyz.kd5ujc.shared_data.app.SharedAppConfig import org.http4s.Uri case class ML0AppConfig( - node: SharedAppConfig.NodeConfig, - webhook: ML0AppConfig.WebhookConfig + node: SharedAppConfig.NodeConfig, + webhook: ML0AppConfig.WebhookConfig, + hydration: ML0AppConfig.HydrationConfig ) extends SharedAppConfig object ML0AppConfig { @@ -15,4 +18,16 @@ object ML0AppConfig { url: Option[Uri], metagraphId: Option[String] ) + + /** + * Committed-catalog hydration (see `CommittedHydrationClient`): active only when both + * `selfUrl` (this node's data-application base URL) and at least one peer are configured. + */ + case class HydrationConfig( + selfUrl: Option[Uri], + peers: Option[List[Uri]], + interval: Option[FiniteDuration] + ) { + def effectiveInterval: FiniteDuration = interval.getOrElse(30.seconds) + } } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala index ff4cf8a1..4f1a7f94 100644 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala @@ -1,11 +1,26 @@ package xyz.kd5ujc.metagraph_l0.app +import cats.syntax.all._ + import xyz.kd5ujc.shared_data.app.ApplicationConfigOps._ +import org.http4s.Uri import pureconfig.ConfigReader +import pureconfig.error.CannotConvert import pureconfig.generic.semiauto.deriveReader object ML0AppConfigOps { + + // Env-var friendly: a comma-separated string of URIs (e.g. COMMITTED_HYDRATION_PEERS) + implicit val uriListReader: ConfigReader[List[Uri]] = + ConfigReader[String].emap { + _.split(',').toList + .map(_.trim) + .filter(_.nonEmpty) + .traverse(s => Uri.fromString(s).leftMap(ex => CannotConvert(s, "URI", ex.getMessage))) + } + implicit val webhookConfigReader: ConfigReader[ML0AppConfig.WebhookConfig] = deriveReader + implicit val hydrationConfigReader: ConfigReader[ML0AppConfig.HydrationConfig] = deriveReader implicit val applicationConfigReader: ConfigReader[ML0AppConfig] = deriveReader } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala new file mode 100644 index 00000000..8be89f3d --- /dev/null +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala @@ -0,0 +1,136 @@ +package xyz.kd5ujc.metagraph_l0.committed + +import cats.effect.Async +import cats.syntax.all._ + +import scala.concurrent.duration.FiniteDuration + +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CatalogContents + +import io.circe.Json +import org.http4s.circe.CirceEntityCodec.{circeEntityDecoder, circeEntityEncoder} +import org.http4s.client.Client +import org.http4s.{Method, Request, Status, Uri} +import org.typelevel.log4cats.SelfAwareStructuredLogger + +/** + * Catalog hydration for a breadcrumb-seeded node. + * + * A node that bootstraps from snapshot download seeds its committed cell from the on-chain + * breadcrumb (`CatalogView.SeededCatalog`): it can verify `hashCalculatedState` and serve + * state-dict proofs, but catalog proofs and consensus transitions need the full epoch-rollup + * contents. This client closes that gap over the public routes: + * + * 1. `GET /committed/root` -- act only when the cell reports `hydrated: false`; + * 2. `GET /committed/catalog` -- fetch the [[CatalogContents]] payload from a peer; + * 3. `POST /committed/hydrate` -- install it on the own node. + * + * Trustless by construction: step 3 is verify-gated SERVER-side -- `CommittedState.hydrate` + * rejects contents that do not recompose to the consensus-attested catalog root + * (`HydrationRootMismatch`), so a malicious or stale peer can cause a retry, never a bad state. + * Peers are tried in order until one payload is accepted. + * + * Base URIs must point at the node's data-application route root (the segment under which the + * `/committed/...` routes are mounted). + */ +final class CommittedHydrationClient[F[_]: Async]( + client: Client[F] +)(implicit logger: SelfAwareStructuredLogger[F]) { + + import CommittedHydrationClient._ + + /** One atomic read of a node's committed root descriptor. */ + def rootStatus(node: Uri): F[RootStatus] = + client.expect[Json](node / "committed" / "root").flatMap { json => + val cursor = json.hcursor + (cursor.get[Long]("ordinal"), cursor.get[Boolean]("hydrated")) + .mapN(RootStatus(_, _)) + .liftTo[F] + } + + /** + * A single hydration pass: no-op on an already-hydrated cell, otherwise try each peer in order + * until the own node accepts a payload. + */ + def hydrateOnce(self: Uri, peers: List[Uri]): F[Outcome] = + rootStatus(self).flatMap { status => + if (status.hydrated) (Outcome.AlreadyHydrated(status.ordinal): Outcome).pure[F] + else tryPeers(self, peers, Nil) + } + + /** + * Poll [[hydrateOnce]] every `interval` until the cell is hydrated (a transition or journal + * recovery may also hydrate it independently, which terminates the loop the same way). Errors + * of a whole pass (e.g. own node not yet serving routes) are logged and retried. + */ + def awaitHydrated(self: Uri, peers: List[Uri], interval: FiniteDuration): F[Outcome] = + hydrateOnce(self, peers) + .handleErrorWith { err => + logger + .warn(s"committed hydration pass failed: ${err.getMessage}") + .as(Outcome.NotHydrated(List(self -> err.getMessage)): Outcome) + } + .flatMap { + case done @ (Outcome.AlreadyHydrated(_) | Outcome.Hydrated(_, _)) => + (done: Outcome).pure[F] + case Outcome.NotHydrated(failures) => + logger.info( + s"committed catalog not hydrated yet (${failures.size} attempt(s) failed); retrying in $interval" + ) >> Async[F].sleep(interval) >> awaitHydrated(self, peers, interval) + } + + private def tryPeers(self: Uri, peers: List[Uri], failures: List[(Uri, String)]): F[Outcome] = + peers match { + case Nil => (Outcome.NotHydrated(failures.reverse): Outcome).pure[F] + case peer :: rest => + hydrateFrom(self, peer).flatMap { + case Right(outcome) => (outcome: Outcome).pure[F] + case Left(reason) => + logger.info(s"committed hydration from $peer failed: $reason") >> + tryPeers(self, rest, (peer -> reason) :: failures) + } + } + + private def hydrateFrom(self: Uri, peer: Uri): F[Either[String, Outcome]] = + client + .expect[CatalogContents](peer / "committed" / "catalog") + .attempt + .flatMap { + case Left(err) => s"failed to fetch /committed/catalog: ${err.getMessage}".asLeft[Outcome].pure[F] + case Right(contents) => + val request = Request[F](Method.POST, self / "committed" / "hydrate").withEntity(contents) + client.run(request).use { response => + response.status match { + case Status.Ok => + response.as[Json].map(json => (Outcome.Hydrated(peer, json): Outcome).asRight[String]) + case status => + response + .as[Json] + .map(_.hcursor.get[String]("error").getOrElse("")) + .handleError(_ => "") + .map(detail => s"own node rejected hydration payload ($status): $detail".asLeft[Outcome]) + } + } + } + .handleError(err => s"hydration attempt errored: ${err.getMessage}".asLeft[Outcome]) +} + +object CommittedHydrationClient { + + /** The fields of `GET /committed/root` the client acts on. */ + final case class RootStatus(ordinal: Long, hydrated: Boolean) + + sealed trait Outcome extends Product with Serializable + + object Outcome { + + /** The cell already holds live catalog contents (genesis node, journal recovery, or a prior pass). */ + final case class AlreadyHydrated(ordinal: Long) extends Outcome + + /** A peer's payload recomposed to the attested catalog root and was installed. */ + final case class Hydrated(peer: Uri, response: Json) extends Outcome + + /** Every peer failed this pass; `failures` records one reason per attempted peer. */ + final case class NotHydrated(failures: List[(Uri, String)]) extends Outcome + } +} From fbbab70c4b59d1df39b6ed31bdf8d144ead8215a Mon Sep 17 00:00:00 2001 From: OttoBot Date: Wed, 10 Jun 2026 20:07:35 -0500 Subject: [PATCH 4/6] test(l0): committed adapter + hydration client suites CommittedAdapterSuite: - projection: fibers -> fiber/, scripts -> script/, valid CommitKey grammar, deterministic enumeration regardless of in-memory assembly order, empty delta on identical states; - fiber transition: minimal single-key delta that agrees with the default structural diff, MPT root change, and delta-application == full rebuild (the invariant CommittedState.setCommitted asserts every transition); - service-level combine: on-chain breadcrumb advances ordinal 0 -> 1 -> 2, mptRoot changes with state, catalogRoot changes every snapshot; - routes: /committed/root responds (ordinal 0, hydrated, roots + combined hash) and the committed-cell-backed /v1 custom routes respond. CommittedHydrationClientSuite drives the client against an in-process stub network: no-op when already hydrated, peer fallback chain, the verify-gate rejection path, and full-failure reporting. Co-Authored-By: Claude Opus 4.8 --- .../metagraph_l0/CommittedAdapterSuite.scala | 211 ++++++++++++++++++ .../CommittedHydrationClientSuite.scala | 118 ++++++++++ 2 files changed, 329 insertions(+) create mode 100644 modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/CommittedAdapterSuite.scala create mode 100644 modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala diff --git a/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/CommittedAdapterSuite.scala b/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/CommittedAdapterSuite.scala new file mode 100644 index 00000000..ba0f49d6 --- /dev/null +++ b/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/CommittedAdapterSuite.scala @@ -0,0 +1,211 @@ +package xyz.kd5ujc.metagraph_l0 + +import cats.effect.IO +import cats.effect.std.UUIDGen +import cats.syntax.all._ + +import scala.collection.immutable.SortedMap + +import io.constellationnetwork.currency.dataApplication.{DataOnChainState, DataState, L0NodeContext} +import io.constellationnetwork.metagraph_sdk.json_logic._ +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.{ + CommitKey, + CommittedBreadcrumb, + CommittedCommitment, + CommittedOnChain, + CommittedView +} +import io.constellationnetwork.security.SecurityProvider +import io.constellationnetwork.security.signature.Signed + +import xyz.kd5ujc.schema.fiber._ +import xyz.kd5ujc.schema.{CalculatedState, OnChain, Updates} +import xyz.kd5ujc.shared_data.lifecycle.Combiner +import xyz.kd5ujc.shared_test.Participant._ +import xyz.kd5ujc.shared_test.TestFixture + +import io.circe.Json +import io.circe.parser.parse +import org.http4s.circe.CirceEntityCodec.circeEntityDecoder +import org.http4s.implicits._ +import org.http4s.{Method, Request, Status} +import weaver.SimpleIOSuite + +/** + * The CommittedState adapter: + * + * - `CommittedView[CalculatedState]` projects fibers under `fiber/` and scripts under + * `script/`, deterministically, with the delta override agreeing with the default + * structural diff; + * - a fiber transition yields a minimal delta whose trie application reproduces the full + * rebuild (the invariant `CommittedState.setCommitted` asserts on every transition); + * - the assembled L0 service emits the on-chain breadcrumb from `combine`; + * - the `/committed/...` and committed-cell-backed custom routes respond. + */ +object CommittedAdapterSuite extends SimpleIOSuite { + + private val view: CommittedView[CalculatedState] = CommittedView[CalculatedState] + + private def simpleMachine: StateMachineDefinition = { + val draft = StateId("draft") + val active = StateId("active") + StateMachineDefinition( + states = Map( + draft -> State(draft, isFinal = false), + active -> State(active, isFinal = true) + ), + initialState = draft, + transitions = List( + Transition( + from = draft, + to = active, + eventName = "activate", + guard = ConstExpression(BoolValue(true)), + effect = ConstExpression(MapValue(Map("activated" -> BoolValue(true)))) + ) + ) + ) + } + + private def breadcrumbOf(onChain: DataOnChainState): Option[CommittedBreadcrumb] = + onChain match { + case c: CommittedOnChain[_] => c.breadcrumb.some + case _ => none + } + + test("projection: fibers -> fiber/, scripts -> script/, deterministic enumeration") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + combiner <- Combiner.make[IO]().pure[IO] + + fiberA <- UUIDGen.randomUUID[IO] + fiberB <- UUIDGen.randomUUID[IO] + scriptId <- UUIDGen.randomUUID[IO] + + createA = Updates.CreateStateMachine(fiberA, simpleMachine, MapValue(Map.empty)) + createB = Updates.CreateStateMachine(fiberB, simpleMachine, MapValue(Map.empty)) + scriptProgram <- IO.fromEither(parse("""{"result": "success"}""").flatMap(_.as[JsonLogicExpression])) + createScript = Updates.CreateScript(scriptId, scriptProgram, None, AccessControlPolicy.Public) + + proofA <- fixture.registry.generateProofs(createA, Set(Alice, Bob)) + proofB <- fixture.registry.generateProofs(createB, Set(Alice, Bob)) + proofS <- fixture.registry.generateProofs(createScript, Set(Alice)) + + state0 = DataState(OnChain.genesis, CalculatedState.genesis) + state1 <- combiner.insert(state0, Signed(createA, proofA)) + state2 <- combiner.insert(state1, Signed(createB, proofB)) + state3 <- combiner.insert(state2, Signed(createScript, proofS)) + + entries = view.entries(state3.calculated) + + // The same records assembled in a different in-memory order project identically + reassembled = CalculatedState( + SortedMap.from(state3.calculated.stateMachines.toList.reverse), + SortedMap.from(state3.calculated.scripts.toList.reverse) + ) + } yield expect.same( + entries.keySet.map(_.value), + Set(s"fiber/$fiberA", s"fiber/$fiberB", s"script/$scriptId") + ) and + expect(entries.keys.forall(k => CommitKey.from(k.value).isRight)) and + expect(entries.keys.map(_.namespace).toSet == Set("fiber", "script")) and + expect.same(view.entries(reassembled).toList, entries.toList) and + expect(view.delta(state3.calculated, reassembled).isEmpty) + } + } + + test("fiber transition: minimal delta, delta-applied trie == rebuilt trie, MPT root changes") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + combiner <- Combiner.make[IO]().pure[IO] + fiberId <- UUIDGen.randomUUID[IO] + + create = Updates.CreateStateMachine(fiberId, simpleMachine, MapValue(Map.empty)) + createProof <- fixture.registry.generateProofs(create, Set(Alice, Bob)) + created <- combiner.insert(DataState(OnChain.genesis, CalculatedState.genesis), Signed(create, createProof)) + + transition = Updates.TransitionStateMachine(fiberId, "activate", MapValue(Map.empty), FiberOrdinal.MinValue) + transitionProof <- fixture.registry.generateProofs(transition, Set(Alice, Bob)) + transitioned <- combiner.insert(created, Signed(transition, transitionProof)) + + prevEntries = view.entries(created.calculated) + nextEntries = view.entries(transitioned.calculated) + delta = view.delta(created.calculated, transitioned.calculated) + + // The overridden delta must agree with the default structural diff over entries + expectedUpserts = nextEntries.filter { case (k, v) => !prevEntries.get(k).contains(v) } + expectedRemoves = prevEntries.keySet.diff(nextEntries.keySet) + + prevTrie <- CommittedCommitment.buildTrie[IO](prevEntries) + nextTrie <- CommittedCommitment.buildTrie[IO](nextEntries) + appliedTrie <- CommittedCommitment.applyDelta[IO](prevTrie, delta) + } yield expect( + transitioned.calculated.stateMachines.get(fiberId).map(_.currentState).contains(StateId("active")) + ) and + expect.same(delta.upserts.keySet.map(_.value), Set(s"fiber/$fiberId")) and + expect(delta.removes.isEmpty) and + expect.same(delta.upserts.toList, expectedUpserts.toList) and + expect(delta.removes == expectedRemoves) and + expect(prevTrie.rootNode.digest != nextTrie.rootNode.digest) and + expect(appliedTrie.rootNode.digest == nextTrie.rootNode.digest) + } + } + + test("service combine: on-chain breadcrumb advances ordinal and commits the new roots") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + service <- ML0Service.make[IO]() + fiberId <- UUIDGen.randomUUID[IO] + + genesisBc = breadcrumbOf(service.genesis.onChain) + + create = Updates.CreateStateMachine(fiberId, simpleMachine, MapValue(Map.empty)) + proof <- fixture.registry.generateProofs(create, Set(Alice, Bob)) + + combined <- service.combine(service.genesis, List(Signed(create, proof))) + combinedBc = breadcrumbOf(combined.onChain) + + // A second (empty) batch chains off the first breadcrumb: state unchanged -> same + // mptRoot, but the catalog accrues the previous root -> new catalogRoot. + combinedAgain <- service.combine(combined, List.empty) + secondBc = breadcrumbOf(combinedAgain.onChain) + } yield expect(genesisBc.map(_.ordinal.value.value).contains(0L)) and + expect(combinedBc.map(_.ordinal.value.value).contains(1L)) and + expect(secondBc.map(_.ordinal.value.value).contains(2L)) and + expect((genesisBc, combinedBc).mapN(_.roots.mptRoot != _.roots.mptRoot).contains(true)) and + expect((genesisBc, combinedBc).mapN(_.roots.catalogRoot != _.roots.catalogRoot).contains(true)) and + expect((combinedBc, secondBc).mapN(_.roots.mptRoot == _.roots.mptRoot).contains(true)) and + expect((combinedBc, secondBc).mapN(_.roots.catalogRoot != _.roots.catalogRoot).contains(true)) + } + } + + test("routes: /committed/root and the committed-cell-backed custom routes respond") { + TestFixture.resource().use { fixture => + implicit val s: SecurityProvider[IO] = fixture.securityProvider + implicit val l0ctx: L0NodeContext[IO] = fixture.l0Context + for { + service <- ML0Service.make[IO]() + app = service.routes.orNotFound + + rootResp <- app.run(Request[IO](Method.GET, uri"/committed/root")) + rootJson <- rootResp.as[Json] + cursor = rootJson.hcursor + + checkpointResp <- app.run(Request[IO](Method.GET, uri"/v1/checkpoint")) + stateMachinesResp <- app.run(Request[IO](Method.GET, uri"/v1/state-machines")) + } yield expect(rootResp.status == Status.Ok) and + expect(cursor.get[Long]("ordinal") == Right(0L)) and + expect(cursor.get[Boolean]("hydrated") == Right(true)) and + expect(cursor.get[String]("mptRoot").isRight) and + expect(cursor.get[String]("calculatedStateHash").isRight) and + expect(checkpointResp.status == Status.Ok) and + expect(stateMachinesResp.status == Status.Ok) + } + } +} diff --git a/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala b/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala new file mode 100644 index 00000000..fc963faa --- /dev/null +++ b/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala @@ -0,0 +1,118 @@ +package xyz.kd5ujc.metagraph_l0.committed + +import cats.effect.{IO, Ref} + +import scala.collection.immutable.SortedMap + +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CatalogContents + +import io.circe.Json +import io.circe.syntax.EncoderOps +import org.http4s._ +import org.http4s.circe.CirceEntityCodec.{circeEntityDecoder, circeEntityEncoder} +import org.http4s.client.Client +import org.http4s.dsl.io._ +import org.http4s.implicits._ +import org.typelevel.log4cats.SelfAwareStructuredLogger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import weaver.SimpleIOSuite + +/** + * Drives [[CommittedHydrationClient]] against an in-process stub of the `/committed/...` routes: + * the no-op on a hydrated cell, the peer fallback chain, and the verify-gate rejection path + * (the stub's `/committed/hydrate` plays the role of `CommittedState.hydrate`'s root check). + */ +object CommittedHydrationClientSuite extends SimpleIOSuite { + + private val selfUri = uri"http://self:9200/data-application" + private val goodPeer = uri"http://good-peer:9200/data-application" + private val badPeer = uri"http://bad-peer:9200/data-application" + + private val contents = CatalogContents(65536, SortedMap.empty, SortedMap.empty, SortedMap.empty) + + private def withLogger[A](run: SelfAwareStructuredLogger[IO] => IO[A]): IO[A] = + Slf4jLogger.create[IO].flatMap(run) + + /** A stub network: `self` root/hydrate behavior is parameterized, peers serve (or fail) the catalog. */ + private def stubClient( + selfHydrated: Boolean, + acceptPayload: Boolean, + hydrateCalls: Ref[IO, Int] + ): Client[IO] = { + def pathOf(req: Request[IO]): String = req.uri.path.renderString + def hostOf(req: Request[IO]): Option[String] = req.uri.host.map(_.value) + + val routes = HttpRoutes.of[IO] { + case req if req.method == Method.GET && pathOf(req).endsWith("/committed/root") && hostOf(req).contains("self") => + Ok(Json.obj("ordinal" -> 5L.asJson, "hydrated" -> selfHydrated.asJson)) + + case req if req.method == Method.GET && pathOf(req).endsWith("/committed/catalog") => + hostOf(req) match { + case Some("good-peer") => Ok(contents.asJson) + case _ => InternalServerError(Json.obj("error" -> "boom".asJson)) + } + + case req if req.method == Method.POST && pathOf(req).endsWith("/committed/hydrate") => + hydrateCalls.update(_ + 1) >> + req.as[CatalogContents].flatMap { _ => + if (acceptPayload) Ok(Json.obj("ordinal" -> 5L.asJson, "catalogRoot" -> "abc".asJson)) + else BadRequest(Json.obj("error" -> "hydration contents recompose to a different root".asJson)) + } + } + Client.fromHttpApp(routes.orNotFound) + } + + test("no-op when the own cell is already hydrated") { + withLogger { implicit logger => + for { + calls <- Ref.of[IO, Int](0) + client = new CommittedHydrationClient[IO](stubClient(selfHydrated = true, acceptPayload = true, calls)) + outcome <- client.hydrateOnce(selfUri, List(goodPeer)) + attempted <- calls.get + } yield expect(outcome == CommittedHydrationClient.Outcome.AlreadyHydrated(5L)) and + expect(attempted == 0) + } + } + + test("seeded cell: falls past a failing peer and hydrates from the first serving peer") { + withLogger { implicit logger => + for { + calls <- Ref.of[IO, Int](0) + client = new CommittedHydrationClient[IO](stubClient(selfHydrated = false, acceptPayload = true, calls)) + outcome <- client.hydrateOnce(selfUri, List(badPeer, goodPeer)) + attempted <- calls.get + } yield expect(attempted == 1) and + expect(outcome match { + case CommittedHydrationClient.Outcome.Hydrated(peer, _) => peer == goodPeer + case _ => false + }) + } + } + + test("verify-gate: a payload the own node rejects is a per-peer failure, not a success") { + withLogger { implicit logger => + for { + calls <- Ref.of[IO, Int](0) + client = new CommittedHydrationClient[IO](stubClient(selfHydrated = false, acceptPayload = false, calls)) + outcome <- client.hydrateOnce(selfUri, List(goodPeer)) + } yield expect(outcome match { + case CommittedHydrationClient.Outcome.NotHydrated(failures) => + failures.map(_._1) == List(goodPeer) && failures.head._2.contains("rejected") + case _ => false + }) + } + } + + test("no serving peers: all failures reported in order") { + withLogger { implicit logger => + for { + calls <- Ref.of[IO, Int](0) + client = new CommittedHydrationClient[IO](stubClient(selfHydrated = false, acceptPayload = true, calls)) + outcome <- client.hydrateOnce(selfUri, List(badPeer, badPeer)) + } yield expect(outcome match { + case CommittedHydrationClient.Outcome.NotHydrated(failures) => failures.size == 2 + case _ => false + }) + } + } +} From 658f5e93c5a9ab42e4d26ef94d2ed726496b379a Mon Sep 17 00:00:00 2001 From: OttoBot Date: Thu, 11 Jun 2026 15:16:04 -0500 Subject: [PATCH 5/6] refactor(l0): remove committed-catalog peer hydration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The peer-hydration path (selfUrl/peers config, uriListReader, the CommittedHydrationClient + Main wiring) duplicated work the framework already does: tessellation syncs ML0 snapshots to L1 and exposes them via L1NodeContext.getLastCurrencySnapshot, and stateful validation belongs at ML0 (validateData has CalculatedState) with L1 staying stateless. The custom HTTP self/peer pull fought that split. Keeps the committed-root state commitment (CommittedApp.makeL0, CommittedView, /committed routes) — that is the verifiable-root/proof adapter and is unaffected. A seeded-but-unhydrated cell still attests the current root; serving HISTORICAL proofs (epoch catalog) will come back via a CatalogJournal (local restart recovery) when proofs are actually served. 341+4 tests pass. Co-Authored-By: Claude Opus 4.8 --- .../l0/src/main/resources/application.conf | 6 - .../scala/xyz/kd5ujc/metagraph_l0/Main.scala | 17 --- .../metagraph_l0/app/ML0AppConfig.scala | 21 +-- .../metagraph_l0/app/ML0AppConfigOps.scala | 14 -- .../committed/CommittedHydrationClient.scala | 136 ------------------ .../CommittedHydrationClientSuite.scala | 118 --------------- .../xyz/kd5ujc/schema/CalculatedState.scala | 1 + 7 files changed, 4 insertions(+), 309 deletions(-) delete mode 100644 modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala delete mode 100644 modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala diff --git a/modules/l0/src/main/resources/application.conf b/modules/l0/src/main/resources/application.conf index 91e9b31b..2b7b2b53 100755 --- a/modules/l0/src/main/resources/application.conf +++ b/modules/l0/src/main/resources/application.conf @@ -3,12 +3,6 @@ webhook { metagraph-id = ${?CL_L0_TOKEN_IDENTIFIER} } -hydration { - self-url = ${?COMMITTED_HYDRATION_SELF_URL} - peers = ${?COMMITTED_HYDRATION_PEERS} - interval = ${?COMMITTED_HYDRATION_INTERVAL} -} - genesis { path = ${?GENESIS_STATE_PATH} } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala index 597149c1..c626362f 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala @@ -2,7 +2,6 @@ package xyz.kd5ujc.metagraph_l0 import java.util.UUID -import cats.effect.std.Supervisor import cats.effect.{IO, Resource} import cats.syntax.all._ @@ -16,7 +15,6 @@ import io.constellationnetwork.security.SecurityProvider import xyz.kd5ujc.buildinfo.BuildInfo import xyz.kd5ujc.metagraph_l0.app.ML0AppConfig import xyz.kd5ujc.metagraph_l0.app.ML0AppConfigOps._ -import xyz.kd5ujc.metagraph_l0.committed.CommittedHydrationClient import xyz.kd5ujc.shared_data.app._ import org.http4s.ember.client.EmberClientBuilder @@ -35,7 +33,6 @@ object Main override def dataApplication: Option[Resource[IO, BaseDataApplicationL0Service[IO]]] = (for { config <- ApplicationConfigOps.readDefault[IO, ML0AppConfig].asResource implicit0(logger: SelfAwareStructuredLogger[IO]) <- Slf4jLogger.create[IO].asResource - implicit0(supervisor: Supervisor[IO]) <- Supervisor[IO] implicit0(sp: SecurityProvider[IO]) <- SecurityProvider.forAsync[IO] _ <- loadKeyPair[IO](config).asResource @@ -51,19 +48,5 @@ object Main genesisPath = config.genesis.path ) .asResource - - // Committed-catalog hydration: a node bootstrapped from snapshot download is breadcrumb- - // SEEDED but not hydrated; poll peers for the catalog contents until the own node accepts - // them (the /committed/hydrate endpoint is verify-gated against the attested root). - _ <- (config.hydration.selfUrl, config.hydration.peers.getOrElse(List.empty)) match { - case (Some(selfUrl), peers @ (_ :: _)) => - EmberClientBuilder.default[IO].build.evalMap { hydrationHttpClient => - val hydration = new CommittedHydrationClient[IO](hydrationHttpClient) - supervisor - .supervise(hydration.awaitHydrated(selfUrl, peers, config.hydration.effectiveInterval)) - .void - } - case _ => Resource.unit[IO] - } } yield l0Service).some } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala index dc0b1615..3586b8f6 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfig.scala @@ -1,16 +1,13 @@ package xyz.kd5ujc.metagraph_l0.app -import scala.concurrent.duration.{DurationInt, FiniteDuration} - import xyz.kd5ujc.shared_data.app.SharedAppConfig import org.http4s.Uri case class ML0AppConfig( - node: SharedAppConfig.NodeConfig, - webhook: ML0AppConfig.WebhookConfig, - hydration: ML0AppConfig.HydrationConfig, - genesis: ML0AppConfig.GenesisConfig + node: SharedAppConfig.NodeConfig, + webhook: ML0AppConfig.WebhookConfig, + genesis: ML0AppConfig.GenesisConfig ) extends SharedAppConfig object ML0AppConfig { @@ -20,18 +17,6 @@ object ML0AppConfig { metagraphId: Option[String] ) - /** - * Committed-catalog hydration (see `CommittedHydrationClient`): active only when both - * `selfUrl` (this node's data-application base URL) and at least one peer are configured. - */ - case class HydrationConfig( - selfUrl: Option[Uri], - peers: Option[List[Uri]], - interval: Option[FiniteDuration] - ) { - def effectiveInterval: FiniteDuration = interval.getOrElse(30.seconds) - } - case class GenesisConfig( path: Option[String] ) diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala index c06826d0..13e4a091 100644 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/app/ML0AppConfigOps.scala @@ -1,27 +1,13 @@ package xyz.kd5ujc.metagraph_l0.app -import cats.syntax.all._ - import xyz.kd5ujc.shared_data.app.ApplicationConfigOps._ -import org.http4s.Uri import pureconfig.ConfigReader -import pureconfig.error.CannotConvert import pureconfig.generic.semiauto.deriveReader object ML0AppConfigOps { - // Env-var friendly: a comma-separated string of URIs (e.g. COMMITTED_HYDRATION_PEERS) - implicit val uriListReader: ConfigReader[List[Uri]] = - ConfigReader[String].emap { - _.split(',').toList - .map(_.trim) - .filter(_.nonEmpty) - .traverse(s => Uri.fromString(s).leftMap(ex => CannotConvert(s, "URI", ex.getMessage))) - } - implicit val webhookConfigReader: ConfigReader[ML0AppConfig.WebhookConfig] = deriveReader - implicit val hydrationConfigReader: ConfigReader[ML0AppConfig.HydrationConfig] = deriveReader implicit val genesisConfigReader: ConfigReader[ML0AppConfig.GenesisConfig] = deriveReader implicit val applicationConfigReader: ConfigReader[ML0AppConfig] = deriveReader } diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala deleted file mode 100644 index 8be89f3d..00000000 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClient.scala +++ /dev/null @@ -1,136 +0,0 @@ -package xyz.kd5ujc.metagraph_l0.committed - -import cats.effect.Async -import cats.syntax.all._ - -import scala.concurrent.duration.FiniteDuration - -import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CatalogContents - -import io.circe.Json -import org.http4s.circe.CirceEntityCodec.{circeEntityDecoder, circeEntityEncoder} -import org.http4s.client.Client -import org.http4s.{Method, Request, Status, Uri} -import org.typelevel.log4cats.SelfAwareStructuredLogger - -/** - * Catalog hydration for a breadcrumb-seeded node. - * - * A node that bootstraps from snapshot download seeds its committed cell from the on-chain - * breadcrumb (`CatalogView.SeededCatalog`): it can verify `hashCalculatedState` and serve - * state-dict proofs, but catalog proofs and consensus transitions need the full epoch-rollup - * contents. This client closes that gap over the public routes: - * - * 1. `GET /committed/root` -- act only when the cell reports `hydrated: false`; - * 2. `GET /committed/catalog` -- fetch the [[CatalogContents]] payload from a peer; - * 3. `POST /committed/hydrate` -- install it on the own node. - * - * Trustless by construction: step 3 is verify-gated SERVER-side -- `CommittedState.hydrate` - * rejects contents that do not recompose to the consensus-attested catalog root - * (`HydrationRootMismatch`), so a malicious or stale peer can cause a retry, never a bad state. - * Peers are tried in order until one payload is accepted. - * - * Base URIs must point at the node's data-application route root (the segment under which the - * `/committed/...` routes are mounted). - */ -final class CommittedHydrationClient[F[_]: Async]( - client: Client[F] -)(implicit logger: SelfAwareStructuredLogger[F]) { - - import CommittedHydrationClient._ - - /** One atomic read of a node's committed root descriptor. */ - def rootStatus(node: Uri): F[RootStatus] = - client.expect[Json](node / "committed" / "root").flatMap { json => - val cursor = json.hcursor - (cursor.get[Long]("ordinal"), cursor.get[Boolean]("hydrated")) - .mapN(RootStatus(_, _)) - .liftTo[F] - } - - /** - * A single hydration pass: no-op on an already-hydrated cell, otherwise try each peer in order - * until the own node accepts a payload. - */ - def hydrateOnce(self: Uri, peers: List[Uri]): F[Outcome] = - rootStatus(self).flatMap { status => - if (status.hydrated) (Outcome.AlreadyHydrated(status.ordinal): Outcome).pure[F] - else tryPeers(self, peers, Nil) - } - - /** - * Poll [[hydrateOnce]] every `interval` until the cell is hydrated (a transition or journal - * recovery may also hydrate it independently, which terminates the loop the same way). Errors - * of a whole pass (e.g. own node not yet serving routes) are logged and retried. - */ - def awaitHydrated(self: Uri, peers: List[Uri], interval: FiniteDuration): F[Outcome] = - hydrateOnce(self, peers) - .handleErrorWith { err => - logger - .warn(s"committed hydration pass failed: ${err.getMessage}") - .as(Outcome.NotHydrated(List(self -> err.getMessage)): Outcome) - } - .flatMap { - case done @ (Outcome.AlreadyHydrated(_) | Outcome.Hydrated(_, _)) => - (done: Outcome).pure[F] - case Outcome.NotHydrated(failures) => - logger.info( - s"committed catalog not hydrated yet (${failures.size} attempt(s) failed); retrying in $interval" - ) >> Async[F].sleep(interval) >> awaitHydrated(self, peers, interval) - } - - private def tryPeers(self: Uri, peers: List[Uri], failures: List[(Uri, String)]): F[Outcome] = - peers match { - case Nil => (Outcome.NotHydrated(failures.reverse): Outcome).pure[F] - case peer :: rest => - hydrateFrom(self, peer).flatMap { - case Right(outcome) => (outcome: Outcome).pure[F] - case Left(reason) => - logger.info(s"committed hydration from $peer failed: $reason") >> - tryPeers(self, rest, (peer -> reason) :: failures) - } - } - - private def hydrateFrom(self: Uri, peer: Uri): F[Either[String, Outcome]] = - client - .expect[CatalogContents](peer / "committed" / "catalog") - .attempt - .flatMap { - case Left(err) => s"failed to fetch /committed/catalog: ${err.getMessage}".asLeft[Outcome].pure[F] - case Right(contents) => - val request = Request[F](Method.POST, self / "committed" / "hydrate").withEntity(contents) - client.run(request).use { response => - response.status match { - case Status.Ok => - response.as[Json].map(json => (Outcome.Hydrated(peer, json): Outcome).asRight[String]) - case status => - response - .as[Json] - .map(_.hcursor.get[String]("error").getOrElse("")) - .handleError(_ => "") - .map(detail => s"own node rejected hydration payload ($status): $detail".asLeft[Outcome]) - } - } - } - .handleError(err => s"hydration attempt errored: ${err.getMessage}".asLeft[Outcome]) -} - -object CommittedHydrationClient { - - /** The fields of `GET /committed/root` the client acts on. */ - final case class RootStatus(ordinal: Long, hydrated: Boolean) - - sealed trait Outcome extends Product with Serializable - - object Outcome { - - /** The cell already holds live catalog contents (genesis node, journal recovery, or a prior pass). */ - final case class AlreadyHydrated(ordinal: Long) extends Outcome - - /** A peer's payload recomposed to the attested catalog root and was installed. */ - final case class Hydrated(peer: Uri, response: Json) extends Outcome - - /** Every peer failed this pass; `failures` records one reason per attempted peer. */ - final case class NotHydrated(failures: List[(Uri, String)]) extends Outcome - } -} diff --git a/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala b/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala deleted file mode 100644 index fc963faa..00000000 --- a/modules/l0/src/test/scala/xyz/kd5ujc/metagraph_l0/committed/CommittedHydrationClientSuite.scala +++ /dev/null @@ -1,118 +0,0 @@ -package xyz.kd5ujc.metagraph_l0.committed - -import cats.effect.{IO, Ref} - -import scala.collection.immutable.SortedMap - -import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CatalogContents - -import io.circe.Json -import io.circe.syntax.EncoderOps -import org.http4s._ -import org.http4s.circe.CirceEntityCodec.{circeEntityDecoder, circeEntityEncoder} -import org.http4s.client.Client -import org.http4s.dsl.io._ -import org.http4s.implicits._ -import org.typelevel.log4cats.SelfAwareStructuredLogger -import org.typelevel.log4cats.slf4j.Slf4jLogger -import weaver.SimpleIOSuite - -/** - * Drives [[CommittedHydrationClient]] against an in-process stub of the `/committed/...` routes: - * the no-op on a hydrated cell, the peer fallback chain, and the verify-gate rejection path - * (the stub's `/committed/hydrate` plays the role of `CommittedState.hydrate`'s root check). - */ -object CommittedHydrationClientSuite extends SimpleIOSuite { - - private val selfUri = uri"http://self:9200/data-application" - private val goodPeer = uri"http://good-peer:9200/data-application" - private val badPeer = uri"http://bad-peer:9200/data-application" - - private val contents = CatalogContents(65536, SortedMap.empty, SortedMap.empty, SortedMap.empty) - - private def withLogger[A](run: SelfAwareStructuredLogger[IO] => IO[A]): IO[A] = - Slf4jLogger.create[IO].flatMap(run) - - /** A stub network: `self` root/hydrate behavior is parameterized, peers serve (or fail) the catalog. */ - private def stubClient( - selfHydrated: Boolean, - acceptPayload: Boolean, - hydrateCalls: Ref[IO, Int] - ): Client[IO] = { - def pathOf(req: Request[IO]): String = req.uri.path.renderString - def hostOf(req: Request[IO]): Option[String] = req.uri.host.map(_.value) - - val routes = HttpRoutes.of[IO] { - case req if req.method == Method.GET && pathOf(req).endsWith("/committed/root") && hostOf(req).contains("self") => - Ok(Json.obj("ordinal" -> 5L.asJson, "hydrated" -> selfHydrated.asJson)) - - case req if req.method == Method.GET && pathOf(req).endsWith("/committed/catalog") => - hostOf(req) match { - case Some("good-peer") => Ok(contents.asJson) - case _ => InternalServerError(Json.obj("error" -> "boom".asJson)) - } - - case req if req.method == Method.POST && pathOf(req).endsWith("/committed/hydrate") => - hydrateCalls.update(_ + 1) >> - req.as[CatalogContents].flatMap { _ => - if (acceptPayload) Ok(Json.obj("ordinal" -> 5L.asJson, "catalogRoot" -> "abc".asJson)) - else BadRequest(Json.obj("error" -> "hydration contents recompose to a different root".asJson)) - } - } - Client.fromHttpApp(routes.orNotFound) - } - - test("no-op when the own cell is already hydrated") { - withLogger { implicit logger => - for { - calls <- Ref.of[IO, Int](0) - client = new CommittedHydrationClient[IO](stubClient(selfHydrated = true, acceptPayload = true, calls)) - outcome <- client.hydrateOnce(selfUri, List(goodPeer)) - attempted <- calls.get - } yield expect(outcome == CommittedHydrationClient.Outcome.AlreadyHydrated(5L)) and - expect(attempted == 0) - } - } - - test("seeded cell: falls past a failing peer and hydrates from the first serving peer") { - withLogger { implicit logger => - for { - calls <- Ref.of[IO, Int](0) - client = new CommittedHydrationClient[IO](stubClient(selfHydrated = false, acceptPayload = true, calls)) - outcome <- client.hydrateOnce(selfUri, List(badPeer, goodPeer)) - attempted <- calls.get - } yield expect(attempted == 1) and - expect(outcome match { - case CommittedHydrationClient.Outcome.Hydrated(peer, _) => peer == goodPeer - case _ => false - }) - } - } - - test("verify-gate: a payload the own node rejects is a per-peer failure, not a success") { - withLogger { implicit logger => - for { - calls <- Ref.of[IO, Int](0) - client = new CommittedHydrationClient[IO](stubClient(selfHydrated = false, acceptPayload = false, calls)) - outcome <- client.hydrateOnce(selfUri, List(goodPeer)) - } yield expect(outcome match { - case CommittedHydrationClient.Outcome.NotHydrated(failures) => - failures.map(_._1) == List(goodPeer) && failures.head._2.contains("rejected") - case _ => false - }) - } - } - - test("no serving peers: all failures reported in order") { - withLogger { implicit logger => - for { - calls <- Ref.of[IO, Int](0) - client = new CommittedHydrationClient[IO](stubClient(selfHydrated = false, acceptPayload = true, calls)) - outcome <- client.hydrateOnce(selfUri, List(badPeer, badPeer)) - } yield expect(outcome match { - case CommittedHydrationClient.Outcome.NotHydrated(failures) => failures.size == 2 - case _ => false - }) - } - } -} diff --git a/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala b/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala index bdde5706..3873df21 100755 --- a/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala +++ b/modules/models/src/main/scala/xyz/kd5ujc/schema/CalculatedState.scala @@ -25,6 +25,7 @@ case class CalculatedState( ) extends DataCalculatedState object CalculatedState { + val genesis: CalculatedState = CalculatedState(SortedMap.empty, SortedMap.empty, SortedMap.empty, SortedMap.empty) From d82fa8a5122f3a8e1963cf3c721d0289788b4d63 Mon Sep 17 00:00:00 2001 From: OttoBot Date: Thu, 11 Jun 2026 16:03:34 -0500 Subject: [PATCH 6/6] fix(l0): thread persistent CatalogJournal into committed cell The committed adapter's `combine` -> `advanceWork` -> `resolveCatalog` returns None on a seeded/restarted cell whose work-cache is empty and journal is None, raising `BreadcrumbUnresolvable` and stalling the metagraph at ordinal 1. Acquire a LevelDB-backed `CatalogJournal` Resource in ML0 Main (under `committed-catalog`) and thread it through `ML0Service.make` into `CommittedApp.makeL0`, so a seeded committed cell hydrates from its own write-through catalog and can resolve the parent breadcrumb. Test callers keep the `journal = None` default. Co-Authored-By: Claude Opus 4.8 --- .../scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala | 11 ++++++++--- .../src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala | 10 +++++++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala index 19a877db..fd0412a9 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/ML0Service.scala @@ -13,7 +13,7 @@ import io.constellationnetwork.currency.dataApplication.dataApplication.{ DataApplicationValidationErrorOr } import io.constellationnetwork.currency.schema.currency.CurrencyIncrementalSnapshot -import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CommittedApp +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.{CatalogJournal, CommittedApp} import io.constellationnetwork.metagraph_sdk.lifecycle.{CheckpointService, CombinerService, ValidationService} import io.constellationnetwork.metagraph_sdk.std.Checkpoint import io.constellationnetwork.metagraph_sdk.syntax.all.CurrencyIncrementalSnapshotOps @@ -50,7 +50,11 @@ object ML0Service { def make[F[+_]: Async: Files: Parallel: SecurityProvider]( httpClient: Option[Client[F]] = None, metagraphId: String = "DAG3KNyfeKUTuWpMMhormWgWSYMD1pDGB2uaWqxG", - genesisPath: Option[String] = None + genesisPath: Option[String] = None, + // Local catalog journal (acquired as a Resource in Main and threaded in): lets a seeded committed + // cell re-hydrate from its own persisted catalog so `combine`/`advanceWork` can resolve the parent + // breadcrumb. Without it a seeded cell stays unhydrated and the metagraph cannot advance. + journal: Option[CatalogJournal[F]] = None ): F[BaseDataApplicationL0Service[F]] = for { implicit0(logger: SelfAwareStructuredLogger[F]) <- Slf4jLogger.create[F] genesisState <- GenesisLoader.load[F](genesisPath) @@ -73,7 +77,8 @@ object ML0Service { genesisState, orderedCombiner(combiner), rejectionNotifyingValidator(validator, checkpointService, webhookDispatcher), - extraRoutes = Some(reader => new ML0CustomRoutes[F](reader, subscriberRegistry).public) + extraRoutes = Some(reader => new ML0CustomRoutes[F](reader, subscriberRegistry).public), + journal = journal ) } yield withConsensusHooks(committedService, checkpointService, webhookDispatcher) diff --git a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala index c626362f..16209e2e 100755 --- a/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala +++ b/modules/l0/src/main/scala/xyz/kd5ujc/metagraph_l0/Main.scala @@ -1,5 +1,6 @@ package xyz.kd5ujc.metagraph_l0 +import java.nio.file.Paths import java.util.UUID import cats.effect.{IO, Resource} @@ -8,6 +9,7 @@ import cats.syntax.all._ import io.constellationnetwork.currency.dataApplication._ import io.constellationnetwork.currency.l0.CurrencyL0App import io.constellationnetwork.ext.cats.effect.ResourceIO +import io.constellationnetwork.metagraph_sdk.lifecycle.committed.CatalogJournal import io.constellationnetwork.schema.cluster.ClusterId import io.constellationnetwork.schema.semver.{MetagraphVersion, TessellationVersion} import io.constellationnetwork.security.SecurityProvider @@ -41,11 +43,17 @@ object Main EmberClientBuilder.default[IO].build.map(Some(_)) } + // Node-local, write-through journal of the committed catalog (level-1 sealed roots + hot epoch), + // persisted under the working dir. A seeded/restarted committed cell hydrates from this so + // `combine`/`advanceWork` can resolve the parent breadcrumb instead of stalling unhydrated. + journal <- CatalogJournal.levelDb[IO](Paths.get("committed-catalog")) + l0Service <- ML0Service .make[IO]( httpClient = httpClient, metagraphId = config.webhook.metagraphId.getOrElse("DAG3KNyfeKUTuWpMMhormWgWSYMD1pDGB2uaWqxG"), - genesisPath = config.genesis.path + genesisPath = config.genesis.path, + journal = Some(journal) ) .asResource } yield l0Service).some