diff --git a/e2e/nest_five_networks.py b/e2e/nest_five_networks.py new file mode 100644 index 0000000..c1d1d80 --- /dev/null +++ b/e2e/nest_five_networks.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +"""Drive 5 distinct NEST spiking networks through the NCP RPC contract. + +Speaks newline-delimited JSON to engram's bridge_server (--backend nest) — the +exact medium the NCP Rust gateway bridges from Zenoh. open_session -> +step_request* (current_pA stimulus, spikes recording) -> close_session. +""" +import json, socket, sys, time + +HOST, PORT = "127.0.0.1", 28474 +NCP = "0.5" +HASH = "24e8e6e31e1dec8a" + +NETWORKS = [ + # (label, nest model, pop size, [current_pA per 100ms step]) + ("iaf_psc_alpha (current LIF)", "iaf_psc_alpha", 10, [500.0, 750.0, 1000.0]), + ("iaf_psc_exp (exp-synapse LIF)", "iaf_psc_exp", 10, [500.0, 750.0, 1000.0]), + ("izhikevich (regular spiking)", "izhikevich", 8, [10.0, 15.0, 20.0]), + ("hh_psc_alpha (Hodgkin-Huxley)", "hh_psc_alpha", 6, [650.0, 800.0, 1000.0]), + ("aeif_cond_alpha (adaptive EIF)","aeif_cond_alpha", 6, [500.0, 750.0, 1000.0]), +] + +def rpc(sock, rdr, msg): + sock.sendall((json.dumps(msg) + "\n").encode()) + line = rdr.readline() + if not line: + raise RuntimeError("connection closed by bridge") + return json.loads(line) + +def run_one(sock, rdr, label, model, n, currents): + sid = f"nest-{model}" + opened = rpc(sock, rdr, { + "ncp_version": NCP, "kind": "open_session", "session_id": sid, + "network": {"kind": "builtin", "ref": model, "population_sizes": {"pop": n}}, + "record": {"targets": [{"port": "spk", "target": "pop", "observable": "spikes"}]}, + "stimulus": {"targets": [{"port": "drive", "target": "pop", "kind": "current_pA"}]}, + "sim": {"dt_ms": 0.1, "chunk_ms": 10.0, "mode": "stream"}, + "bindings": [], "contract_hash": HASH, + }) + if opened.get("kind") == "error" or opened.get("ok") is False: + return {"label": label, "model": model, "ok": False, "detail": opened} + backend = opened.get("backend") + total_spikes, per_step = 0, [] + for i, cur in enumerate(currents): + obs = rpc(sock, rdr, { + "ncp_version": NCP, "kind": "step_request", "session_id": sid, "advance_ms": 100.0, + "stimulus": {"kind": "stimulus_frame", "session_id": sid, "t": float(i), + "values": {"drive": {"data": [cur], "unit": "pA"}}}, + }) + if obs.get("kind") == "error": + return {"label": label, "model": model, "ok": False, "detail": obs} + rec = (obs.get("records") or {}).get("spk", {}) + nspk = len(rec.get("times", []) or []) + total_spikes += nspk + per_step.append((cur, nspk, obs.get("sim_time_ms"))) + closed = rpc(sock, rdr, {"ncp_version": NCP, "kind": "close_session", "session_id": sid}) + return {"label": label, "model": model, "ok": True, "backend": backend, + "pop": n, "total_spikes": total_spikes, "per_step": per_step, + "closed_ok": closed.get("ok")} + +def main(): + t0 = time.time() + with socket.create_connection((HOST, PORT), timeout=120) as s: + s.settimeout(120) + rdr = s.makefile("r") + results = [] + for (label, model, n, currents) in NETWORKS: + try: + results.append(run_one(s, rdr, label, model, n, currents)) + except Exception as e: + results.append({"label": label, "model": model, "ok": False, "detail": str(e)}) + print(f"\n=== 5 NEST spiking sims via NCP (NEST backend), {time.time()-t0:.1f}s ===") + print(f"{'network':32} {'pop':>4} {'spikes':>7} steps(curr_pA->spikes)") + print("-" * 86) + ok = 0 + for r in results: + if r.get("ok"): + ok += 1 + steps = " ".join(f"{int(c)}->{ns}" for (c, ns, _) in r["per_step"]) + print(f"{r['label']:32} {r['pop']:>4} {r['total_spikes']:>7} {steps}") + else: + print(f"{r['label']:32} FAILED: {str(r.get('detail'))[:80]}") + print("-" * 86) + print(f"backend={results[0].get('backend') if results else '?'} ok={ok}/{len(results)}") + sys.exit(0 if ok == len(results) else 1) + +if __name__ == "__main__": + main() diff --git a/ncp-core/examples/overhead.rs b/ncp-core/examples/overhead.rs new file mode 100644 index 0000000..b9f721f --- /dev/null +++ b/ncp-core/examples/overhead.rs @@ -0,0 +1,176 @@ +//! NCP overhead measurement — "is NCP low overhead?" (read-only use of the lib). +//! +//! Times the per-tick hot paths: JSON (de)serialization of the action/perception +//! frames, the safety governor, the reflex controller, and the binary BulkBlock +//! observation codec (vs JSON for the same payload). +//! +//! Run: cargo run -p ncp-core --release --example overhead + +use ncp_core::transport::Controller; +use ncp_core::{ + BulkBlock, ChannelValue, Column, CommandFrame, Map, Mode, ReflexController, SafetyGovernor, + SafetyLimits, SensorFrame, +}; +use std::hint::black_box; +use std::time::Instant; + +fn bench(iters: u64, mut f: F) -> f64 { + for _ in 0..(iters / 20).max(1) { + f(); + } // warm + let t = Instant::now(); + for _ in 0..iters { + f(); + } + t.elapsed().as_nanos() as f64 / iters as f64 +} +fn row(name: &str, ns: f64, bytes: usize) { + let per_sec = if ns > 0.0 { 1e9 / ns } else { 0.0 }; + let b = if bytes > 0 { + format!("{bytes} B") + } else { + "-".into() + }; + println!(" {name:38} {ns:9.1} ns/op {per_sec:>11.0} ops/s {b}"); +} + +fn main() { + // typical action frame + let mut ch = Map::new(); + ch.insert( + "velocity_setpoint".into(), + ChannelValue::vec3(0.4, -0.1, 0.2, Some("m/s")), + ); + let cmd = CommandFrame { + mode: Mode::Active, + ttl_ms: 200.0, + seq: 42, + channels: ch, + ..Default::default() + }; + let cmd_bytes = serde_json::to_vec(&cmd).unwrap(); + + // typical sensor frame + let mut sch = Map::new(); + sch.insert( + "pose_position".into(), + ChannelValue::vec3(1.0, 2.0, 3.0, Some("m")), + ); + sch.insert( + "pose_velocity".into(), + ChannelValue::vec3(0.1, 0.0, -0.2, Some("m/s")), + ); + let sensor = SensorFrame { + seq: 42, + t: 1.0, + channels: sch, + ..Default::default() + }; + let sensor_bytes = serde_json::to_vec(&sensor).unwrap(); + + println!("\n=== NCP per-tick hot-path overhead (JSON action/perception planes) ==="); + let n = 500_000; + row( + "CommandFrame serialize (serde_json)", + bench(n, || { + black_box(serde_json::to_vec(black_box(&cmd)).unwrap()); + }), + cmd_bytes.len(), + ); + row( + "CommandFrame deserialize", + bench(n, || { + let c: CommandFrame = serde_json::from_slice(black_box(&cmd_bytes)).unwrap(); + black_box(c); + }), + cmd_bytes.len(), + ); + row( + "SensorFrame serialize", + bench(n, || { + black_box(serde_json::to_vec(black_box(&sensor)).unwrap()); + }), + sensor_bytes.len(), + ); + row( + "SensorFrame deserialize", + bench(n, || { + let s: SensorFrame = serde_json::from_slice(black_box(&sensor_bytes)).unwrap(); + black_box(s); + }), + sensor_bytes.len(), + ); + + println!("\n=== control + safety compute ==="); + let mut gov = SafetyGovernor::new(SafetyLimits { + max_speed_mps: Some(5.0), + geofence_radius_m: Some(100.0), + command_timeout_ms: 1000.0, + ..Default::default() + }); + row( + "SafetyGovernor.govern", + bench(n, || { + black_box(gov.govern(black_box(&cmd), Some(black_box(&sensor)), 1.0, Some(0.99))); + }), + 0, + ); + let mut ctrl = ReflexController::default(); + row( + "ReflexController.step", + bench(n, || { + black_box(ctrl.step(Some(black_box(&sensor)), 50.0)); + }), + 0, + ); + + println!("\n=== bulk observation codec: binary BulkBlock vs JSON (1000 spike times) ==="); + let times: Vec = (0..1000).map(|i| i as f64 * 0.137).collect(); + let block = BulkBlock::new().with("times", Column::F64(times.clone())); + let bulk_bytes = block.encode(); + let json_bytes = serde_json::to_vec(×).unwrap(); + row( + "BulkBlock encode (1000 f64)", + bench(50_000, || { + black_box( + BulkBlock::new() + .with("times", Column::F64(black_box(times.clone()))) + .encode(), + ); + }), + bulk_bytes.len(), + ); + row( + "BulkBlock decode", + bench(50_000, || { + black_box(BulkBlock::decode(black_box(&bulk_bytes)).unwrap()); + }), + bulk_bytes.len(), + ); + row( + "(JSON encode same 1000 f64)", + bench(50_000, || { + black_box(serde_json::to_vec(black_box(×)).unwrap()); + }), + json_bytes.len(), + ); + + println!("\n=== verdict ==="); + println!( + " action frame: {} B JSON, ser+de ~{:.0}+{:.0} ns", + cmd_bytes.len(), + bench(n, || { + black_box(serde_json::to_vec(&cmd).unwrap()); + }), + bench(n, || { + let c: CommandFrame = serde_json::from_slice(&cmd_bytes).unwrap(); + black_box(c); + }) + ); + println!( + " bulk codec: BulkBlock {} B vs JSON {} B ({:.1}x smaller) for 1000 floats", + bulk_bytes.len(), + json_bytes.len(), + json_bytes.len() as f64 / bulk_bytes.len() as f64 + ); +} diff --git a/ncp-core/examples/uav_control_safety.rs b/ncp-core/examples/uav_control_safety.rs new file mode 100644 index 0000000..ee1936f --- /dev/null +++ b/ncp-core/examples/uav_control_safety.rs @@ -0,0 +1,321 @@ +//! Full UAV motion + controls + safety exercise for NCP (read-only use of the lib). +//! +//! Demonstrates, end to end and deterministically: +//! 1. Closed-loop flight: NeuroControlLoop + ReflexController fly a simulated +//! quad from an offset to the target (velocity setpoints, speed-clamped). +//! 2. SafetyGovernor gates: speed clamp, geofence -> latched ESTOP, stale-sensor +//! HOLD, ESTOP-mode passthrough, non-finite clock fail-safe, horizon clamp. +//! 3. ActionBuffer predictive horizon replay through a dropout + ttl expiry. +//! 4. CommandWatchdog ttl deadline + out-of-order seq must not refresh it. +//! +//! Run: cargo run -p ncp-core --example uav_control_safety + +use ncp_core::{ + ActionBuffer, ChannelValue, CommandFrame, CommandWatchdog, InProcessTransport, Map, Mode, + NeuroControlLoop, ReflexController, SafetyGovernor, SafetyLimits, SensorFrame, +}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +fn vel_map(x: f64, y: f64, z: f64) -> Map { + let mut m = Map::new(); + m.insert( + "velocity_setpoint".into(), + ChannelValue::vec3(x, y, z, Some("m/s")), + ); + m +} +fn sensor_at(seq: i64, t: f64, p: [f64; 3], v: [f64; 3]) -> SensorFrame { + let mut ch = Map::new(); + ch.insert( + "pose_position".into(), + ChannelValue::vec3(p[0], p[1], p[2], Some("m")), + ); + ch.insert( + "pose_velocity".into(), + ChannelValue::vec3(v[0], v[1], v[2], Some("m/s")), + ); + SensorFrame { + seq, + t, + channels: ch, + ..Default::default() + } +} +fn vmag(c: &ChannelValue) -> f64 { + c.data.iter().map(|x| x * x).sum::().sqrt() +} +fn vget(m: &Map) -> [f64; 3] { + let c = m.get("velocity_setpoint").expect("velocity_setpoint"); + [c.data[0], c.data[1], c.data[2]] +} + +struct T { + pass: u32, + fail: u32, +} +impl T { + fn check(&mut self, name: &str, ok: bool, detail: String) { + if ok { + self.pass += 1; + println!(" PASS {name} {detail}"); + } else { + self.fail += 1; + println!(" FAIL {name} {detail}"); + } + } +} + +fn main() { + let mut t = T { pass: 0, fail: 0 }; + + // ── 1. Closed-loop flight to target ───────────────────────────────────── + println!("\n[1] Closed-loop PD flight (ReflexController via NeuroControlLoop)"); + let clock = Arc::new(AtomicU64::new(0)); // milliseconds + let ck = clock.clone(); + let mut loop_ = NeuroControlLoop::new( + InProcessTransport::new(), + ReflexController { + kp: 1.0, + kd: 0.3, + max_speed: 1.5, + ..Default::default() + }, + 20.0, + SafetyLimits { + command_timeout_ms: 1000.0, + ..Default::default() + }, + ) + .with_clock(Box::new(move || ck.load(Ordering::Relaxed) as f64 / 1000.0)); + + let dt = 0.05_f64; // 20 Hz + let mut pos: [f64; 3] = [4.0, 3.0, -2.0]; + let mut vel: [f64; 3] = [0.0, 0.0, 0.0]; + let start_dist = (pos[0] * pos[0] + pos[1] * pos[1] + pos[2] * pos[2]).sqrt(); + let mut max_speed_seen = 0.0_f64; // vector magnitude + let mut max_axis_seen = 0.0_f64; // per-component + for k in 0..120 { + clock.fetch_add((dt * 1000.0) as u64, Ordering::Relaxed); + loop_ + .transport + .push_sensor(sensor_at(k, k as f64 * dt, pos, vel)); + let cmd = loop_.tick(); + let v = vget(&cmd.channels); + max_speed_seen = max_speed_seen.max((v[0] * v[0] + v[1] * v[1] + v[2] * v[2]).sqrt()); + max_axis_seen = max_axis_seen + .max(v[0].abs()) + .max(v[1].abs()) + .max(v[2].abs()); + vel = v; + for i in 0..3 { + pos[i] += v[i] * dt; + } + if k % 30 == 0 { + println!( + " t={:.2}s pos=({:5.2},{:5.2},{:5.2}) v=({:5.2},{:5.2},{:5.2})", + k as f64 * dt, + pos[0], + pos[1], + pos[2], + v[0], + v[1], + v[2] + ); + } + } + let end_dist = (pos[0] * pos[0] + pos[1] * pos[1] + pos[2] * pos[2]).sqrt(); + t.check( + "flight converges to target", + end_dist < 0.1, + format!("dist {start_dist:.2}m -> {end_dist:.3}m"), + ); + t.check( + "ReflexController per-axis clamp (<=1.5)", + max_axis_seen <= 1.5 + 1e-9, + format!("peak |axis|={max_axis_seen:.3} m/s"), + ); + println!(" NOTE: ReflexController clamps PER-AXIS, so vector speed reached {max_speed_seen:.3} m/s (up to sqrt(3)*max_speed); SafetyGovernor magnitude-clamps. [finding]"); + + // ── 2. SafetyGovernor gates ───────────────────────────────────────────── + println!("\n[2] SafetyGovernor gates"); + let fresh = sensor_at(1, 1.0, [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]); + + // 2a speed clamp + let mut gov = SafetyGovernor::new(SafetyLimits { + max_speed_mps: Some(5.0), + command_timeout_ms: 1000.0, + ..Default::default() + }); + let cmd = CommandFrame { + mode: Mode::Active, + channels: vel_map(100.0, 0.0, 0.0), + ..Default::default() + }; + let out = gov.govern(&cmd, Some(&fresh), 1.0, Some(0.99)); + let mag = vmag(out.channels.get("velocity_setpoint").unwrap()); + t.check( + "over-speed command clamped", + (mag - 5.0).abs() < 1e-6, + format!("|v| 100 -> {mag:.3} m/s"), + ); + + // 2b geofence breach -> latched estop + let mut gov_g = SafetyGovernor::new(SafetyLimits { + geofence_radius_m: Some(10.0), + command_timeout_ms: 1000.0, + ..Default::default() + }); + let beyond = sensor_at(1, 1.0, [20.0, 0.0, 0.0], [0.0, 0.0, 0.0]); + let out = gov_g.govern( + &CommandFrame { + mode: Mode::Active, + channels: vel_map(1.0, 0.0, 0.0), + ..Default::default() + }, + Some(&beyond), + 1.0, + Some(0.99), + ); + t.check( + "geofence breach -> ESTOP latch", + out.mode == Mode::Estop && gov_g.is_estopped(), + format!("mode={:?} estopped={}", out.mode, gov_g.is_estopped()), + ); + + // 2c stale sensor -> HOLD (non-latching) + let mut gov_s = SafetyGovernor::new(SafetyLimits { + command_timeout_ms: 500.0, + ..Default::default() + }); + let out = gov_s.govern(&cmd, Some(&fresh), 10.0, Some(0.0)); + t.check( + "stale sensor -> HOLD", + out.mode == Mode::Hold && !gov_s.is_estopped(), + format!("mode={:?}", out.mode), + ); + + // 2d ESTOP-mode command: govern preserves the mode for the plant to enforce. + let mut gov_e = SafetyGovernor::new(SafetyLimits { + command_timeout_ms: 1000.0, + ..Default::default() + }); + let out = gov_e.govern( + &CommandFrame { + mode: Mode::Estop, + channels: vel_map(9.0, 9.0, 9.0), + ..Default::default() + }, + Some(&fresh), + 1.0, + Some(0.99), + ); + let z = vget(&out.channels); + t.check( + "ESTOP mode preserved for plant", + out.mode == Mode::Estop, + format!("mode={:?}", out.mode), + ); + println!(" NOTE: govern() did NOT zero the channels of an incoming estop/hold-mode command (v={z:?}); de-energizing relies on the plant honoring `mode`. Defense-in-depth candidate. [finding]"); + + // 2e non-finite clock -> fail-safe HOLD (not fail-open) + let mut gov_n = SafetyGovernor::new(SafetyLimits { + command_timeout_ms: 500.0, + ..Default::default() + }); + let out = gov_n.govern(&cmd, Some(&fresh), f64::NAN, Some(0.0)); + t.check( + "NaN clock -> HOLD (fail-safe)", + out.mode == Mode::Hold, + format!("mode={:?}", out.mode), + ); + + // 2f horizon steps are clamped too + let mut gov_h = SafetyGovernor::new(SafetyLimits { + max_speed_mps: Some(5.0), + command_timeout_ms: 1000.0, + ..Default::default() + }); + let cmd_h = CommandFrame { + mode: Mode::Active, + channels: vel_map(1.0, 0.0, 0.0), + horizon: vec![vel_map(50.0, 0.0, 0.0)], + horizon_dt_ms: Some(100.0), + ..Default::default() + }; + let out = gov_h.govern(&cmd_h, Some(&fresh), 1.0, Some(0.99)); + let hmag = out + .horizon + .first() + .map(|m| vmag(m.get("velocity_setpoint").unwrap())) + .unwrap_or(0.0); + t.check( + "predictive horizon clamped", + (hmag - 5.0).abs() < 1e-6, + format!("horizon |v| 50 -> {hmag:.3} m/s"), + ); + + // ── 3. ActionBuffer predictive replay through a dropout ────────────────── + println!("\n[3] ActionBuffer horizon replay + ttl"); + let mut ab = ActionBuffer::new(); + let cmd_p = CommandFrame { + mode: Mode::Active, + ttl_ms: 500.0, + channels: vel_map(1.0, 0.0, 0.0), + horizon: vec![vel_map(2.0, 0.0, 0.0), vel_map(3.0, 0.0, 0.0)], + horizon_dt_ms: Some(100.0), + ..Default::default() + }; + ab.on_command(0.0, cmd_p); + let a0 = ab + .active(0.0) + .and_then(|m| m.get("velocity_setpoint").map(|c| c.data[0])); + let a1 = ab + .active(0.1) + .and_then(|m| m.get("velocity_setpoint").map(|c| c.data[0])); + let a2 = ab + .active(0.2) + .and_then(|m| m.get("velocity_setpoint").map(|c| c.data[0])); + t.check( + "replay tick0/h1/h2 through dropout", + a0 == Some(1.0) && a1 == Some(2.0) && a2 == Some(3.0), + format!("{a0:?} {a1:?} {a2:?}"), + ); + t.check( + "ttl expiry -> HOLD", + ab.should_hold(0.6), + format!("should_hold(0.6s)={}", ab.should_hold(0.6)), + ); + + // ── 4. CommandWatchdog deadline + out-of-order seq ────────────────────── + println!("\n[4] CommandWatchdog"); + let mut wd = CommandWatchdog::new(); + wd.on_command(0.0, 200.0, 1); + let within = wd.should_hold(0.1); + let expired = wd.should_hold(0.3); + t.check( + "watchdog within ttl actuates", + !within, + format!("should_hold(0.1s)={within}"), + ); + t.check( + "watchdog past ttl HOLDs", + expired, + format!("should_hold(0.3s)={expired}"), + ); + // refresh with a fresh command, then a stale (older seq) one must not refresh + wd.on_command(0.25, 200.0, 2); + wd.on_command(0.30, 200.0, 1); // older seq -> must be ignored + let held = wd.should_hold(0.50); // 0.50 - 0.25 = 250ms > 200ms ttl + t.check( + "out-of-order seq doesn't refresh deadline", + held, + format!("should_hold(0.50s)={held}"), + ); + + println!( + "\n=== UAV control + safety: {} passed, {} failed ===", + t.pass, t.fail + ); + std::process::exit(if t.fail == 0 { 0 } else { 1 }); +} diff --git a/ncp-zenoh/examples/uav_drone_loop.rs b/ncp-zenoh/examples/uav_drone_loop.rs new file mode 100644 index 0000000..1503361 --- /dev/null +++ b/ncp-zenoh/examples/uav_drone_loop.rs @@ -0,0 +1,207 @@ +//! UAV "drone-in-the-loop" driven entirely by NCP CommandFrames over Zenoh. +//! +//! This exercises the **action plane** end-to-end, true to wire v0.5: +//! controller --CommandFrame{velocity_setpoint, mode, ttl_ms, seq}--> plant +//! on the exact key CREBAIN's bridge subscribes to +//! (`engram/ncp/session//command`, realm overridable via NCP_REALM). +//! +//! The plant is a minimal quad model: it integrates the commanded velocity into a +//! position, and it ENFORCES the protocol's two safety gates locally: `mode` in +//! {hold, estop} de-energizes (zero velocity), and the `ttl_ms` watchdog fails +//! safe to HOLD when no fresh command arrives — so a dropout or an e-stop visibly +//! stops the drone. +//! +//! It prints the trajectory and also appends it as JSONL to +//! $NCP_TRAJ_OUT (default ./ncp_drone_trajectory.jsonl) so an external consumer +//! (e.g. the CREBAIN browser drone, replayed via Playwright) can fly the same path. +//! +//! Run: cargo run -p ncp-zenoh --example uav_drone_loop +//! Single in-process Zenoh session (scouting off) — no router required. + +use ncp_core::keys::Keys; +use ncp_core::{ChannelValue, CommandFrame, Mode}; +use ncp_zenoh::{ZenohBus, ZenohConfig}; +use std::io::Write; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +#[derive(Clone, Copy, Debug, Default)] +struct DroneState { + x: f64, + y: f64, + z: f64, +} + +/// Latest action-plane command + when it arrived (for the ttl watchdog). +#[derive(Clone)] +struct LatestCmd { + frame: CommandFrame, + arrived: Instant, +} + +fn vel(cmd: &CommandFrame) -> [f64; 3] { + match cmd.channels.get("velocity_setpoint") { + Some(c) if c.data.len() >= 3 => [c.data[0], c.data[1], c.data[2]], + _ => [0.0, 0.0, 0.0], + } +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 2)] +async fn main() { + let realm = std::env::var("NCP_REALM").unwrap_or_else(|_| "engram/ncp".to_string()); + let session = "uav1"; + let traj_path = + std::env::var("NCP_TRAJ_OUT").unwrap_or_else(|_| "ncp_drone_trajectory.jsonl".to_string()); + + // One in-process Zenoh session; loopback delivery (no external discovery). + let mut cfg = ZenohConfig::default(); + cfg.insert_json5("scouting/multicast/enabled", "false") + .unwrap(); + cfg.insert_json5("scouting/gossip/enabled", "false") + .unwrap(); + let bus = ZenohBus::with_config(cfg, Keys::new(realm.clone())) + .await + .expect("open zenoh bus"); + println!("NCP UAV loop realm={realm} key=ncp/session/{session}/command -> {traj_path}"); + + // ---- PLANT: subscribe to the action plane ------------------------------- + let latest: Arc>> = Arc::new(Mutex::new(None)); + let sink = latest.clone(); + bus.subscribe_commands(session, move |_k, bytes| { + if let Ok(frame) = serde_json::from_slice::(&bytes) { + *sink.lock().unwrap() = Some(LatestCmd { + frame, + arrived: Instant::now(), + }); + } + }) + .await + .expect("subscribe commands"); + + // Let the subscription declaration settle before the controller publishes. + tokio::time::sleep(Duration::from_millis(300)).await; + + // ---- PLANT integrator: 50 Hz, applies safety gates ---------------------- + let traj = Arc::new(Mutex::new(Vec::::new())); + let plant_traj = traj.clone(); + let plant_latest = latest.clone(); + let plant = tokio::spawn(async move { + let dt = 0.02_f64; // 50 Hz + let mut s = DroneState::default(); + let mut t = 0.0_f64; + let steps = (8.0 / dt) as usize; // 8 s + let mut tick = Duration::from_millis(20); + loop { + tokio::time::sleep(tick).await; + t += dt; + + // Resolve the active command under the protocol's safety gates. + let (mode, mut v) = match plant_latest.lock().unwrap().clone() { + Some(c) => { + let age_ms = c.arrived.elapsed().as_secs_f64() * 1000.0; + if age_ms > c.frame.ttl_ms { + ("WATCHDOG_HOLD", [0.0, 0.0, 0.0]) // ttl expired -> fail-safe + } else { + match c.frame.mode { + Mode::Active => ("active", vel(&c.frame)), + Mode::Init => ("init", [0.0, 0.0, 0.0]), + Mode::Hold => ("hold", [0.0, 0.0, 0.0]), + Mode::Estop => ("estop", [0.0, 0.0, 0.0]), + } + } + } + None => ("no_command", [0.0, 0.0, 0.0]), + }; + // ground constraint: don't sink below 0 + if s.y <= 0.0 && v[1] < 0.0 { + v[1] = 0.0; + } + s.x += v[0] * dt; + s.y += v[1] * dt; + s.z += v[2] * dt; + if s.y < 0.0 { + s.y = 0.0; + } + + let line = format!( + "{{\"t\":{:.2},\"x\":{:.3},\"y\":{:.3},\"z\":{:.3},\"vx\":{:.3},\"vy\":{:.3},\"vz\":{:.3},\"mode\":\"{}\"}}", + t, s.x, s.y, s.z, v[0], v[1], v[2], mode + ); + plant_traj.lock().unwrap().push(line); + if (t * 50.0) as usize % 10 == 0 { + // ~5 Hz console print + println!( + " t={:4.2}s pos=({:6.2},{:6.2},{:6.2}) v=({:5.2},{:5.2},{:5.2}) [{}]", + t, s.x, s.y, s.z, v[0], v[1], v[2], mode + ); + } + if t >= 8.0 - dt || ((t * 50.0) as usize) >= steps { + break; + } + } + // ensure unused var warning-free + let _ = &mut tick; + }); + + // ---- CONTROLLER: publish a flight plan as CommandFrames ----------------- + // Plan (8 s): climb -> forward -> right -> HOLD (mode) -> dropout (ttl) -> ESTOP. + let plan: &[(f64, &str, [f64; 3])] = &[ + (1.5, "active", [0.0, 1.5, 0.0]), // climb to ~2.25 m + (1.5, "active", [2.0, 0.0, 0.0]), // fly +x + (1.5, "active", [0.0, 0.0, 2.0]), // fly +z + (1.0, "hold", [0.0, 0.0, 0.0]), // explicit HOLD (mode gate) + (1.0, "dropout", [0.0, 0.0, 0.0]), // stop publishing -> ttl watchdog HOLD + (1.0, "estop", [9.0, 9.0, 9.0]), // ESTOP: nonzero vel MUST be ignored + ]; + let hz = 20.0; + let period = Duration::from_secs_f64(1.0 / hz); + let mut seq: i64 = 0; + let mut t = 0.0_f64; + for (dur, mode_s, v) in plan { + let n = (dur * hz) as usize; + for _ in 0..n { + t += 1.0 / hz; + if *mode_s == "dropout" { + // simulate a control dropout: publish nothing; plant must hold via ttl + tokio::time::sleep(period).await; + continue; + } + seq += 1; + let mode = match *mode_s { + "active" => Mode::Active, + "hold" => Mode::Hold, + "estop" => Mode::Estop, + _ => Mode::Hold, + }; + let mut channels = ncp_core::Map::new(); + channels.insert( + "velocity_setpoint".to_string(), + ChannelValue::vec3(v[0], v[1], v[2], Some("m/s")), + ); + let cmd = CommandFrame { + seq, + t: t * 1000.0, + ttl_ms: 250.0, // ~5 missed frames at 20 Hz + mode, + channels, + ..Default::default() + }; + let payload = serde_json::to_vec(&cmd).unwrap(); + bus.publish_command(session, &payload).await.unwrap(); + tokio::time::sleep(period).await; + } + } + + plant.await.unwrap(); + + // Persist the trajectory for external replay (CREBAIN browser drone). + let lines = traj.lock().unwrap().clone(); + if let Ok(mut f) = std::fs::File::create(&traj_path) { + for l in &lines { + let _ = writeln!(f, "{l}"); + } + println!("wrote {} trajectory samples to {}", lines.len(), traj_path); + } + let _ = bus.close().await; + println!("done."); +}