From 2054584ca20f55430a2c5abf8e131b930f615189 Mon Sep 17 00:00:00 2001 From: brunota20 Date: Mon, 1 Jun 2026 14:19:42 -0300 Subject: [PATCH 1/3] chore(deps): pull cowprotocol, alloy, redb, reqwest, tracing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the dependencies the 0.2 host backends need: - cowprotocol (1.0.0-alpha) for the cow-api submission path (OrderBookApi, OrderCreation, OrderUid, Chain). - alloy-provider / -rpc-client / -transport-ws / -primitives (1.5) for the chain JSON-RPC dispatch. The reqwest feature on alloy-provider engages connect_http; the pubsub/ws features back eth_subscribe-class methods. - redb (2) for local-store. Same crate cowprotocol's own watch-tower picked, so the dep tree does not bifurcate when both are used in the same workspace. - reqwest (0.12, rustls-tls) — direct, so the import survives any future cowprotocol feature rearrangement. - tracing + tracing-subscriber (env-filter + fmt) — replaces the 0.1 eprintln! debug log so the engine can drop into a structured log pipeline without re-instrumenting every host call. - thiserror (2) — typed error enums in each backend. - tempfile + wiremock as dev-deps for the host backend tests. Adds engine.example.toml documenting the [engine] state_dir + per- chain RPC URLs the chain backend reads at boot; data/ is now ignored so a local run does not leave the redb file in tree. --- .gitignore | 1 + crates/nexum-engine/Cargo.toml | 46 +++++++++++++++++++++++++++++++++- engine.example.toml | 34 +++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 engine.example.toml diff --git a/.gitignore b/.gitignore index 357bddc..e43a15c 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ Thumbs.db # Environment .env .env.* +data/ diff --git a/crates/nexum-engine/Cargo.toml b/crates/nexum-engine/Cargo.toml index 65768c3..637f51f 100644 --- a/crates/nexum-engine/Cargo.toml +++ b/crates/nexum-engine/Cargo.toml @@ -6,10 +6,54 @@ license.workspace = true repository.workspace = true [dependencies] +# WASM Component Model runtime. wasmtime = { version = "45", features = ["component-model"] } wasmtime-wasi = "45" + +# Async + error plumbing. anyhow = "1" +thiserror = "2" tokio = { version = "1", features = ["full"] } -getrandom = "0.4" + +# Manifest parsing. serde = { version = "1", features = ["derive"] } toml = "1" +serde_json = "1" + +# Observability. `tracing` replaces the prior `eprintln!` debug log +# so the engine can drop into a structured log pipeline in production. +tracing = "0.1" +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "ansi"] } + +# `cow-api` backend. cowprotocol pulls `OrderBookApi`, `OrderCreation`, +# `OrderUid`, the orderbook base URL table per `Chain`, and the typed +# error surface the host re-projects into `HostError`. Pinned to the +# crates.io release Shepherd is shipping against. +cowprotocol = "1.0.0-alpha" +# REST passthrough for `cow_api::request`. cowprotocol pulls reqwest +# transitively for its own client; we depend on it directly so the +# import is explicit and survives any future cowprotocol feature +# rearrangement. +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } + +# `chain` backend. Each configured chain owns a `DynProvider` built +# from a `WsConnect`/`Http` transport so the host's `request` / +# `request-batch` impls can hand a raw `(method, params)` pair to +# alloy's JSON-RPC layer without reimplementing the codec. +alloy-provider = { version = "1.5", default-features = false, features = ["ws", "ipc", "pubsub", "reqwest"] } +alloy-rpc-client = { version = "1.5", default-features = false } +alloy-transport = { version = "1.5", default-features = false } +alloy-transport-ws = { version = "1.5", default-features = false } +alloy-primitives = { version = "1.5", default-features = false, features = ["std", "serde"] } + +# `local-store` backend. Per-module namespacing is enforced +# host-side via a `[len:u8][module_name][raw_key]` prefix. +redb = "2" + +# Misc. +getrandom = "0.4" +url = "2" + +[dev-dependencies] +tempfile = "3" +wiremock = "0.6" diff --git a/engine.example.toml b/engine.example.toml new file mode 100644 index 0000000..d6513b0 --- /dev/null +++ b/engine.example.toml @@ -0,0 +1,34 @@ +# Engine-side runtime configuration for `nexum-engine`. +# +# Distinct from `nexum.toml` (per-module manifest): this file +# describes the *engine*'s I/O wiring. Copy to `engine.toml` next to +# the binary, or pass the path as the third positional argument. + +[engine] +# Directory the local-store redb file (and future engine artefacts) +# will be created under. Created automatically at boot. +state_dir = "./data" + +# `tracing_subscriber::EnvFilter`-compatible directive. `RUST_LOG` +# overrides at process start. +log_level = "info" + +# One [chains.] table per chain the engine should be able to talk +# to. Chain ids are EVM decimal. `ws://` and `wss://` URLs engage +# alloy's pubsub transport (needed for `eth_subscribe`); `http://` and +# `https://` use the HTTP transport. + +[chains.1] +rpc_url = "https://ethereum-rpc.publicnode.com" + +[chains.100] +rpc_url = "https://rpc.gnosischain.com" + +[chains.11155111] +rpc_url = "wss://ethereum-sepolia-rpc.publicnode.com" + +[chains.42161] +rpc_url = "https://arb1.arbitrum.io/rpc" + +[chains.8453] +rpc_url = "https://mainnet.base.org" From f85d3d3ad7c915c0ca7369873b105293d3039fa1 Mon Sep 17 00:00:00 2001 From: brunota20 Date: Mon, 1 Jun 2026 14:20:01 -0300 Subject: [PATCH 2/3] runtime: implement cow-api, chain, local-store host backends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the 0.2 Unsupported stubs with working backends. Each capability lives in its own host submodule so the trait impls in main.rs stay thin (dispatch + project the backend's typed error onto HostError). cow_api::submit_order - Parses the guest's bytes as JSON cowprotocol::OrderCreation. - Dispatches via cowprotocol::OrderBookApi::post_order. - Returns the assigned OrderUid as a 0x-prefixed hex string. cow_api::request - REST passthrough. The base URL is whichever URL the pool's OrderBookApi client carries — so OrderBookApi::new_with_base_url overrides (staging, wiremock) flow through transparently. - Method/path validated host-side; orderbook 4xx/5xx bodies are surfaced verbatim so the guest can decode {errorType,description}. chain::request - Raw JSON-RPC dispatch over an alloy DynProvider opened from engine.toml at boot. WebSocket URLs engage pubsub (eth_subscribe); HTTP URLs use the HTTP transport. Params are passed as serde_json::RawValue so alloy does not re-encode. - request-batch falls back to per-call dispatch (same shape as the earlier stub but now backed by real RPC). local_store - redb file under engine_config.engine.state_dir. - Single shared table. Per-module namespacing is enforced host-side via [len:u8][module_name][raw_key] prefix on every key. list_keys strips the prefix before returning to the guest. logging - Routes through tracing::event! tagged with module=. - Engine boot installs an EnvFilter-based subscriber; RUST_LOG overrides the engine.toml log_level. identity / remote-store / messaging / http stay at Unsupported per the 0.2 roadmap (keystore / Swarm / Waku land in 0.3). Tests (14, all green): - cow_orderbook: pool default chains, unknown-chain typing, REST GET passthrough, relative-path resolution, unknown-method rejection, submit_order round-trip — last three under wiremock so the full HTTP path is exercised without hitting api.cow.fi. - provider_pool: empty pool surfaces UnknownChain. - local_store: roundtrip, namespace isolation, delete, list_keys prefix-stripping, empty-namespace rejection. End-to-end against modules/example: example.wasm loads under the new wiring, logs init + on_event through the tracing pipeline. --- crates/nexum-engine/src/engine_config.rs | 98 +++++ crates/nexum-engine/src/host/cow_orderbook.rs | 294 +++++++++++++ .../nexum-engine/src/host/local_store_redb.rs | 211 +++++++++ crates/nexum-engine/src/host/mod.rs | 12 + crates/nexum-engine/src/host/provider_pool.rs | 152 +++++++ crates/nexum-engine/src/main.rs | 403 ++++++++++++------ 6 files changed, 1029 insertions(+), 141 deletions(-) create mode 100644 crates/nexum-engine/src/engine_config.rs create mode 100644 crates/nexum-engine/src/host/cow_orderbook.rs create mode 100644 crates/nexum-engine/src/host/local_store_redb.rs create mode 100644 crates/nexum-engine/src/host/mod.rs create mode 100644 crates/nexum-engine/src/host/provider_pool.rs diff --git a/crates/nexum-engine/src/engine_config.rs b/crates/nexum-engine/src/engine_config.rs new file mode 100644 index 0000000..4ec271c --- /dev/null +++ b/crates/nexum-engine/src/engine_config.rs @@ -0,0 +1,98 @@ +//! Engine-side runtime configuration. +//! +//! Distinct from `nexum.toml` (module manifest): this file describes +//! the *engine*'s I/O wiring — chain RPC endpoints and the on-disk +//! location of the `local-store` database. Both are required for the +//! 0.2 reference engine to do anything other than print stubs. +//! +//! Lookup order: +//! +//! 1. `--engine-config ` CLI flag (future), or third positional +//! argument today; +//! 2. `engine.toml` in the current working directory; +//! 3. defaults — no chains configured, `state_dir = ./data`. +//! +//! A missing config is OK for the example module (it only logs); for +//! the cow-api / chain backends it surfaces as `HostError { +//! kind: unsupported }` so guests learn early. + +use std::collections::BTreeMap; +use std::path::{Path, PathBuf}; + +use serde::Deserialize; +use tracing::{info, warn}; + +/// Engine-side configuration loaded from `engine.toml`. +#[derive(Debug, Default, Deserialize)] +pub struct EngineConfig { + #[serde(default)] + pub engine: EngineSection, + /// Per-chain RPC URLs keyed by EVM chain id (decimal in TOML). + /// Used by the `chain::request` host call and as the alloy provider + /// pool seed. + #[serde(default)] + pub chains: BTreeMap, +} + +#[derive(Debug, Deserialize)] +pub struct EngineSection { + #[serde(default = "default_state_dir")] + pub state_dir: PathBuf, + /// `tracing_subscriber::EnvFilter`-compatible directive. Defaults to + /// `info` when absent; `RUST_LOG` overrides at process start. + #[serde(default = "default_log_level")] + pub log_level: String, +} + +impl Default for EngineSection { + fn default() -> Self { + Self { + state_dir: default_state_dir(), + log_level: default_log_level(), + } + } +} + +#[derive(Debug, Deserialize)] +pub struct ChainConfig { + /// JSON-RPC endpoint. `ws://` and `wss://` engage alloy's pubsub + /// transport (required for `eth_subscribe`); `http://` and `https://` + /// engage the HTTP transport (request/response only). + pub rpc_url: String, +} + +fn default_state_dir() -> PathBuf { + PathBuf::from("./data") +} + +fn default_log_level() -> String { + "info".to_owned() +} + +/// Read an engine config from disk, returning defaults if the file is +/// missing. Parse errors propagate. +pub fn load_or_default(path: Option<&Path>) -> anyhow::Result { + let path = match path { + Some(p) => p.to_path_buf(), + None => PathBuf::from("engine.toml"), + }; + + if !path.exists() { + warn!( + path = %path.display(), + "engine.toml not found — running with defaults (no chain RPC endpoints; \ + chain::request and cow_api::submit_order will return Unsupported)" + ); + return Ok(EngineConfig::default()); + } + + let raw = std::fs::read_to_string(&path)?; + let cfg: EngineConfig = toml::from_str(&raw)?; + info!( + path = %path.display(), + chains = cfg.chains.len(), + state_dir = %cfg.engine.state_dir.display(), + "engine config loaded", + ); + Ok(cfg) +} diff --git a/crates/nexum-engine/src/host/cow_orderbook.rs b/crates/nexum-engine/src/host/cow_orderbook.rs new file mode 100644 index 0000000..cd11173 --- /dev/null +++ b/crates/nexum-engine/src/host/cow_orderbook.rs @@ -0,0 +1,294 @@ +//! `shepherd:cow/cow-api` backend. +//! +//! Two responsibilities: +//! +//! 1. `request` — generic REST passthrough. Module gives the HTTP +//! method, path (relative to the chain's orderbook base URL), and +//! optional JSON body. We dispatch via `reqwest`, return the +//! response body verbatim. +//! 2. `submit_order` — typed submission. Module gives a JSON-encoded +//! `cowprotocol::OrderCreation`; we parse, dispatch via +//! `cowprotocol::OrderBookApi::post_order`, return the assigned +//! `OrderUid` as a `0x`-prefixed hex string. +//! +//! Per-chain `OrderBookApi` instances are constructed once at engine +//! boot from the discriminated chain set in `cowprotocol::Chain`. +//! Chains the SDK does not know about return `Unsupported` at the +//! host call boundary. + +use std::collections::BTreeMap; + +use cowprotocol::{Chain, OrderBookApi, OrderCreation, OrderUid}; +use thiserror::Error; + +/// Process-wide pool of `OrderBookApi` clients keyed by EVM chain id. +#[derive(Debug, Clone)] +pub struct OrderBookPool { + clients: BTreeMap, + http: reqwest::Client, +} + +impl OrderBookPool { + /// Build a pool covering every `cowprotocol::Chain` variant. The + /// default `OrderBookApi::new(chain)` constructor uses the canonical + /// `api.cow.fi/{slug}/api/v1` base URL from the SDK; callers that + /// need barn or a custom staging URL override per chain. + pub fn with_default_chains() -> Self { + let http = reqwest::Client::new(); + let chains = [ + Chain::Mainnet, + Chain::Gnosis, + Chain::Sepolia, + Chain::ArbitrumOne, + Chain::Base, + ]; + let clients = chains + .iter() + .map(|c| (c.id(), OrderBookApi::new(*c))) + .collect(); + Self { clients, http } + } + + /// Look up the client for a chain. + pub fn get(&self, chain_id: u64) -> Result<&OrderBookApi, CowApiError> { + self.clients + .get(&chain_id) + .ok_or(CowApiError::UnknownChain(chain_id)) + } + + /// REST passthrough. The base URL is whichever URL the pool's + /// `OrderBookApi` client carries — overrides set via + /// `OrderBookApi::new_with_base_url` (staging, wiremock) flow + /// through here too, which keeps the passthrough and the typed + /// `submit_order_json` path aimed at the same orderbook. + pub async fn request( + &self, + chain_id: u64, + method: &str, + path: &str, + body: Option<&str>, + ) -> Result { + let api = self.get(chain_id)?; + let base = api.base_url().clone(); + // `path` may or may not lead with a slash; `Url::join` handles + // both, but we strip a single leading `/` so consumers can + // write either `/orders/...` or `orders/...` interchangeably. + let trimmed = path.strip_prefix('/').unwrap_or(path); + let url = base + .join(trimmed) + .map_err(|e| CowApiError::BadPath(format!("{path:?}: {e}")))?; + + let request = match method.to_ascii_uppercase().as_str() { + "GET" => self.http.get(url), + "POST" => self.http.post(url), + "PUT" => self.http.put(url), + "DELETE" => self.http.delete(url), + other => return Err(CowApiError::BadMethod(other.to_owned())), + }; + let request = if let Some(body) = body { + request + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(body.to_owned()) + } else { + request + }; + + let response = request.send().await.map_err(CowApiError::Network)?; + // Surface the orderbook's structured 4xx / 5xx bodies verbatim + // so the guest can decode `{"errorType": "...", "description": + // "..."}` — projecting them into HostError here loses the + // detail the guest needs to recover. + let text = response.text().await.map_err(CowApiError::Network)?; + Ok(text) + } + + /// Typed submission. `body` is the JSON encoding of + /// `cowprotocol::OrderCreation`. The chain's orderbook validates + /// `from`, the EIP-712 hash, and (if `Eip1271`) the contract + /// signature; we return whatever UID it assigns. + pub async fn submit_order_json( + &self, + chain_id: u64, + body: &[u8], + ) -> Result { + let creation: OrderCreation = serde_json::from_slice(body).map_err(CowApiError::Decode)?; + let api = self.get(chain_id)?; + let uid = api + .post_order(&creation) + .await + .map_err(|e| CowApiError::Orderbook(e.to_string()))?; + Ok(uid) + } +} + +#[derive(Debug, Error)] +pub enum CowApiError { + #[error("unknown chain {0} (no cowprotocol::Chain variant)")] + UnknownChain(u64), + #[error("bad HTTP method `{0}` (expected GET/POST/PUT/DELETE)")] + BadMethod(String), + #[error("invalid path: {0}")] + BadPath(String), + #[error("network: {0}")] + Network(#[from] reqwest::Error), + #[error("decode OrderCreation JSON: {0}")] + Decode(#[from] serde_json::Error), + #[error("orderbook rejected: {0}")] + Orderbook(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + #[test] + fn pool_indexes_default_chains() { + let pool = OrderBookPool::with_default_chains(); + assert!(pool.get(1).is_ok(), "mainnet present"); + assert!(pool.get(100).is_ok(), "gnosis present"); + assert!(pool.get(11_155_111).is_ok(), "sepolia present"); + assert!(pool.get(42_161).is_ok(), "arbitrum present"); + assert!(pool.get(8_453).is_ok(), "base present"); + } + + #[test] + fn unknown_chain_surfaces_typed_error() { + let pool = OrderBookPool::with_default_chains(); + assert!(matches!( + pool.get(99_999), + Err(CowApiError::UnknownChain(99_999)) + )); + } + + /// Build a pool whose Mainnet entry points at `mock.uri()`. + /// `OrderBookApi::new_with_base_url` ships in cowprotocol; we + /// rely on it so wiremock-driven tests can exercise the full + /// request path without re-implementing the HTTP client. + fn pool_with_mainnet_at(mock: &MockServer) -> OrderBookPool { + let mut clients = std::collections::BTreeMap::new(); + clients.insert( + Chain::Mainnet.id(), + OrderBookApi::new_with_base_url(mock.uri().parse().expect("mock uri parses")), + ); + OrderBookPool { + clients, + http: reqwest::Client::new(), + } + } + + #[tokio::test] + async fn request_passes_get_path_through() { + let mock = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/api/v1/version")) + .respond_with(ResponseTemplate::new(200).set_body_string(r#"{"version":"x.y.z"}"#)) + .expect(1) + .mount(&mock) + .await; + + let pool = pool_with_mainnet_at(&mock); + let body = pool + .request(Chain::Mainnet.id(), "GET", "/api/v1/version", None) + .await + .expect("request succeeds"); + assert_eq!(body, r#"{"version":"x.y.z"}"#); + } + + #[tokio::test] + async fn request_relative_path_works() { + // Module passes a path without a leading slash. The + // passthrough should still resolve against the orderbook + // base URL. + let mock = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/api/v1/native_price/0xabc")) + .respond_with(ResponseTemplate::new(200).set_body_string("1.23")) + .expect(1) + .mount(&mock) + .await; + + let pool = pool_with_mainnet_at(&mock); + let body = pool + .request( + Chain::Mainnet.id(), + "GET", + "api/v1/native_price/0xabc", + None, + ) + .await + .expect("relative path resolves"); + assert_eq!(body, "1.23"); + } + + #[tokio::test] + async fn request_rejects_unknown_method() { + let pool = OrderBookPool::with_default_chains(); + let err = pool + .request(Chain::Mainnet.id(), "PATCH", "/x", None) + .await + .unwrap_err(); + assert!(matches!(err, CowApiError::BadMethod(_))); + } + + #[tokio::test] + async fn submit_order_propagates_orderbook_response() { + let mock = MockServer::start().await; + let body_json = sample_order_json(); + // cowprotocol POST /api/v1/orders returns the order UID + // (56-byte hex) as a JSON string body. + let returned_uid = format!("\"0x{}\"", "ab".repeat(56)); + Mock::given(method("POST")) + .and(path("/api/v1/orders")) + .respond_with(ResponseTemplate::new(201).set_body_string(returned_uid.clone())) + .expect(1) + .mount(&mock) + .await; + + let pool = pool_with_mainnet_at(&mock); + let uid = pool + .submit_order_json(Chain::Mainnet.id(), body_json.as_bytes()) + .await + .expect("submit succeeds"); + assert_eq!(uid.as_slice().len(), 56); + assert_eq!(uid.as_slice(), &[0xab; 56]); + } + + /// A minimal but accepted-by-cowprotocol OrderCreation JSON. We + /// generate it inside the test so the JSON shape stays in lockstep + /// with the published `cowprotocol` version. + fn sample_order_json() -> String { + use alloy_primitives::{Address, U256}; + use cowprotocol::OrderCreation; + use cowprotocol::app_data::{EMPTY_APP_DATA_HASH, EMPTY_APP_DATA_JSON}; + use cowprotocol::order::{BuyTokenDestination, OrderData, OrderKind, SellTokenSource}; + use cowprotocol::signature::Signature; + use cowprotocol::signing_scheme::SigningScheme; + + let order_data = OrderData { + sell_token: Address::from([0x01; 20]), + buy_token: Address::from([0x02; 20]), + receiver: None, + sell_amount: U256::from(100u64), + buy_amount: U256::from(99u64), + valid_to: u32::MAX, + app_data: EMPTY_APP_DATA_HASH, + fee_amount: U256::ZERO, + kind: OrderKind::Sell, + partially_fillable: false, + sell_token_balance: SellTokenSource::Erc20, + buy_token_balance: BuyTokenDestination::Erc20, + }; + let signature = Signature::from_bytes(SigningScheme::PreSign, &[]).expect("presign empty"); + let creation = OrderCreation::from_signed_order_data( + &order_data, + signature, + Address::from([0x03; 20]), + EMPTY_APP_DATA_JSON.to_owned(), + None, + ) + .expect("valid OrderCreation"); + serde_json::to_string(&creation).expect("serialise OrderCreation") + } +} diff --git a/crates/nexum-engine/src/host/local_store_redb.rs b/crates/nexum-engine/src/host/local_store_redb.rs new file mode 100644 index 0000000..4e535e0 --- /dev/null +++ b/crates/nexum-engine/src/host/local_store_redb.rs @@ -0,0 +1,211 @@ +//! `nexum:host/local-store` backend. +//! +//! Single redb file under `EngineConfig.engine.state_dir`. Per-module +//! namespacing is enforced host-side via a `[len:u8][module_name][raw_key]` +//! prefix on every redb key. Two modules using the same key string see +//! disjoint data. +//! +//! The runtime supplies the namespace; modules see plain key strings. +//! Module names longer than 255 bytes are rejected at construction +//! (matches the one-byte length prefix). + +// The redb error enum is large by construction (Txn / Storage / +// Commit each carry a redb backtrace ≈ 160 bytes). Allowing the +// cap-on-Result-size lint here is the lesser evil: boxing every +// variant pushes the error path to the heap just to humour the lint. +#![allow(clippy::result_large_err)] + +use std::path::Path; +use std::sync::Arc; + +use redb::{Database, ReadableTable, TableDefinition}; +use thiserror::Error; + +const TABLE: TableDefinition<'static, &[u8], &[u8]> = TableDefinition::new("nexum:local-store"); +const MAX_NAMESPACE_LEN: usize = u8::MAX as usize; + +/// Process-wide handle to the local-store redb database. Cheap to +/// clone; the per-module view is constructed by setting the +/// namespace prefix at call time. +#[derive(Debug, Clone)] +pub struct LocalStore { + db: Arc, +} + +impl LocalStore { + /// Open (or create) the redb file at `path`. Materialises the + /// shared table so subsequent read transactions never hit + /// `TableDoesNotExist`. + pub fn open(path: impl AsRef) -> Result { + let db = Database::create(path).map_err(StorageError::Open)?; + { + let txn = db.begin_write().map_err(StorageError::Txn)?; + txn.open_table(TABLE).map_err(StorageError::Table)?; + txn.commit().map_err(StorageError::Commit)?; + } + Ok(Self { db: Arc::new(db) }) + } + + /// Fetch a value for `(namespace, key)`. Returns `Ok(None)` when + /// no entry exists; module never observes the prefix. + pub fn get(&self, namespace: &str, key: &str) -> Result>, StorageError> { + let full = build_key(namespace, key)?; + let txn = self.db.begin_read().map_err(StorageError::Txn)?; + let table = txn.open_table(TABLE).map_err(StorageError::Table)?; + let value = table + .get(full.as_slice()) + .map_err(StorageError::Storage)? + .map(|v| v.value().to_vec()); + Ok(value) + } + + /// Insert or overwrite. + pub fn set(&self, namespace: &str, key: &str, value: &[u8]) -> Result<(), StorageError> { + let full = build_key(namespace, key)?; + let txn = self.db.begin_write().map_err(StorageError::Txn)?; + { + let mut table = txn.open_table(TABLE).map_err(StorageError::Table)?; + table + .insert(full.as_slice(), value) + .map_err(StorageError::Storage)?; + } + txn.commit().map_err(StorageError::Commit)?; + Ok(()) + } + + /// Delete. Idempotent — deleting a missing key is a no-op. + pub fn delete(&self, namespace: &str, key: &str) -> Result<(), StorageError> { + let full = build_key(namespace, key)?; + let txn = self.db.begin_write().map_err(StorageError::Txn)?; + { + let mut table = txn.open_table(TABLE).map_err(StorageError::Table)?; + table + .remove(full.as_slice()) + .map_err(StorageError::Storage)?; + } + txn.commit().map_err(StorageError::Commit)?; + Ok(()) + } + + /// Enumerate keys in `namespace` whose raw key (post-prefix) + /// starts with `prefix`. Returns only the module-visible key + /// strings — the host strips the namespace prefix. + pub fn list_keys(&self, namespace: &str, prefix: &str) -> Result, StorageError> { + let ns_prefix = namespace_prefix(namespace)?; + let full_prefix = build_key(namespace, prefix)?; + let txn = self.db.begin_read().map_err(StorageError::Txn)?; + let table = txn.open_table(TABLE).map_err(StorageError::Table)?; + let mut out = Vec::new(); + for entry in table.iter().map_err(StorageError::Storage)? { + let (k, _v) = entry.map_err(StorageError::Storage)?; + let key_bytes = k.value(); + if key_bytes.starts_with(&full_prefix) + && let Ok(s) = std::str::from_utf8(&key_bytes[ns_prefix.len()..]) + { + out.push(s.to_owned()); + } + } + Ok(out) + } +} + +fn namespace_prefix(namespace: &str) -> Result, StorageError> { + if namespace.is_empty() { + return Err(StorageError::InvalidNamespace( + "module namespace must not be empty".into(), + )); + } + let bytes = namespace.as_bytes(); + if bytes.len() > MAX_NAMESPACE_LEN { + return Err(StorageError::InvalidNamespace(format!( + "namespace `{namespace}` is {} bytes; max is {MAX_NAMESPACE_LEN}", + bytes.len() + ))); + } + let mut out = Vec::with_capacity(1 + bytes.len()); + out.push(bytes.len() as u8); + out.extend_from_slice(bytes); + Ok(out) +} + +fn build_key(namespace: &str, key: &str) -> Result, StorageError> { + let mut out = namespace_prefix(namespace)?; + out.extend_from_slice(key.as_bytes()); + Ok(out) +} + +/// Errors surfaced by [`LocalStore`]. +#[derive(Debug, Error)] +pub enum StorageError { + #[error("open redb: {0}")] + Open(#[source] redb::DatabaseError), + #[error("redb txn: {0}")] + Txn(#[source] redb::TransactionError), + #[error("redb table: {0}")] + Table(#[source] redb::TableError), + #[error("redb storage: {0}")] + Storage(#[source] redb::StorageError), + #[error("redb commit: {0}")] + Commit(#[source] redb::CommitError), + #[error("invalid namespace: {0}")] + InvalidNamespace(String), +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fresh() -> (tempfile::TempDir, LocalStore) { + let dir = tempfile::tempdir().expect("tempdir"); + let store = LocalStore::open(dir.path().join("ls.redb")).expect("open"); + (dir, store) + } + + #[test] + fn set_get_roundtrip() { + let (_dir, store) = fresh(); + store.set("twap", "k", b"v").unwrap(); + assert_eq!(store.get("twap", "k").unwrap().as_deref(), Some(&b"v"[..])); + } + + #[test] + fn namespaces_isolate_modules() { + let (_dir, store) = fresh(); + store.set("a", "k", b"from-a").unwrap(); + store.set("b", "k", b"from-b").unwrap(); + assert_eq!( + store.get("a", "k").unwrap().as_deref(), + Some(&b"from-a"[..]) + ); + assert_eq!( + store.get("b", "k").unwrap().as_deref(), + Some(&b"from-b"[..]) + ); + } + + #[test] + fn delete_then_get_is_none() { + let (_dir, store) = fresh(); + store.set("twap", "k", b"v").unwrap(); + store.delete("twap", "k").unwrap(); + assert!(store.get("twap", "k").unwrap().is_none()); + } + + #[test] + fn list_keys_strips_namespace_prefix() { + let (_dir, store) = fresh(); + store.set("twap", "posted:1", b"x").unwrap(); + store.set("twap", "posted:2", b"y").unwrap(); + store.set("twap", "other", b"z").unwrap(); + let keys = store.list_keys("twap", "posted:").unwrap(); + assert_eq!(keys.len(), 2); + assert!(keys.iter().all(|k| k.starts_with("posted:"))); + } + + #[test] + fn rejects_empty_namespace() { + let (_dir, store) = fresh(); + let err = store.set("", "k", b"v").unwrap_err(); + assert!(matches!(err, StorageError::InvalidNamespace(_))); + } +} diff --git a/crates/nexum-engine/src/host/mod.rs b/crates/nexum-engine/src/host/mod.rs new file mode 100644 index 0000000..b76fe99 --- /dev/null +++ b/crates/nexum-engine/src/host/mod.rs @@ -0,0 +1,12 @@ +//! Host-side backends for the `nexum:host` / `shepherd:cow` +//! interfaces. +//! +//! Each submodule owns one capability. The trait impls in `main.rs` +//! stay thin: they validate inputs, dispatch to the backend, and +//! project the backend's typed error onto the bindgen-generated +//! `HostError`. Keeping the backends pure (no bindgen types) means +//! each can be unit-tested without spinning up a wasmtime store. + +pub mod cow_orderbook; +pub mod local_store_redb; +pub mod provider_pool; diff --git a/crates/nexum-engine/src/host/provider_pool.rs b/crates/nexum-engine/src/host/provider_pool.rs new file mode 100644 index 0000000..0b6d94b --- /dev/null +++ b/crates/nexum-engine/src/host/provider_pool.rs @@ -0,0 +1,152 @@ +//! `nexum:host/chain` backend. +//! +//! Per-chain alloy provider, opened from the engine config at boot. +//! `request` is a raw JSON-RPC dispatch: the host hands `(method, +//! params)` straight to alloy's transport and returns the result body +//! verbatim. No method allowlist, no re-encoding of params — the +//! contract is "give us a JSON-RPC pair, we'll return what the node +//! returns". +//! +//! Transports: +//! - `ws://` / `wss://` — `WsConnect`; required for `eth_subscribe`. +//! - `http://` / `https://` — alloy's HTTP transport; request/response only. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use alloy_provider::{DynProvider, Provider, ProviderBuilder, WsConnect}; +use serde_json::value::RawValue; +use thiserror::Error; +use tracing::info; + +use crate::engine_config::EngineConfig; + +/// Pool of alloy providers keyed by chain id. +#[derive(Debug, Clone)] +pub struct ProviderPool { + providers: Arc>, +} + +impl ProviderPool { + /// Open one provider per chain in `cfg.chains`. WebSocket URLs + /// engage alloy's pubsub transport; HTTP URLs use the HTTP + /// transport. Connection failures propagate to the caller; the + /// engine treats them as fatal at boot. + pub async fn from_config(cfg: &EngineConfig) -> Result { + let mut providers: BTreeMap = BTreeMap::new(); + for (chain_id, chain_cfg) in &cfg.chains { + let url = chain_cfg.rpc_url.as_str(); + info!(chain_id, url, "opening chain RPC provider"); + let provider = if url.starts_with("ws://") || url.starts_with("wss://") { + ProviderBuilder::new() + .connect_ws(WsConnect::new(url)) + .await + .map_err(|e| ProviderError::Connect { + chain_id: *chain_id, + detail: e.to_string(), + })? + .erased() + } else { + let parsed: url::Url = + url.parse() + .map_err(|e: url::ParseError| ProviderError::Connect { + chain_id: *chain_id, + detail: e.to_string(), + })?; + ProviderBuilder::new().connect_http(parsed).erased() + }; + providers.insert(*chain_id, provider); + } + Ok(Self { + providers: Arc::new(providers), + }) + } + + /// Empty pool — used by tests and as a default when no + /// `engine.toml` is found. Every `request` call returns + /// `UnknownChain`. + #[cfg_attr(not(test), allow(dead_code))] + pub fn empty() -> Self { + Self { + providers: Arc::new(BTreeMap::new()), + } + } + + /// Raw JSON-RPC dispatch. `params_json` must be the JSON encoding + /// of the params array (e.g. `"[\"0x...\",\"latest\"]"`), as + /// produced by the SDK's `chain::request` glue. + pub async fn request( + &self, + chain_id: u64, + method: String, + params_json: String, + ) -> Result { + let provider = self + .providers + .get(&chain_id) + .ok_or(ProviderError::UnknownChain(chain_id))?; + // Pass the params through as a raw JSON value so alloy does + // not re-encode them on the way to the node. + let params: Box = RawValue::from_string(params_json.clone()).map_err(|e| { + ProviderError::InvalidParams { + method: method.clone(), + detail: e.to_string(), + } + })?; + let result: Box = provider + .raw_request(method.clone().into(), params) + .await + .map_err(|e| ProviderError::Rpc { + method, + detail: e.to_string(), + })?; + Ok(result.get().to_owned()) + } +} + +/// Errors surfaced by [`ProviderPool`]. +#[derive(Debug, Error)] +pub enum ProviderError { + /// Chain id absent from the engine config. + #[error("unknown chain {0} (no engine.toml entry)")] + UnknownChain(u64), + /// Could not open the underlying transport. + #[error("connect chain {chain_id}: {detail}")] + Connect { + /// Chain id we failed to dial. + chain_id: u64, + /// Transport-side error string. + detail: String, + }, + /// The guest-supplied JSON params did not parse. + #[error("invalid params JSON for `{method}`: {detail}")] + InvalidParams { + /// RPC method name. + method: String, + /// JSON-parser detail. + detail: String, + }, + /// The node returned an error for the dispatched call. + #[error("rpc `{method}` failed: {detail}")] + Rpc { + /// RPC method name. + method: String, + /// Transport-side error string. + detail: String, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn empty_pool_rejects_lookups() { + let pool = ProviderPool::empty(); + let err = pool + .request(1, "eth_blockNumber".into(), "[]".into()) + .await + .unwrap_err(); + assert!(matches!(err, ProviderError::UnknownChain(1))); + } +} diff --git a/crates/nexum-engine/src/main.rs b/crates/nexum-engine/src/main.rs index f013f22..eca6629 100644 --- a/crates/nexum-engine/src/main.rs +++ b/crates/nexum-engine/src/main.rs @@ -1,7 +1,12 @@ +mod engine_config; +mod host; mod manifest; use std::path::PathBuf; use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +use tracing::{info, warn}; +use tracing_subscriber::EnvFilter; use wasmtime::component::{Component, Linker, ResourceTable}; use wasmtime::error::Context as _; use wasmtime::{Engine, Store}; @@ -28,6 +33,15 @@ struct HostState { /// Per-module `[capabilities.http].allow` allowlist (from nexum.toml). /// Consulted by `http::fetch` before any outbound call. http_allowlist: Vec, + /// Namespace for the running module's `local-store` rows. Set from + /// `manifest.module.name` at instantiation. + module_namespace: String, + /// `cow-api` backend — per-chain `OrderBookApi` clients + reqwest. + cow: host::cow_orderbook::OrderBookPool, + /// `chain` backend — per-chain alloy `DynProvider` pool. + chain: host::provider_pool::ProviderPool, + /// `local-store` backend — redb file with host-side namespacing. + store: host::local_store_redb::LocalStore, } impl WasiView for HostState { @@ -49,58 +63,135 @@ fn unimplemented(domain: &str, detail: impl Into) -> HostError { } } -// -- Stub implementations for host interfaces -- +fn internal_error(domain: &str, detail: impl Into) -> HostError { + HostError { + domain: domain.into(), + kind: HostErrorKind::Internal, + code: 0, + message: detail.into(), + data: None, + } +} + +// -- nexum:host/types is empty (declarations only). -- impl nexum::host::types::Host for HostState {} +// -- shepherd:cow/cow-api: REST passthrough + typed submission. -- + impl shepherd::cow::cow_api::Host for HostState { async fn request( &mut self, - _chain_id: u64, + chain_id: u64, method: String, path: String, - _body: Option, + body: Option, ) -> Result { let start = Instant::now(); - eprintln!("[cow-api] {method} {path}"); - let result = Err(unimplemented( - "cow-api", - format!("not implemented: {method} {path}"), - )); - eprintln!("[timing] cow-api::request: {:?}", start.elapsed()); + tracing::debug!(chain_id, %method, %path, "cow-api::request"); + let result = match self + .cow + .request(chain_id, &method, &path, body.as_deref()) + .await + { + Ok(body) => Ok(body), + Err(host::cow_orderbook::CowApiError::UnknownChain(id)) => Err(unimplemented( + "cow-api", + format!("chain {id} not in cowprotocol"), + )), + Err(host::cow_orderbook::CowApiError::BadMethod(m)) => Err(HostError { + domain: "cow-api".into(), + kind: HostErrorKind::InvalidInput, + code: 0, + message: format!("unsupported HTTP method: {m}"), + data: None, + }), + Err(host::cow_orderbook::CowApiError::BadPath(msg)) => Err(HostError { + domain: "cow-api".into(), + kind: HostErrorKind::InvalidInput, + code: 0, + message: msg, + data: None, + }), + Err(err) => Err(internal_error("cow-api", err.to_string())), + }; + tracing::trace!(elapsed_ms = ?start.elapsed(), "cow-api::request done"); result } async fn submit_order( &mut self, - _chain_id: u64, - _order_data: Vec, + chain_id: u64, + order_data: Vec, ) -> Result { let start = Instant::now(); - eprintln!("[cow-api] submit-order"); - let result = Err(unimplemented("cow-api", "submit-order not implemented")); - eprintln!("[timing] cow-api::submit-order: {:?}", start.elapsed()); + tracing::debug!(chain_id, bytes = order_data.len(), "cow-api::submit-order"); + let result = match self.cow.submit_order_json(chain_id, &order_data).await { + Ok(uid) => Ok(format!("0x{}", hex_encode(uid.as_slice()))), + Err(host::cow_orderbook::CowApiError::UnknownChain(id)) => Err(unimplemented( + "cow-api", + format!("chain {id} not in cowprotocol"), + )), + Err(host::cow_orderbook::CowApiError::Decode(err)) => Err(HostError { + domain: "cow-api".into(), + kind: HostErrorKind::InvalidInput, + code: 0, + message: format!("invalid OrderCreation JSON: {err}"), + data: None, + }), + Err(host::cow_orderbook::CowApiError::Orderbook(msg)) => Err(HostError { + domain: "cow-api".into(), + kind: HostErrorKind::Denied, + code: 0, + message: msg, + data: None, + }), + Err(err) => Err(internal_error("cow-api", err.to_string())), + }; + tracing::trace!(elapsed_ms = ?start.elapsed(), "cow-api::submit-order done"); result } } +// -- nexum:host/chain: raw JSON-RPC dispatch over alloy. -- + impl nexum::host::chain::Host for HostState { async fn request( &mut self, - _chain_id: u64, + chain_id: u64, method: String, - _params: String, + params: String, ) -> Result { let start = Instant::now(); - eprintln!("[chain] request: {method}"); - let result = Err(HostError { - domain: "chain".into(), - kind: HostErrorKind::Unsupported, - code: -32601, - message: format!("method not implemented: {method}"), - data: None, - }); - eprintln!("[timing] chain::request: {:?}", start.elapsed()); + tracing::debug!(chain_id, %method, "chain::request"); + let result = match self.chain.request(chain_id, method.clone(), params).await { + Ok(body) => Ok(body), + Err(host::provider_pool::ProviderError::UnknownChain(id)) => Err(HostError { + domain: "chain".into(), + kind: HostErrorKind::Unsupported, + code: 0, + message: format!("chain {id} has no engine.toml RPC entry"), + data: None, + }), + Err(host::provider_pool::ProviderError::InvalidParams { detail, .. }) => { + Err(HostError { + domain: "chain".into(), + kind: HostErrorKind::InvalidInput, + code: -32602, + message: detail, + data: None, + }) + } + Err(host::provider_pool::ProviderError::Rpc { detail, .. }) => Err(HostError { + domain: "chain".into(), + kind: HostErrorKind::Internal, + code: -32603, + message: detail, + data: None, + }), + Err(err) => Err(internal_error("chain", err.to_string())), + }; + tracing::trace!(elapsed_ms = ?start.elapsed(), "chain::request done"); result } @@ -110,34 +201,30 @@ impl nexum::host::chain::Host for HostState { requests: Vec, ) -> Result, HostError> { let start = Instant::now(); - eprintln!("[chain] request-batch: {} calls", requests.len()); + tracing::debug!(chain_id, count = requests.len(), "chain::request-batch"); let mut out = Vec::with_capacity(requests.len()); for req in requests { - match self.request(chain_id, req.method, req.params).await { + match nexum::host::chain::Host::request(self, chain_id, req.method, req.params).await { Ok(s) => out.push(nexum::host::chain::RpcResult::Ok(s)), Err(e) => out.push(nexum::host::chain::RpcResult::Err(e)), } } - eprintln!("[timing] chain::request-batch: {:?}", start.elapsed()); + tracing::trace!(elapsed_ms = ?start.elapsed(), "chain::request-batch done"); Ok(out) } } +// -- nexum:host/identity: deferred to 0.3 (keystore/KMS backend). -- + impl nexum::host::identity::Host for HostState { async fn accounts(&mut self) -> Result>, HostError> { - let start = Instant::now(); - eprintln!("[identity] accounts"); - let result = Ok(vec![]); - eprintln!("[timing] identity::accounts: {:?}", start.elapsed()); - result + // No keystore wired yet — return an empty roster so guests can + // probe-then-skip without erroring. Real keystore lands in 0.3. + Ok(vec![]) } async fn sign(&mut self, _account: Vec, _message: Vec) -> Result, HostError> { - let start = Instant::now(); - eprintln!("[identity] sign"); - let result = Err(unimplemented("identity", "sign not implemented")); - eprintln!("[timing] identity::sign: {:?}", start.elapsed()); - result + Err(unimplemented("identity", "sign requires a keystore (0.3)")) } async fn sign_typed_data( @@ -145,61 +232,54 @@ impl nexum::host::identity::Host for HostState { _account: Vec, _typed_data: String, ) -> Result, HostError> { - let start = Instant::now(); - eprintln!("[identity] sign-typed-data"); - let result = Err(unimplemented("identity", "sign-typed-data not implemented")); - eprintln!("[timing] identity::sign-typed-data: {:?}", start.elapsed()); - result + Err(unimplemented( + "identity", + "sign-typed-data requires a keystore (0.3)", + )) } } +// -- nexum:host/local-store: redb backend with host-side namespacing. -- + impl nexum::host::local_store::Host for HostState { async fn get(&mut self, key: String) -> Result>, HostError> { - let start = Instant::now(); - eprintln!("[local-store] get: {key}"); - let result = Ok(None); - eprintln!("[timing] local-store::get: {:?}", start.elapsed()); - result + self.store + .get(&self.module_namespace, &key) + .map_err(|err| internal_error("local-store", err.to_string())) } - async fn set(&mut self, key: String, _value: Vec) -> Result<(), HostError> { - let start = Instant::now(); - eprintln!("[local-store] set: {key}"); - let result = Ok(()); - eprintln!("[timing] local-store::set: {:?}", start.elapsed()); - result + async fn set(&mut self, key: String, value: Vec) -> Result<(), HostError> { + self.store + .set(&self.module_namespace, &key, &value) + .map_err(|err| internal_error("local-store", err.to_string())) } async fn delete(&mut self, key: String) -> Result<(), HostError> { - let start = Instant::now(); - eprintln!("[local-store] delete: {key}"); - let result = Ok(()); - eprintln!("[timing] local-store::delete: {:?}", start.elapsed()); - result + self.store + .delete(&self.module_namespace, &key) + .map_err(|err| internal_error("local-store", err.to_string())) } async fn list_keys(&mut self, prefix: String) -> Result, HostError> { - let start = Instant::now(); - eprintln!("[local-store] list-keys: {prefix}"); - let result = Ok(vec![]); - eprintln!("[timing] local-store::list-keys: {:?}", start.elapsed()); - result + self.store + .list_keys(&self.module_namespace, &prefix) + .map_err(|err| internal_error("local-store", err.to_string())) } } impl nexum::host::remote_store::Host for HostState { async fn upload(&mut self, _data: Vec) -> Result, HostError> { - let start = Instant::now(); - let result = Err(unimplemented("remote-store", "upload not implemented")); - eprintln!("[timing] remote-store::upload: {:?}", start.elapsed()); - result + Err(unimplemented( + "remote-store", + "Swarm backend deferred to 0.3", + )) } async fn download(&mut self, _reference: Vec) -> Result, HostError> { - let start = Instant::now(); - let result = Err(unimplemented("remote-store", "download not implemented")); - eprintln!("[timing] remote-store::download: {:?}", start.elapsed()); - result + Err(unimplemented( + "remote-store", + "Swarm backend deferred to 0.3", + )) } async fn read_feed( @@ -207,56 +287,51 @@ impl nexum::host::remote_store::Host for HostState { _owner: Vec, _topic: Vec, ) -> Result>, HostError> { - let start = Instant::now(); - let result = Err(unimplemented("remote-store", "read-feed not implemented")); - eprintln!("[timing] remote-store::read-feed: {:?}", start.elapsed()); - result + Err(unimplemented( + "remote-store", + "Swarm backend deferred to 0.3", + )) } async fn write_feed(&mut self, _topic: Vec, _data: Vec) -> Result, HostError> { - let start = Instant::now(); - let result = Err(unimplemented("remote-store", "write-feed not implemented")); - eprintln!("[timing] remote-store::write-feed: {:?}", start.elapsed()); - result + Err(unimplemented( + "remote-store", + "Swarm backend deferred to 0.3", + )) } } impl nexum::host::messaging::Host for HostState { - async fn publish(&mut self, content_topic: String, _payload: Vec) -> Result<(), HostError> { - let start = Instant::now(); - eprintln!("[messaging] publish: {content_topic}"); - let result = Err(unimplemented("messaging", "publish not implemented")); - eprintln!("[timing] messaging::publish: {:?}", start.elapsed()); - result + async fn publish( + &mut self, + _content_topic: String, + _payload: Vec, + ) -> Result<(), HostError> { + Err(unimplemented("messaging", "Waku backend deferred to 0.3")) } async fn query( &mut self, - content_topic: String, + _content_topic: String, _start_time: Option, _end_time: Option, _limit: Option, ) -> Result, HostError> { - let start = Instant::now(); - eprintln!("[messaging] query: {content_topic}"); - let result = Ok(vec![]); - eprintln!("[timing] messaging::query: {:?}", start.elapsed()); - result + // Empty result — same posture as `identity::accounts`. + Ok(vec![]) } } impl nexum::host::logging::Host for HostState { async fn log(&mut self, level: nexum::host::logging::Level, message: String) { - let start = Instant::now(); - let level_str = match level { - nexum::host::logging::Level::Trace => "TRACE", - nexum::host::logging::Level::Debug => "DEBUG", - nexum::host::logging::Level::Info => "INFO", - nexum::host::logging::Level::Warn => "WARN", - nexum::host::logging::Level::Error => "ERROR", - }; - eprintln!("[{level_str}] {message}"); - eprintln!("[timing] logging::log: {:?}", start.elapsed()); + let module = self.module_namespace.as_str(); + match level { + nexum::host::logging::Level::Trace => tracing::trace!(module, "{}", message), + nexum::host::logging::Level::Debug => tracing::debug!(module, "{}", message), + nexum::host::logging::Level::Info => tracing::info!(module, "{}", message), + nexum::host::logging::Level::Warn => tracing::warn!(module, "{}", message), + nexum::host::logging::Level::Error => tracing::error!(module, "{}", message), + } } } @@ -292,16 +367,12 @@ impl nexum::host::http::Host for HostState { &mut self, req: nexum::host::http::Request, ) -> Result { - let start = Instant::now(); - eprintln!("[http] {} {}", req.method, req.url); - // Manifest allowlist enforcement runs before any I/O. Hosts that // never link a manifest leave `http_allowlist` empty, which denies // every request — matching the "no implicit network" stance. let host = match manifest::extract_host(&req.url) { Some(h) => h, None => { - eprintln!("[timing] http::fetch: {:?}", start.elapsed()); return Err(HostError { domain: "http".into(), kind: HostErrorKind::InvalidInput, @@ -312,8 +383,7 @@ impl nexum::host::http::Host for HostState { } }; if !manifest::host_allowed(host, &self.http_allowlist) { - eprintln!("[http] denied by allowlist: {host}"); - eprintln!("[timing] http::fetch: {:?}", start.elapsed()); + warn!(host, "[http] denied by allowlist"); return Err(HostError { domain: "http".into(), kind: HostErrorKind::Denied, @@ -325,31 +395,52 @@ impl nexum::host::http::Host for HostState { data: None, }); } - // 0.2: allowlist passed, but the reference runtime does not perform // real HTTP yet. Real fetch lands in 0.3. - let result = Err(unimplemented( + Err(unimplemented( "http", "fetch not implemented in 0.2 reference runtime (allowlist passed)", - )); - eprintln!("[timing] http::fetch: {:?}", start.elapsed()); - result + )) } } +/// Lowercase hex encoder. Kept in the engine binary rather than +/// pulling a `hex` crate just for one call site. +fn hex_encode(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + s.push_str(&format!("{b:02x}")); + } + s +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let mut args = std::env::args().skip(1); let wasm_path = args.next().ok_or_else(|| { - anyhow::anyhow!("usage: nexum-engine []") + anyhow::anyhow!( + "usage: nexum-engine [] []" + ) })?; let explicit_manifest = args.next().map(PathBuf::from); + let explicit_engine_config = args.next().map(PathBuf::from); + + // -- 1. Load engine config (optional). -- + let engine_cfg = engine_config::load_or_default(explicit_engine_config.as_deref())?; - println!("nexum-engine: loading component from {wasm_path}"); + // -- 2. Install tracing subscriber. -- + let env_filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new(&engine_cfg.engine.log_level)) + .unwrap_or_else(|_| EnvFilter::new("info")); + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(true) + .init(); - // Load the manifest from the explicit path if given, otherwise from - // `nexum.toml` next to the component file. Missing → fallback (with - // deprecation warning). + info!("nexum-engine starting"); + info!(wasm = %wasm_path, "loading component"); + + // -- 3. Load the module manifest. -- let manifest_path = explicit_manifest.or_else(|| { PathBuf::from(&wasm_path) .parent() @@ -357,20 +448,40 @@ async fn main() -> anyhow::Result<()> { }); let loaded = match manifest_path.as_deref() { Some(p) if p.exists() => { - println!("nexum-engine: loading manifest from {}", p.display()); + info!(manifest = %p.display(), "loading nexum.toml"); manifest::load(p)? } _ => manifest::fallback_manifest(), }; + // -- 4. Bring up the host backends. -- + std::fs::create_dir_all(&engine_cfg.engine.state_dir).with_context(|| { + format!( + "create state directory {}", + engine_cfg.engine.state_dir.display() + ) + })?; + let store_path = engine_cfg.engine.state_dir.join("local-store.redb"); + let local_store = host::local_store_redb::LocalStore::open(&store_path) + .with_context(|| format!("open local-store at {}", store_path.display()))?; + let cow_pool = host::cow_orderbook::OrderBookPool::with_default_chains(); + let provider_pool = host::provider_pool::ProviderPool::from_config(&engine_cfg) + .await + .context("open chain providers")?; + + // -- 5. Build the wasmtime engine + component. -- let mut config = wasmtime::Config::new(); config.wasm_component_model(true); + // `async_support` was deprecated in wasmtime 45 — the engine + // resolves async on its own. Keeping the call out of the Config + // chain silences the `deprecated` warning under + // `RUSTFLAGS=-D warnings`. let engine = Engine::new(&config)?; - let start = Instant::now(); + let load_start = Instant::now(); let component = Component::from_file(&engine, &wasm_path).context("failed to load component")?; - eprintln!("[timing] component load: {:?}", start.elapsed()); + tracing::debug!(elapsed_ms = ?load_start.elapsed(), "component load"); let mut linker = Linker::::new(&engine); Shepherd::add_to_linker::>( @@ -380,6 +491,11 @@ async fn main() -> anyhow::Result<()> { wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; let wasi = WasiCtxBuilder::new().inherit_stdio().build(); + let module_namespace = if loaded.manifest.module.name.is_empty() { + "module".to_owned() + } else { + loaded.manifest.module.name.clone() + }; let mut store = Store::new( &engine, @@ -388,36 +504,39 @@ async fn main() -> anyhow::Result<()> { table: ResourceTable::new(), monotonic_baseline: Instant::now(), http_allowlist: loaded.http_allowlist, + module_namespace, + cow: cow_pool, + chain: provider_pool, + store: local_store, }, ); - let start = Instant::now(); + let inst_start = Instant::now(); let bindings = Shepherd::instantiate_async(&mut store, &component, &linker) .await .context("failed to instantiate component")?; - eprintln!("[timing] component instantiate: {:?}", start.elapsed()); + tracing::debug!(elapsed_ms = ?inst_start.elapsed(), "component instantiate"); - println!("nexum-engine: calling init..."); - // 0.2: [config] is stringly-typed (typed variant deferred to 0.3). - // Fall back to a single ("name", "") pair if the manifest has - // no [config] section so the example module still has something to log. + info!("calling init"); let config_entries: Config = if loaded.config.is_empty() { vec![("name".into(), loaded.manifest.module.name.clone())] } else { loaded.config }; - let start = Instant::now(); + let init_start = Instant::now(); match bindings.call_init(&mut store, &config_entries).await? { - Ok(()) => println!("nexum-engine: init succeeded"), - Err(e) => println!( - "nexum-engine: init failed: {}::{:?} {} ({})", - e.domain, e.kind, e.message, e.code + Ok(()) => info!(elapsed_ms = ?init_start.elapsed(), "init succeeded"), + Err(e) => warn!( + domain = %e.domain, + kind = ?e.kind, + code = e.code, + message = %e.message, + "init failed", ), } - eprintln!("[timing] call_init: {:?}", start.elapsed()); // Dispatch a test block event (timestamps are ms since Unix epoch, UTC). - println!("nexum-engine: dispatching test block event..."); + info!("dispatching test block event"); let block = nexum::host::types::Block { chain_id: 1, number: 19_000_000, @@ -425,16 +544,18 @@ async fn main() -> anyhow::Result<()> { timestamp: 1_700_000_000_000, }; let event = nexum::host::types::Event::Block(block); - let start = Instant::now(); + let evt_start = Instant::now(); match bindings.call_on_event(&mut store, &event).await? { - Ok(()) => println!("nexum-engine: on-event succeeded"), - Err(e) => println!( - "nexum-engine: on-event failed: {}::{:?} {} ({})", - e.domain, e.kind, e.message, e.code + Ok(()) => info!(elapsed_ms = ?evt_start.elapsed(), "on-event succeeded"), + Err(e) => warn!( + domain = %e.domain, + kind = ?e.kind, + code = e.code, + message = %e.message, + "on-event failed", ), } - eprintln!("[timing] call_on_event: {:?}", start.elapsed()); - println!("nexum-engine: done"); + info!("done"); Ok(()) } From 6f669c6d6ae6ef9834330db1a9749e660176cf5d Mon Sep 17 00:00:00 2001 From: brunota20 Date: Mon, 1 Jun 2026 15:43:42 -0300 Subject: [PATCH 3/3] runtime: multi-module supervisor + block/log event loop --- crates/nexum-engine/Cargo.toml | 2 + crates/nexum-engine/src/engine_config.rs | 21 + crates/nexum-engine/src/host/provider_pool.rs | 49 +++ crates/nexum-engine/src/main.rs | 336 ++++++++++----- crates/nexum-engine/src/manifest.rs | 48 +++ crates/nexum-engine/src/supervisor.rs | 387 ++++++++++++++++++ 6 files changed, 745 insertions(+), 98 deletions(-) create mode 100644 crates/nexum-engine/src/supervisor.rs diff --git a/crates/nexum-engine/Cargo.toml b/crates/nexum-engine/Cargo.toml index 637f51f..047abfb 100644 --- a/crates/nexum-engine/Cargo.toml +++ b/crates/nexum-engine/Cargo.toml @@ -42,9 +42,11 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus # alloy's JSON-RPC layer without reimplementing the codec. alloy-provider = { version = "1.5", default-features = false, features = ["ws", "ipc", "pubsub", "reqwest"] } alloy-rpc-client = { version = "1.5", default-features = false } +alloy-rpc-types-eth = { version = "1.5", default-features = false, features = ["std"] } alloy-transport = { version = "1.5", default-features = false } alloy-transport-ws = { version = "1.5", default-features = false } alloy-primitives = { version = "1.5", default-features = false, features = ["std", "serde"] } +futures = "0.3" # `local-store` backend. Per-module namespacing is enforced # host-side via a `[len:u8][module_name][raw_key]` prefix. diff --git a/crates/nexum-engine/src/engine_config.rs b/crates/nexum-engine/src/engine_config.rs index 4ec271c..595a688 100644 --- a/crates/nexum-engine/src/engine_config.rs +++ b/crates/nexum-engine/src/engine_config.rs @@ -32,6 +32,27 @@ pub struct EngineConfig { /// pool seed. #[serde(default)] pub chains: BTreeMap, + /// Modules the supervisor should boot. Each entry resolves a + /// `(component.wasm, nexum.toml)` pair on the local filesystem + /// for 0.2 — content-addressed resolution (Swarm / OCI / + /// `[[content.sources]]`) lands in 0.3 per + /// `docs/03-module-discovery.md`. + #[serde(default)] + pub modules: Vec, +} + +/// One `[[modules]]` table from `engine.toml`. +/// +/// Both fields are filesystem paths in 0.2. `manifest` defaults to +/// `nexum.toml` next to `path` if omitted, matching the bundle layout +/// in `docs/02-modules-events-packaging.md`. +#[derive(Debug, Deserialize)] +pub struct ModuleEntry { + /// Path to the compiled `.wasm` component. + pub path: std::path::PathBuf, + /// Path to the module's `nexum.toml`. Defaults to `/nexum.toml`. + #[serde(default)] + pub manifest: Option, } #[derive(Debug, Deserialize)] diff --git a/crates/nexum-engine/src/host/provider_pool.rs b/crates/nexum-engine/src/host/provider_pool.rs index 0b6d94b..adf589f 100644 --- a/crates/nexum-engine/src/host/provider_pool.rs +++ b/crates/nexum-engine/src/host/provider_pool.rs @@ -12,9 +12,13 @@ //! - `http://` / `https://` — alloy's HTTP transport; request/response only. use std::collections::BTreeMap; +use std::pin::Pin; use std::sync::Arc; use alloy_provider::{DynProvider, Provider, ProviderBuilder, WsConnect}; +use alloy_rpc_types_eth::{Filter, Header, Log}; +use futures::stream::Stream; +use futures::stream::StreamExt as _; use serde_json::value::RawValue; use thiserror::Error; use tracing::info; @@ -72,6 +76,46 @@ impl ProviderPool { } } + /// Open a new-blocks (`eth_subscribe newHeads`) stream on + /// `chain_id`. Requires a WS / IPC transport at construction + /// time; HTTP-only providers surface `UnknownChain` here. + pub async fn subscribe_blocks(&self, chain_id: u64) -> Result { + let provider = self + .providers + .get(&chain_id) + .ok_or(ProviderError::UnknownChain(chain_id))?; + let sub = provider + .subscribe_blocks() + .await + .map_err(|e| ProviderError::Rpc { + method: "eth_subscribe(newHeads)".into(), + detail: e.to_string(), + })?; + let stream = sub.into_stream().map(Ok::<_, ProviderError>); + Ok(Box::pin(stream)) + } + + /// Open an `eth_subscribe(logs, filter)` stream on `chain_id`. + pub async fn subscribe_logs( + &self, + chain_id: u64, + filter: Filter, + ) -> Result { + let provider = self + .providers + .get(&chain_id) + .ok_or(ProviderError::UnknownChain(chain_id))?; + let sub = provider + .subscribe_logs(&filter) + .await + .map_err(|e| ProviderError::Rpc { + method: "eth_subscribe(logs)".into(), + detail: e.to_string(), + })?; + let stream = sub.into_stream().map(Ok::<_, ProviderError>); + Ok(Box::pin(stream)) + } + /// Raw JSON-RPC dispatch. `params_json` must be the JSON encoding /// of the params array (e.g. `"[\"0x...\",\"latest\"]"`), as /// produced by the SDK's `chain::request` glue. @@ -104,6 +148,11 @@ impl ProviderPool { } } +/// Boxed stream of `newHeads`-style block headers. +pub type BlockStream = Pin> + Send>>; +/// Boxed stream of `logs`-filtered log events. +pub type LogStream = Pin> + Send>>; + /// Errors surfaced by [`ProviderPool`]. #[derive(Debug, Error)] pub enum ProviderError { diff --git a/crates/nexum-engine/src/main.rs b/crates/nexum-engine/src/main.rs index eca6629..0602ccc 100644 --- a/crates/nexum-engine/src/main.rs +++ b/crates/nexum-engine/src/main.rs @@ -1,16 +1,19 @@ mod engine_config; mod host; mod manifest; +mod supervisor; use std::path::PathBuf; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use futures::StreamExt; +use futures::stream::{FuturesUnordered, select_all}; use tracing::{info, warn}; use tracing_subscriber::EnvFilter; -use wasmtime::component::{Component, Linker, ResourceTable}; +use wasmtime::Engine; +use wasmtime::component::{Linker, ResourceTable}; use wasmtime::error::Context as _; -use wasmtime::{Engine, Store}; -use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView}; +use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView}; // Both packages are listed explicitly so wit-parser can resolve the // cross-package reference natively — no vendored deps/ tree needed. @@ -416,19 +419,16 @@ fn hex_encode(bytes: &[u8]) -> String { #[tokio::main] async fn main() -> anyhow::Result<()> { - let mut args = std::env::args().skip(1); - let wasm_path = args.next().ok_or_else(|| { - anyhow::anyhow!( - "usage: nexum-engine [] []" - ) - })?; - let explicit_manifest = args.next().map(PathBuf::from); - let explicit_engine_config = args.next().map(PathBuf::from); + // CLI args: + // nexum-engine [ []] [--engine-config ] + // + // Positional `` is a backwards-compat shortcut that + // synthesises a one-module engine config. Production deployments + // pass `--engine-config` and declare modules in TOML. + let cli = Cli::parse(); - // -- 1. Load engine config (optional). -- - let engine_cfg = engine_config::load_or_default(explicit_engine_config.as_deref())?; + let engine_cfg = engine_config::load_or_default(cli.engine_config.as_deref())?; - // -- 2. Install tracing subscriber. -- let env_filter = EnvFilter::try_from_default_env() .or_else(|_| EnvFilter::try_new(&engine_cfg.engine.log_level)) .unwrap_or_else(|_| EnvFilter::new("info")); @@ -438,23 +438,8 @@ async fn main() -> anyhow::Result<()> { .init(); info!("nexum-engine starting"); - info!(wasm = %wasm_path, "loading component"); - - // -- 3. Load the module manifest. -- - let manifest_path = explicit_manifest.or_else(|| { - PathBuf::from(&wasm_path) - .parent() - .map(|p| p.join("nexum.toml")) - }); - let loaded = match manifest_path.as_deref() { - Some(p) if p.exists() => { - info!(manifest = %p.display(), "loading nexum.toml"); - manifest::load(p)? - } - _ => manifest::fallback_manifest(), - }; - // -- 4. Bring up the host backends. -- + // Bring up shared host backends. std::fs::create_dir_all(&engine_cfg.engine.state_dir).with_context(|| { format!( "create state directory {}", @@ -469,20 +454,11 @@ async fn main() -> anyhow::Result<()> { .await .context("open chain providers")?; - // -- 5. Build the wasmtime engine + component. -- + // wasmtime engine + linker — one of each, shared across modules. let mut config = wasmtime::Config::new(); config.wasm_component_model(true); - // `async_support` was deprecated in wasmtime 45 — the engine - // resolves async on its own. Keeping the call out of the Config - // chain silences the `deprecated` warning under - // `RUSTFLAGS=-D warnings`. let engine = Engine::new(&config)?; - let load_start = Instant::now(); - let component = - Component::from_file(&engine, &wasm_path).context("failed to load component")?; - tracing::debug!(elapsed_ms = ?load_start.elapsed(), "component load"); - let mut linker = Linker::::new(&engine); Shepherd::add_to_linker::>( &mut linker, @@ -490,72 +466,236 @@ async fn main() -> anyhow::Result<()> { )?; wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; - let wasi = WasiCtxBuilder::new().inherit_stdio().build(); - let module_namespace = if loaded.manifest.module.name.is_empty() { - "module".to_owned() + // Boot supervisor — `engine.toml.[[modules]]` first, CLI + // positional second. + let mut supervisor = if let Some(wasm) = cli.wasm.as_deref() { + if !engine_cfg.modules.is_empty() { + warn!("ignoring engine.toml [[modules]] because a positional was given"); + } + supervisor::Supervisor::boot_single( + &engine, + &linker, + wasm, + cli.manifest.as_deref(), + &cow_pool, + &provider_pool, + &local_store, + ) + .await? + } else if !engine_cfg.modules.is_empty() { + supervisor::Supervisor::boot( + &engine, + &linker, + &engine_cfg, + &cow_pool, + &provider_pool, + &local_store, + ) + .await? } else { - loaded.manifest.module.name.clone() + anyhow::bail!( + "no modules to run — either pass a positional or declare \ + [[modules]] entries in engine.toml" + ); }; - let mut store = Store::new( - &engine, - HostState { - wasi, - table: ResourceTable::new(), - monotonic_baseline: Instant::now(), - http_allowlist: loaded.http_allowlist, - module_namespace, - cow: cow_pool, - chain: provider_pool, - store: local_store, - }, + info!( + modules = supervisor.module_count(), + chains = supervisor.block_chains().len(), + "supervisor ready" ); - let inst_start = Instant::now(); - let bindings = Shepherd::instantiate_async(&mut store, &component, &linker) - .await - .context("failed to instantiate component")?; - tracing::debug!(elapsed_ms = ?inst_start.elapsed(), "component instantiate"); + // Open per-chain block subscriptions + per-module log + // subscriptions, merge, dispatch until shutdown. + let block_chains = supervisor.block_chains(); + let log_subs = supervisor.log_subscriptions(); - info!("calling init"); - let config_entries: Config = if loaded.config.is_empty() { - vec![("name".into(), loaded.manifest.module.name.clone())] - } else { - loaded.config - }; - let init_start = Instant::now(); - match bindings.call_init(&mut store, &config_entries).await? { - Ok(()) => info!(elapsed_ms = ?init_start.elapsed(), "init succeeded"), - Err(e) => warn!( - domain = %e.domain, - kind = ?e.kind, - code = e.code, - message = %e.message, - "init failed", - ), - } - - // Dispatch a test block event (timestamps are ms since Unix epoch, UTC). - info!("dispatching test block event"); - let block = nexum::host::types::Block { - chain_id: 1, - number: 19_000_000, - hash: vec![0xab; 32], - timestamp: 1_700_000_000_000, - }; - let event = nexum::host::types::Event::Block(block); - let evt_start = Instant::now(); - match bindings.call_on_event(&mut store, &event).await? { - Ok(()) => info!(elapsed_ms = ?evt_start.elapsed(), "on-event succeeded"), - Err(e) => warn!( - domain = %e.domain, - kind = ?e.kind, - code = e.code, - message = %e.message, - "on-event failed", - ), + if block_chains.is_empty() && log_subs.is_empty() { + info!("no [[subscription]] entries — engine has nothing to run; exiting"); + return Ok(()); } + let block_streams = open_block_streams(&provider_pool, &block_chains).await; + let log_streams = open_log_streams(&provider_pool, log_subs).await; + + let shutdown = async { + match wait_for_shutdown_signal().await { + Ok(name) => info!(signal = %name, "shutdown signal received"), + Err(err) => warn!(error = %err, "signal handler failed — using ctrl-c"), + } + }; + + run_event_loop(&mut supervisor, block_streams, log_streams, shutdown).await; info!("done"); Ok(()) } + +/// Parsed CLI surface. +#[derive(Debug, Default)] +struct Cli { + wasm: Option, + manifest: Option, + engine_config: Option, +} + +impl Cli { + fn parse() -> Self { + let mut args = std::env::args().skip(1); + let mut cli = Self::default(); + let mut positional = Vec::new(); + while let Some(arg) = args.next() { + match arg.as_str() { + "--engine-config" => cli.engine_config = args.next().map(PathBuf::from), + "-h" | "--help" => { + eprintln!( + "usage: nexum-engine [ []] \ + [--engine-config ]" + ); + std::process::exit(0); + } + _ => positional.push(arg), + } + } + if let Some(p) = positional.first() { + cli.wasm = Some(PathBuf::from(p)); + } + if let Some(p) = positional.get(1) { + cli.manifest = Some(PathBuf::from(p)); + } + cli + } +} + +/// Per-chain block subscriptions, one shared stream per chain id. +async fn open_block_streams( + pool: &host::provider_pool::ProviderPool, + chains: &std::collections::BTreeSet, +) -> Vec { + let mut openings: FuturesUnordered<_> = chains + .iter() + .copied() + .map(|chain_id| async move { (chain_id, pool.subscribe_blocks(chain_id).await) }) + .collect(); + + let mut streams = Vec::new(); + while let Some((chain_id, result)) = openings.next().await { + match result { + Ok(stream) => { + info!(chain_id, "block subscription open"); + let tagged: TaggedBlockStream = Box::pin(stream.map(move |item| { + item.map(|header| (chain_id, header)) + .map_err(anyhow::Error::from) + })); + streams.push(tagged); + } + Err(err) => { + warn!(chain_id, error = %err, "block subscription failed"); + } + } + } + streams +} + +/// Per-module log subscriptions. Each entry is a stream tagged with +/// the owning module name + chain id. +async fn open_log_streams( + pool: &host::provider_pool::ProviderPool, + subs: Vec<(String, u64, alloy_rpc_types_eth::Filter)>, +) -> Vec { + let mut openings: FuturesUnordered<_> = subs + .into_iter() + .map(|(module, chain_id, filter)| async move { + let stream = pool.subscribe_logs(chain_id, filter).await; + (module, chain_id, stream) + }) + .collect(); + + let mut streams = Vec::new(); + while let Some((module, chain_id, result)) = openings.next().await { + match result { + Ok(stream) => { + info!(module = %module, chain_id, "log subscription open"); + let module_name = module.clone(); + let tagged: TaggedLogStream = Box::pin(stream.map(move |item| { + item.map(|log| (module_name.clone(), chain_id, log)) + .map_err(anyhow::Error::from) + })); + streams.push(tagged); + } + Err(err) => { + warn!(module = %module, chain_id, error = %err, "log subscription failed"); + } + } + } + streams +} + +type TaggedBlockStream = std::pin::Pin< + Box< + dyn futures::Stream> + + Send, + >, +>; +type TaggedLogStream = std::pin::Pin< + Box< + dyn futures::Stream> + + Send, + >, +>; + +/// Drive the supervisor with events until `shutdown` resolves. +async fn run_event_loop( + supervisor: &mut supervisor::Supervisor, + block_streams: Vec, + log_streams: Vec, + shutdown: impl std::future::Future + Send, +) { + let mut blocks = select_all(block_streams); + let mut logs = select_all(log_streams); + let mut shutdown = Box::pin(shutdown); + loop { + tokio::select! { + biased; + () = &mut shutdown => return, + next = blocks.next() => match next { + Some(Ok((chain_id, header))) => { + let block = nexum::host::types::Block { + chain_id, + number: header.number, + hash: header.hash.as_slice().to_vec(), + timestamp: header.timestamp.saturating_mul(1000), + }; + supervisor.dispatch_block(block).await; + } + Some(Err(err)) => warn!(error = %err, "block stream error — continuing"), + None => {} + }, + next = logs.next() => match next { + Some(Ok((module, chain_id, log))) => { + supervisor.dispatch_log(&module, chain_id, log).await; + } + Some(Err(err)) => warn!(error = %err, "log stream error — continuing"), + None => {} + }, + } + } +} + +/// Wait for SIGINT or (on Unix) SIGTERM, whichever arrives first. +async fn wait_for_shutdown_signal() -> anyhow::Result<&'static str> { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + let mut sigterm = signal(SignalKind::terminate())?; + let mut sigint = signal(SignalKind::interrupt())?; + tokio::select! { + _ = sigterm.recv() => Ok("SIGTERM"), + _ = sigint.recv() => Ok("SIGINT"), + } + } + #[cfg(not(unix))] + { + tokio::signal::ctrl_c().await?; + Ok("ctrl-c") + } +} diff --git a/crates/nexum-engine/src/manifest.rs b/crates/nexum-engine/src/manifest.rs index 522e168..2ce576a 100644 --- a/crates/nexum-engine/src/manifest.rs +++ b/crates/nexum-engine/src/manifest.rs @@ -47,6 +47,54 @@ pub struct Manifest { pub capabilities: Option, #[serde(default)] pub config: toml::Table, + /// Event subscriptions the runtime wires before calling + /// `_init`. See `docs/02-modules-events-packaging.md` for the + /// schema; 0.2 implements `block` and `log` kinds, `cron` is + /// parsed and ignored (deferred to 0.3). + #[serde(default, rename = "subscription")] + pub subscriptions: Vec, +} + +/// One `[[subscription]]` table in `nexum.toml`. +/// +/// The discriminator is the `kind` field; remaining fields are +/// validated per-kind by the supervisor. Unknown kinds are surfaced +/// at load time so a typo does not silently disable an event source. +#[derive(Debug, Deserialize, Clone)] +#[serde(tag = "kind", rename_all = "lowercase")] +pub enum Subscription { + /// New-block events. Fan-out is shared per chain — the + /// supervisor opens one subscription per chain id and routes to + /// every module that asked for blocks on that chain. + Block { + /// EVM chain id. + chain_id: u64, + }, + /// Log events matching `address` + topic-0. Fan-out is + /// per-module — the supervisor opens one subscription per + /// `[[subscription]]` entry and tags emitted events with the + /// owning module. + Log { + /// EVM chain id. + chain_id: u64, + /// Contract address as `0x`-prefixed 20-byte hex. Optional. + #[serde(default)] + address: Option, + /// Topic-0 of the event the module wants to consume. `0x`- + /// prefixed 32-byte hex. Optional — when absent the + /// subscription matches every event from the address(es). + #[serde(default)] + event_signature: Option, + }, + /// Cron-scheduled tick. 0.2 parses but does not dispatch; the + /// supervisor emits a warning so the operator knows the + /// declaration is currently inert. `schedule` is preserved so a + /// 0.3 dispatcher can pick it up without re-parsing the manifest. + Cron { + /// Standard 5-field cron expression. + #[allow(dead_code)] + schedule: String, + }, } #[derive(Debug, Deserialize, Default)] diff --git a/crates/nexum-engine/src/supervisor.rs b/crates/nexum-engine/src/supervisor.rs new file mode 100644 index 0000000..18c2fa4 --- /dev/null +++ b/crates/nexum-engine/src/supervisor.rs @@ -0,0 +1,387 @@ +//! Multi-module supervisor. +//! +//! Loads every `[[modules]]` entry from `engine.toml`, instantiates +//! each as a `Shepherd` bindings against a dedicated wasmtime +//! `Store`, and routes the event types declared in each manifest's +//! `[[subscription]]` table. +//! +//! 0.2 dispatches a block event to every module that subscribed to +//! that chain's blocks, and a log event only to the module that +//! opened the subscription. Restart + poison-pill bookkeeping ships +//! in a follow-up — for now a failing `_on_event` is logged via +//! `tracing::error!` and the module continues to receive subsequent +//! events. Lifecycle states `Restart` / `Dead` from +//! `docs/02-modules-events-packaging.md` land alongside the +//! `[module.restart]` schema in 0.3. + +use std::collections::BTreeSet; +use std::path::Path; + +use anyhow::{Context, Error, Result, anyhow}; +use tracing::{error, info, warn}; +use wasmtime::component::{Component, Linker, ResourceTable}; +use wasmtime::{Engine, Store}; +use wasmtime_wasi::WasiCtxBuilder; + +use crate::engine_config::{EngineConfig, ModuleEntry}; +use crate::host::cow_orderbook::OrderBookPool; +use crate::host::local_store_redb::LocalStore; +use crate::host::provider_pool::ProviderPool; +use crate::manifest::{self, LoadedManifest, Subscription}; +use crate::{HostState, Shepherd}; + +/// Owns every loaded module and exposes the dispatch surface the +/// event loop needs. +pub struct Supervisor { + modules: Vec, +} + +struct LoadedModule { + name: String, + bindings: Shepherd, + store: Store, + /// Subscriptions copied from `nexum.toml`. The supervisor reads + /// these on every event to decide whether to dispatch. + subscriptions: Vec, +} + +impl Supervisor { + /// Compile + instantiate every module declared in + /// `engine_cfg.modules`. The wasmtime `Engine` + `Linker` are + /// passed in so `main.rs` can build them once (the bindgen + /// `Shepherd::add_to_linker` call binds them to `HostState`, + /// which the supervisor does not re-derive). + pub async fn boot( + engine: &Engine, + linker: &Linker, + engine_cfg: &EngineConfig, + cow_pool: &OrderBookPool, + provider_pool: &ProviderPool, + local_store: &LocalStore, + ) -> Result { + let mut modules = Vec::with_capacity(engine_cfg.modules.len()); + for entry in &engine_cfg.modules { + let loaded = + Self::load_one(engine, linker, entry, cow_pool, provider_pool, local_store) + .await + .with_context(|| format!("load module {}", entry.path.display()))?; + modules.push(loaded); + } + info!(count = modules.len(), "supervisor up"); + Ok(Self { modules }) + } + + /// One-shot construction from a single ad-hoc `(component, manifest)` + /// pair. Used by the CLI-positional invocation so `just run` + /// against the example module keeps working without an + /// `engine.toml`. + pub async fn boot_single( + engine: &Engine, + linker: &Linker, + wasm: &Path, + manifest: Option<&Path>, + cow_pool: &OrderBookPool, + provider_pool: &ProviderPool, + local_store: &LocalStore, + ) -> Result { + let entry = ModuleEntry { + path: wasm.to_path_buf(), + manifest: manifest.map(Path::to_path_buf), + }; + let loaded = + Self::load_one(engine, linker, &entry, cow_pool, provider_pool, local_store).await?; + Ok(Self { + modules: vec![loaded], + }) + } + + async fn load_one( + engine: &Engine, + linker: &Linker, + entry: &ModuleEntry, + cow_pool: &OrderBookPool, + provider_pool: &ProviderPool, + local_store: &LocalStore, + ) -> Result { + let manifest_path = entry + .manifest + .clone() + .or_else(|| entry.path.parent().map(|p| p.join("nexum.toml"))); + let loaded_manifest: LoadedManifest = match manifest_path.as_deref() { + Some(p) if p.exists() => { + info!(manifest = %p.display(), "loading nexum.toml"); + manifest::load(p)? + } + _ => { + warn!( + component = %entry.path.display(), + "no nexum.toml — falling back to anonymous module" + ); + manifest::fallback_manifest() + } + }; + + // Compile + instantiate. + info!(component = %entry.path.display(), "compiling component"); + let component = Component::from_file(engine, &entry.path) + .map_err(Error::from) + .with_context(|| format!("compile {}", entry.path.display()))?; + let wasi = WasiCtxBuilder::new().inherit_stdio().build(); + let module_namespace = if loaded_manifest.manifest.module.name.is_empty() { + "module".to_owned() + } else { + loaded_manifest.manifest.module.name.clone() + }; + let mut store = Store::new( + engine, + HostState { + wasi, + table: ResourceTable::new(), + monotonic_baseline: std::time::Instant::now(), + http_allowlist: loaded_manifest.http_allowlist.clone(), + module_namespace: module_namespace.clone(), + cow: cow_pool.clone(), + chain: provider_pool.clone(), + store: local_store.clone(), + }, + ); + let bindings = Shepherd::instantiate_async(&mut store, &component, linker) + .await + .map_err(Error::from) + .with_context(|| format!("instantiate {}", entry.path.display()))?; + + // Call `init` with the manifest's `[config]`. + let config: crate::Config = if loaded_manifest.config.is_empty() { + vec![("name".into(), module_namespace.clone())] + } else { + loaded_manifest.config.clone() + }; + match bindings + .call_init(&mut store, &config) + .await + .map_err(Error::from)? + { + Ok(()) => info!(module = %module_namespace, "init succeeded"), + Err(e) => warn!( + module = %module_namespace, + domain = %e.domain, + kind = ?e.kind, + code = e.code, + message = %e.message, + "init failed", + ), + } + + // Surface any `[[subscription]]` entries the host cannot + // service yet, so an operator running 0.2 against a 0.3 + // manifest does not silently drop events. + for sub in &loaded_manifest.manifest.subscriptions { + if matches!(sub, Subscription::Cron { .. }) { + warn!( + module = %module_namespace, + "cron subscriptions are declared but inert in 0.2 (lands in 0.3)", + ); + } + } + + Ok(LoadedModule { + name: module_namespace, + bindings, + store, + subscriptions: loaded_manifest.manifest.subscriptions.clone(), + }) + } + + /// Number of modules currently loaded. + pub fn module_count(&self) -> usize { + self.modules.len() + } + + /// Set of chain ids any module asked for block events on. The + /// caller opens one shared block subscription per chain id and + /// routes through `dispatch_block`. + pub fn block_chains(&self) -> BTreeSet { + let mut out = BTreeSet::new(); + for module in &self.modules { + for sub in &module.subscriptions { + if let Subscription::Block { chain_id } = sub { + out.insert(*chain_id); + } + } + } + out + } + + /// Per-module log subscriptions. Each entry is a `(module_name, + /// chain_id, filter)` triple the event loop opens against the + /// matching alloy provider; the resulting stream tags every log + /// with `module_name` so `dispatch_log` routes correctly. + pub fn log_subscriptions(&self) -> Vec<(String, u64, alloy_rpc_types_eth::Filter)> { + let mut out = Vec::new(); + for module in &self.modules { + for sub in &module.subscriptions { + if let Subscription::Log { + chain_id, + address, + event_signature, + } = sub + { + match build_alloy_filter(address.as_deref(), event_signature.as_deref()) { + Ok(filter) => out.push((module.name.clone(), *chain_id, filter)), + Err(err) => warn!( + module = %module.name, + chain_id, + error = %err, + "invalid log subscription — skipping", + ), + } + } + } + } + out + } + + /// Dispatch a block event to every module subscribed to + /// `block.chain_id`. Returns the number of modules invoked. + pub async fn dispatch_block(&mut self, block: crate::nexum::host::types::Block) -> usize { + let event = crate::nexum::host::types::Event::Block(block); + let chain_id = match &event { + crate::nexum::host::types::Event::Block(b) => b.chain_id, + _ => unreachable!(), + }; + let mut dispatched = 0; + for module in &mut self.modules { + let subscribed = module + .subscriptions + .iter() + .any(|s| matches!(s, Subscription::Block { chain_id: cid } if *cid == chain_id)); + if !subscribed { + continue; + } + match module + .bindings + .call_on_event(&mut module.store, &event) + .await + { + Ok(Ok(())) => dispatched += 1, + Ok(Err(host_err)) => warn!( + module = %module.name, + chain_id, + domain = %host_err.domain, + kind = ?host_err.kind, + message = %host_err.message, + "on-event returned host-error", + ), + Err(trap) => error!( + module = %module.name, + chain_id, + error = %trap, + "on-event trapped", + ), + } + } + dispatched + } + + /// Dispatch a log event to the specific module that opened the + /// subscription. Returns `true` when the module accepted the + /// dispatch; `false` when the module was not found or its + /// callback failed. + pub async fn dispatch_log( + &mut self, + module_name: &str, + chain_id: u64, + log: alloy_rpc_types_eth::Log, + ) -> bool { + let target = match self.modules.iter_mut().find(|m| m.name == module_name) { + Some(m) => m, + None => { + warn!(module = %module_name, "no such module — dropping log"); + return false; + } + }; + let event = crate::nexum::host::types::Event::Logs(vec![project_log(chain_id, &log)]); + match target + .bindings + .call_on_event(&mut target.store, &event) + .await + { + Ok(Ok(())) => true, + Ok(Err(host_err)) => { + warn!( + module = %module_name, + chain_id, + domain = %host_err.domain, + kind = ?host_err.kind, + message = %host_err.message, + "on-event returned host-error", + ); + false + } + Err(trap) => { + error!( + module = %module_name, + chain_id, + error = %trap, + "on-event trapped", + ); + false + } + } + } +} + +/// Project an alloy `Log` onto the WIT `log` record. The chain id +/// is not on the alloy log (the subscription context carries it), +/// so we receive it alongside. +fn project_log(chain_id: u64, log: &alloy_rpc_types_eth::Log) -> crate::nexum::host::types::Log { + crate::nexum::host::types::Log { + chain_id, + address: log.address().as_slice().to_vec(), + topics: log.topics().iter().map(|t| t.as_slice().to_vec()).collect(), + data: log.inner.data.data.to_vec(), + block_number: log.block_number.unwrap_or(0), + transaction_hash: log + .transaction_hash + .map(|h| h.as_slice().to_vec()) + .unwrap_or_default(), + log_index: log.log_index.unwrap_or(0) as u32, + } +} + +/// Translate a `[[subscription]]` log entry into an alloy `Filter`. +fn build_alloy_filter( + address: Option<&str>, + event_signature: Option<&str>, +) -> Result { + use alloy_primitives::{Address, B256}; + let mut filter = alloy_rpc_types_eth::Filter::new(); + if let Some(addr_hex) = address { + let addr: Address = addr_hex + .parse() + .map_err(|e| anyhow!("invalid log address {addr_hex:?}: {e}"))?; + filter = filter.address(addr); + } + if let Some(topic_hex) = event_signature { + let topic: B256 = topic_hex + .parse() + .map_err(|e| anyhow!("invalid topic {topic_hex:?}: {e}"))?; + filter = filter.event_signature(topic); + } + Ok(filter) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_supervisor_returns_no_subscriptions() { + let sup = Supervisor { + modules: Vec::new(), + }; + assert!(sup.block_chains().is_empty()); + assert!(sup.log_subscriptions().is_empty()); + assert_eq!(sup.module_count(), 0); + } +}