diff --git a/Cargo.lock b/Cargo.lock index 696d5c2..8650cf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3225,6 +3225,7 @@ dependencies = [ "spl-associated-token-account", "spl-token", "spl-token-client", + "thiserror 2.0.18", "tokio", "toml", "trait-variant", diff --git a/Cargo.toml b/Cargo.toml index 8977396..2a3ab77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ borsh = "1.5.1" fern = { version = "0.6.2", features = ["colored"] } trait-variant = "0.1.2" toml = "0.8.19" +thiserror = "2.0.11" [profile.release] opt-level = 3 diff --git a/README.md b/README.md index c23d7dd..c1b296a 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,11 @@ jito_url = "https://amsterdam.mainnet.block-engine.jito.wtf/api/v1/transactions? kernel_tcp_bypass = true kernel_tcp_bypass_engine = "af_xdp_or_dpdk_external" kernel_bypass_socket_path = "/tmp/slotstrike-kernel-bypass.sock" +fpga_enabled = false +fpga_vendor = "generic" +fpga_ingress_mode = "auto" +fpga_direct_device_path = "/dev/slotstrike-fpga0" +fpga_dma_socket_path = "/tmp/slotstrike-fpga-dma.sock" [telemetry] enabled = true @@ -76,7 +81,12 @@ slippage_pct = "1" - `kernel_bypass_socket_path`: unix socket for external AF_XDP/DPDK bypass feed bridge. - `fpga_enabled`: force FPGA ingress if available. - `fpga_verbose`: verbose FPGA diagnostics. -- `fpga_vendor`: vendor label. +- `fpga_vendor`: `mock_dma`, `generic`, `exanic`, `xilinx`, `amd`, `solarflare`, or `napatech`. +- `fpga_ingress_mode`: `auto`, `mock_dma`, `direct_device`, or `external_socket`. +- `fpga_direct_device_path`: direct vendor-device ingest path (character device/FIFO/file) used by `direct_device`. +- `fpga_dma_socket_path`: unix socket for `external_socket` bridge mode. +- direct frame wire format: one frame per line. Preferred JSON: `{"payload_base64":"...","hardware_timestamp_ns":123}`. + Alternate wire format: base64 payload-only line. - `replay_benchmark`: run synthetic replay instead of live strategy. - `replay_event_count`: replay event count. - `replay_burst_size`: replay burst size. @@ -98,6 +108,17 @@ slippage_pct = "1" Note: monetary/percentage rule values are strings and parsed via fixed-point/integer-safe logic to avoid float drift. +## FPGA Direct Mode + +For production users integrating vendor devices directly: + +1. Set `fpga_enabled = true`. +2. Set `fpga_ingress_mode = "direct_device"`. +3. Set `fpga_direct_device_path` to your driver/device export path. + +Startup now fails fast if the direct device path is missing or not readable. +Full operator contract and wire format: `docs/operations/fpga_direct_ingress.md`. + Multiple mint addresses: Use one `[[rules]]` block per mint address (the `rules` section is an array of tables). @@ -169,6 +190,7 @@ Useful runtime flags: - `--fpga-verbose` If you set `kernel_tcp_bypass_engine = "openonload"`, run under Onload (or equivalent preload setup) to activate acceleration. +If you set `fpga_enabled = true`, startup now fails fast unless FPGA prerequisites are available for the selected mode. Note: ingress feed transport and tx submission transport are separate concerns. Ingress can use FPGA/kernel-bypass/websocket, while tx submission uses `direct` RPC or `jito` based on `tx_submission_mode`. @@ -260,6 +282,7 @@ Tune with: - On-call playbook: `docs/runbooks/oncall.md` - Contribution guide: `CONTRIBUTING.md` - FPGA NIC deployment/PTP/rollback: `docs/operations/fpga_nic_deployment.md` +- FPGA direct ingress contract: `docs/operations/fpga_direct_ingress.md` ## Disclaimer diff --git a/docs/operations/fpga_direct_ingress.md b/docs/operations/fpga_direct_ingress.md new file mode 100644 index 0000000..9bab7b9 --- /dev/null +++ b/docs/operations/fpga_direct_ingress.md @@ -0,0 +1,73 @@ +# FPGA Direct Ingress Contract + +## Purpose + +This document defines the production contract for `fpga_ingress_mode = "direct_device"` in slotstrike. +It is intended for operators integrating a vendor FPGA/NIC driver directly into runtime ingress. + +## Runtime Configuration + +Set these `[runtime]` fields in `slotstrike.toml`: + +```toml +fpga_enabled = true +fpga_vendor = "xilinx" # generic | exanic | xilinx | amd | solarflare | napatech | mock_dma +fpga_ingress_mode = "direct_device" # auto | mock_dma | direct_device | external_socket +fpga_direct_device_path = "/dev/slotstrike-fpga0" +fpga_dma_socket_path = "/tmp/slotstrike-fpga-dma.sock" # only used by external_socket mode +``` + +`auto` resolves to: + +1. `mock_dma` backend when `fpga_vendor = "mock_dma"`. +2. `direct_device` backend for supported hardware vendors. + +## Readiness Rules (Fail Fast) + +At startup, slotstrike rejects FPGA mode when prerequisites fail: + +1. Unsupported vendor/mode combination. +2. Missing `fpga_direct_device_path`. +3. Path exists but is not a char device, FIFO, or readable file. +4. Path exists but cannot be opened. + +The process exits before wallet/RPC/rulebook initialization if ingress readiness fails. + +## Direct Device Wire Format + +The direct device reader consumes one frame per line. + +Preferred line format (JSON): + +```json +{"payload_base64":"c2lnbmF0dXJlPWFiYzEyMwo...","hardware_timestamp_ns":1730000000123456789} +``` + +Allowed alternatives: + +1. JSON with `payload` (plain UTF-8 string) instead of `payload_base64`. +2. Payload-only base64 line (no JSON envelope). + +Each payload is decoded by slotstrike's deterministic DMA parser and then passed through pool prefiltering. + +## Payload Contract + +Decoded payload must contain newline-separated fields: + +```text +signature= +has_error=<0|1|true|false> +log= +log= +... +``` + +If payload parsing fails, slotstrike drops that frame and continues reading. + +## Operational Notes + +1. Keep the device producer line-buffered to avoid burst latency from partial frames. +2. Use hardware timestamps where available and include `hardware_timestamp_ns` in the JSON envelope. +3. Prefer character device or FIFO paths over regular files in production. +4. For rollback, switch `fpga_enabled = false` and restart service. + diff --git a/docs/operations/fpga_nic_deployment.md b/docs/operations/fpga_nic_deployment.md index d1ccb3c..c10ca83 100644 --- a/docs/operations/fpga_nic_deployment.md +++ b/docs/operations/fpga_nic_deployment.md @@ -46,6 +46,9 @@ ethtool -i ```bash fpga_enabled = true fpga_vendor = "" +fpga_ingress_mode = "direct_device" +fpga_direct_device_path = "/dev/slotstrike-fpga0" +fpga_dma_socket_path = "/tmp/slotstrike-fpga-dma.sock" fpga_verbose = false ``` diff --git a/slotstrike.example.toml b/slotstrike.example.toml index c536448..5003a21 100644 --- a/slotstrike.example.toml +++ b/slotstrike.example.toml @@ -13,6 +13,12 @@ kernel_bypass_socket_path = "/tmp/slotstrike-kernel-bypass.sock" fpga_enabled = false fpga_verbose = false fpga_vendor = "generic" +# supported: "auto", "mock_dma", "direct_device", "external_socket" +fpga_ingress_mode = "auto" +# used when fpga_enabled=true and fpga_ingress_mode resolves to direct_device +fpga_direct_device_path = "/dev/slotstrike-fpga0" +# used when fpga_enabled=true and fpga_ingress_mode=external_socket +fpga_dma_socket_path = "/tmp/slotstrike-fpga-dma.sock" replay_benchmark = false replay_event_count = 50000 replay_burst_size = 512 diff --git a/src/adapters/fpga_feed.rs b/src/adapters/fpga_feed.rs index c488666..f1e27db 100644 --- a/src/adapters/fpga_feed.rs +++ b/src/adapters/fpga_feed.rs @@ -1,5 +1,19 @@ -use std::{collections::VecDeque, sync::Arc, thread}; +use std::{ + collections::VecDeque, + io::{BufRead, BufReader}, + sync::Arc, + thread, + time::Duration, +}; + +#[cfg(unix)] +use std::{ + os::unix::{fs::FileTypeExt, net::UnixStream}, + path::{Path, PathBuf}, +}; +use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD}; +use serde::Deserialize; use tokio::sync::mpsc; use crate::{ @@ -7,12 +21,30 @@ use crate::{ IngressMetadata, IngressSource, RawLogEvent, normalize_hardware_timestamp_ns, unix_timestamp_now_ns, }, + domain::value_objects::FpgaIngressMode, ports::fpga_feed::{FpgaFeedError, FpgaFeedPort}, slices::sniper::pool_filter::is_pool_creation_dma_payload, }; const MOCK_DMA_VENDOR: &str = "mock_dma"; const MOCK_DMA_FRAME_ENV: &str = "FPGA_DMA_MOCK_FRAME"; +const DEFAULT_FPGA_DMA_SOCKET_PATH: &str = "/tmp/slotstrike-fpga-dma.sock"; +const DEFAULT_FPGA_DIRECT_DEVICE_PATH: &str = "/dev/slotstrike-fpga0"; +const EXTERNAL_DMA_VENDORS: [&str; 6] = [ + "generic", + "exanic", + "xilinx", + "amd", + "solarflare", + "napatech", +]; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum FpgaIngressBackend { + MockDmaRing, + DirectDevice, + ExternalSocket, +} #[derive(Clone, Debug)] struct DmaFrame { @@ -61,6 +93,16 @@ impl DmaRing { } } +#[derive(Clone, Debug, Deserialize)] +struct ExternalDmaFrame { + #[serde(default)] + hardware_timestamp_ns: Option, + #[serde(default)] + payload: Option, + #[serde(default)] + payload_base64: Option, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub struct DecodedDmaPayload { signature: String, @@ -89,37 +131,174 @@ impl DecodedDmaPayload { pub struct FpgaFeedAdapter { vendor: String, verbose: bool, + ingress_mode: FpgaIngressMode, + direct_device_path: String, + dma_socket_path: String, mock_dma_payload: Option>, } impl FpgaFeedAdapter { - #[expect( - clippy::missing_const_for_fn, - reason = "runtime constructor takes owned String and is not used in const contexts" - )] pub fn new(vendor: String, verbose: bool) -> Self { Self { vendor, verbose, + ingress_mode: FpgaIngressMode::Auto, + direct_device_path: DEFAULT_FPGA_DIRECT_DEVICE_PATH.to_owned(), + dma_socket_path: DEFAULT_FPGA_DMA_SOCKET_PATH.to_owned(), mock_dma_payload: None, } } - fn bootstrap_dma_ring(&self) -> Result { - if !self.vendor.eq_ignore_ascii_case(MOCK_DMA_VENDOR) { - return Err(FpgaFeedError::Unavailable(format!( - "vendor '{}' does not expose FPGA DMA ring integration yet", - self.vendor - ))); + pub const fn with_ingress_mode(mut self, ingress_mode: FpgaIngressMode) -> Self { + self.ingress_mode = ingress_mode; + self + } + + pub fn with_direct_device_path(mut self, direct_device_path: String) -> Self { + self.direct_device_path = direct_device_path; + self + } + + pub fn with_dma_socket_path(mut self, dma_socket_path: String) -> Self { + self.dma_socket_path = dma_socket_path; + self + } + + pub fn dma_socket_path(&self) -> &str { + &self.dma_socket_path + } + + pub fn direct_device_path(&self) -> &str { + &self.direct_device_path + } + + fn ingress_backend(&self) -> Result { + match self.ingress_mode { + FpgaIngressMode::Auto => { + if self.vendor.eq_ignore_ascii_case(MOCK_DMA_VENDOR) { + return Ok(FpgaIngressBackend::MockDmaRing); + } + + if is_direct_dma_vendor(&self.vendor) { + return Ok(FpgaIngressBackend::DirectDevice); + } + + Err(FpgaFeedError::UnsupportedVendor { + vendor: self.vendor.clone(), + }) + } + FpgaIngressMode::MockDma => Ok(FpgaIngressBackend::MockDmaRing), + FpgaIngressMode::DirectDevice => { + if is_direct_dma_vendor(&self.vendor) { + Ok(FpgaIngressBackend::DirectDevice) + } else { + Err(FpgaFeedError::UnsupportedVendor { + vendor: self.vendor.clone(), + }) + } + } + FpgaIngressMode::ExternalSocket => { + if is_external_dma_vendor(&self.vendor) { + Ok(FpgaIngressBackend::ExternalSocket) + } else { + Err(FpgaFeedError::UnsupportedVendor { + vendor: self.vendor.clone(), + }) + } + } } + } - let payload = self.resolve_mock_dma_payload().ok_or_else(|| { - FpgaFeedError::Unavailable(format!( - "mock FPGA DMA ring requires '{}' environment payload", - MOCK_DMA_FRAME_ENV - )) + fn validate_mock_ring_ready(&self) -> Result<(), FpgaFeedError> { + if self.resolve_mock_dma_payload().is_none() { + return Err(FpgaFeedError::MissingMockPayloadEnv { + env_var: MOCK_DMA_FRAME_ENV, + }); + } + + Ok(()) + } + + #[cfg(unix)] + fn validate_direct_device_ready(&self) -> Result<(), FpgaFeedError> { + let device_path = Path::new(&self.direct_device_path); + if !device_path.exists() { + return Err(FpgaFeedError::DirectDevicePathMissing { + device_path: PathBuf::from(device_path), + }); + } + + let metadata = std::fs::metadata(device_path).map_err(|_source| { + FpgaFeedError::DirectDevicePathMissing { + device_path: PathBuf::from(device_path), + } + })?; + let file_type = metadata.file_type(); + + if !(file_type.is_char_device() || file_type.is_file() || file_type.is_fifo()) { + return Err(FpgaFeedError::DirectDeviceUnavailable { + device_path: PathBuf::from(device_path), + }); + } + + if std::fs::File::open(device_path).is_err() { + return Err(FpgaFeedError::DirectDeviceUnavailable { + device_path: PathBuf::from(device_path), + }); + } + + Ok(()) + } + + #[cfg(not(unix))] + fn validate_direct_device_ready(&self) -> Result<(), FpgaFeedError> { + Err(FpgaFeedError::DirectDeviceRequiresUnixTarget) + } + + #[cfg(unix)] + fn validate_external_socket_ready(&self) -> Result<(), FpgaFeedError> { + let socket_path = Path::new(&self.dma_socket_path); + if !socket_path.exists() { + return Err(FpgaFeedError::DmaSocketPathMissing { + socket_path: PathBuf::from(socket_path), + }); + } + + let metadata = std::fs::metadata(socket_path).map_err(|_source| { + FpgaFeedError::DmaSocketPathMissing { + socket_path: PathBuf::from(socket_path), + } })?; + if !metadata.file_type().is_socket() { + return Err(FpgaFeedError::DmaSocketUnavailable { + socket_path: PathBuf::from(socket_path), + }); + } + + if UnixStream::connect(socket_path).is_err() { + return Err(FpgaFeedError::DmaSocketUnavailable { + socket_path: PathBuf::from(socket_path), + }); + } + + Ok(()) + } + + #[cfg(not(unix))] + fn validate_external_socket_ready(&self) -> Result<(), FpgaFeedError> { + Err(FpgaFeedError::ExternalSocketRequiresUnixTarget) + } + + fn bootstrap_dma_ring(&self) -> Result { + self.validate_mock_ring_ready()?; + + let payload = + self.resolve_mock_dma_payload() + .ok_or(FpgaFeedError::MissingMockPayloadEnv { + env_var: MOCK_DMA_FRAME_ENV, + })?; + let frame = DmaFrame::new(unix_timestamp_now_ns(), payload); let mut ring = DmaRing::with_capacity(1_024); ring.push(frame); @@ -138,10 +317,260 @@ impl FpgaFeedAdapter { decode_dma_payload(frame.payload()) } + fn process_frame( + frame: &DmaFrame, + sender: &mpsc::UnboundedSender, + verbose: bool, + ) -> bool { + if verbose { + log::debug!( + "FPGA DMA RX > ts={} ns, bytes={}", + frame.hardware_timestamp_ns(), + frame.payload().len() + ); + } + + if !is_pool_creation_dma_payload(frame.payload()) { + if verbose { + log::debug!("FPGA DMA RX > frame skipped by deterministic prefilter"); + } + return true; + } + + let parsed = match Self::decode_dma_frame(frame) { + Ok(value) => value, + Err(error) => { + log::warn!("FPGA DMA decode failed: {}", error); + return true; + } + }; + + let received_timestamp_ns = unix_timestamp_now_ns(); + let normalized_timestamp_ns = normalize_hardware_timestamp_ns( + Some(frame.hardware_timestamp_ns()), + received_timestamp_ns, + ); + let event = RawLogEvent { + signature: parsed.signature, + logs: parsed.logs, + has_error: parsed.has_error, + ingress: IngressMetadata { + source: IngressSource::FpgaDma, + hardware_timestamp_ns: Some(frame.hardware_timestamp_ns()), + received_timestamp_ns, + normalized_timestamp_ns, + }, + }; + + if sender.send(event).is_err() { + log::warn!("FPGA event channel closed. Stopping DMA stream."); + return false; + } + + true + } + + #[cfg(unix)] + fn parse_external_frame(raw_frame: &str) -> Result { + let parsed = serde_json::from_str::(raw_frame) + .map_err(|_source| FpgaFeedError::ExternalFrameInvalidJson)?; + + let payload = if let Some(payload_base64) = parsed.payload_base64 { + BASE64_STANDARD + .decode(payload_base64.as_bytes()) + .map_err(|_source| FpgaFeedError::ExternalFrameInvalidBase64)? + } else if let Some(payload) = parsed.payload { + payload.into_bytes() + } else { + return Err(FpgaFeedError::ExternalFrameMissingPayload); + }; + + if payload.is_empty() { + return Err(FpgaFeedError::ExternalFrameMissingPayload); + } + + let hardware_timestamp_ns = parsed + .hardware_timestamp_ns + .unwrap_or_else(unix_timestamp_now_ns); + Ok(DmaFrame::new( + hardware_timestamp_ns, + Arc::<[u8]>::from(payload), + )) + } + + #[cfg(unix)] + fn parse_wire_frame(raw_frame: &str) -> Result { + if raw_frame.starts_with('{') { + return Self::parse_external_frame(raw_frame); + } + + if let Ok(payload) = BASE64_STANDARD.decode(raw_frame.as_bytes()) + && !payload.is_empty() + { + return Ok(DmaFrame::new( + unix_timestamp_now_ns(), + Arc::<[u8]>::from(payload), + )); + } + + Ok(DmaFrame::new( + unix_timestamp_now_ns(), + Arc::<[u8]>::from(raw_frame.as_bytes()), + )) + } + + #[cfg(unix)] + fn spawn_external_socket_stream(&self, sender: mpsc::UnboundedSender) { + let socket_path = self.dma_socket_path.clone(); + let verbose = self.verbose; + + thread::spawn(move || { + loop { + let stream = match UnixStream::connect(Path::new(&socket_path)) { + Ok(value) => value, + Err(error) => { + log::warn!( + "FPGA DMA socket reconnect failed for '{}': {}", + socket_path, + error + ); + thread::sleep(Duration::from_secs(1)); + continue; + } + }; + + log::info!( + "Listening for FPGA DMA frames via external socket {}", + socket_path + ); + + let mut reader = BufReader::new(stream); + loop { + let mut raw_frame = String::new(); + let read_len = match reader.read_line(&mut raw_frame) { + Ok(value) => value, + Err(error) => { + log::warn!("FPGA DMA socket read failed: {}", error); + break; + } + }; + + if read_len == 0 { + log::warn!("FPGA DMA socket closed by peer. Reconnecting."); + break; + } + + let payload = raw_frame.trim(); + if payload.is_empty() { + continue; + } + + let frame = match Self::parse_external_frame(payload) { + Ok(value) => value, + Err(error) => { + log::debug!("FPGA external frame parse failed: {}", error); + continue; + } + }; + + if !Self::process_frame(&frame, &sender, verbose) { + return; + } + } + + thread::sleep(Duration::from_millis(250)); + } + }); + } + + #[cfg(unix)] + fn spawn_direct_device_stream( + &self, + sender: mpsc::UnboundedSender, + ) -> Result<(), FpgaFeedError> { + self.validate_direct_device_ready()?; + + let device_path = self.direct_device_path.clone(); + let verbose = self.verbose; + + thread::spawn(move || { + loop { + let device = match std::fs::File::open(Path::new(&device_path)) { + Ok(value) => value, + Err(error) => { + log::warn!( + "FPGA direct device open failed for '{}': {}", + device_path, + error + ); + thread::sleep(Duration::from_secs(1)); + continue; + } + }; + + log::info!("Reading FPGA DMA frames from direct device {}", device_path); + + let mut reader = BufReader::new(device); + loop { + let mut raw_frame = String::new(); + let read_len = match reader.read_line(&mut raw_frame) { + Ok(value) => value, + Err(error) => { + log::warn!("FPGA direct device read failed: {}", error); + break; + } + }; + + if read_len == 0 { + log::warn!("FPGA direct device reached EOF. Reopening."); + break; + } + + let payload = raw_frame.trim(); + if payload.is_empty() { + continue; + } + + let frame = match Self::parse_wire_frame(payload) { + Ok(value) => value, + Err(error) => { + log::debug!("FPGA direct frame parse failed: {}", error); + continue; + } + }; + + if !Self::process_frame(&frame, &sender, verbose) { + return; + } + } + + thread::sleep(Duration::from_millis(250)); + } + }); + + Ok(()) + } + + #[cfg(not(unix))] + fn spawn_direct_device_stream( + &self, + _sender: mpsc::UnboundedSender, + ) -> Result<(), FpgaFeedError> { + Err(FpgaFeedError::DirectDeviceRequiresUnixTarget) + } + + #[cfg(not(unix))] + fn spawn_external_socket_stream(&self, _sender: mpsc::UnboundedSender) { + log::error!("{}", FpgaFeedError::ExternalSocketRequiresUnixTarget); + } + pub fn with_mock_dma_payload(vendor: String, verbose: bool, payload: &[u8]) -> Self { Self { vendor, verbose, + ingress_mode: FpgaIngressMode::MockDma, + direct_device_path: DEFAULT_FPGA_DIRECT_DEVICE_PATH.to_owned(), + dma_socket_path: DEFAULT_FPGA_DMA_SOCKET_PATH.to_owned(), mock_dma_payload: Some(Arc::<[u8]>::from(payload)), } } @@ -164,11 +593,22 @@ impl FpgaFeedPort for FpgaFeedAdapter { fn describe(&self) -> String { if self.verbose { format!( - "FPGA feed enabled (vendor={}, verbose=true, hardware timestamps active, DMA ring path)", - self.vendor + "FPGA feed enabled (vendor={}, mode={}, verbose=true, hardware timestamps active, direct_device_path={}, dma_socket_path={})", + self.vendor, self.ingress_mode, self.direct_device_path, self.dma_socket_path ) } else { - format!("FPGA feed enabled (vendor={}, verbose=false)", self.vendor) + format!( + "FPGA feed enabled (vendor={}, mode={}, verbose=false, direct_device_path={}, dma_socket_path={})", + self.vendor, self.ingress_mode, self.direct_device_path, self.dma_socket_path + ) + } + } + + fn validate_ready(&self) -> Result<(), FpgaFeedError> { + match self.ingress_backend()? { + FpgaIngressBackend::MockDmaRing => self.validate_mock_ring_ready(), + FpgaIngressBackend::DirectDevice => self.validate_direct_device_ready(), + FpgaIngressBackend::ExternalSocket => self.validate_external_socket_ready(), } } @@ -176,66 +616,35 @@ impl FpgaFeedPort for FpgaFeedAdapter { &self, sender: mpsc::UnboundedSender, ) -> Result<(), FpgaFeedError> { - let mut dma_ring = self.bootstrap_dma_ring()?; - let verbose = self.verbose; - - thread::spawn(move || { - while let Some(frame) = dma_ring.pop() { - if verbose { - log::debug!( - "FPGA DMA RX > ts={} ns, bytes={}", - frame.hardware_timestamp_ns(), - frame.payload().len() - ); - } - - if !is_pool_creation_dma_payload(frame.payload()) { - if verbose { - log::debug!("FPGA DMA RX > frame skipped by deterministic prefilter"); + self.validate_ready()?; + + match self.ingress_backend()? { + FpgaIngressBackend::MockDmaRing => { + let mut dma_ring = self.bootstrap_dma_ring()?; + let verbose = self.verbose; + + thread::spawn(move || { + while let Some(frame) = dma_ring.pop() { + if !Self::process_frame(&frame, &sender, verbose) { + return; + } } - continue; - } + }); - let parsed = match FpgaFeedAdapter::decode_dma_frame(&frame) { - Ok(value) => value, - Err(error) => { - log::warn!("FPGA DMA decode failed: {}", error); - continue; - } - }; - - let received_timestamp_ns = unix_timestamp_now_ns(); - let normalized_timestamp_ns = normalize_hardware_timestamp_ns( - Some(frame.hardware_timestamp_ns()), - received_timestamp_ns, - ); - let event = RawLogEvent { - signature: parsed.signature, - logs: parsed.logs, - has_error: parsed.has_error, - ingress: IngressMetadata { - source: IngressSource::FpgaDma, - hardware_timestamp_ns: Some(frame.hardware_timestamp_ns()), - received_timestamp_ns, - normalized_timestamp_ns, - }, - }; - - if sender.send(event).is_err() { - log::warn!("FPGA event channel closed. Stopping DMA stream."); - return; - } + Ok(()) } - }); - - Ok(()) + FpgaIngressBackend::DirectDevice => self.spawn_direct_device_stream(sender), + FpgaIngressBackend::ExternalSocket => { + self.spawn_external_socket_stream(sender); + Ok(()) + } + } } } pub fn decode_dma_payload(payload: &[u8]) -> Result { - let payload = std::str::from_utf8(payload).map_err(|_parse_error| { - FpgaFeedError::InvalidFrame("FPGA DMA payload is not valid UTF-8".to_owned()) - })?; + let payload = + std::str::from_utf8(payload).map_err(|_parse_error| FpgaFeedError::InvalidPayloadUtf8)?; let mut signature: Option = None; let mut logs = Vec::new(); @@ -244,9 +653,7 @@ pub fn decode_dma_payload(payload: &[u8]) -> Result Result Result Option { None } +fn is_external_dma_vendor(vendor: &str) -> bool { + EXTERNAL_DMA_VENDORS + .iter() + .any(|supported_vendor| vendor.eq_ignore_ascii_case(supported_vendor)) +} + +fn is_direct_dma_vendor(vendor: &str) -> bool { + EXTERNAL_DMA_VENDORS + .iter() + .any(|supported_vendor| vendor.eq_ignore_ascii_case(supported_vendor)) +} + #[cfg(test)] mod tests { use std::time::Duration; + #[cfg(unix)] + use std::{ + io::Write, + os::unix::net::UnixListener, + path::PathBuf, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, + }; + use super::*; use crate::adapters::raydium::{RAYDIUM_STANDARD_AMM_PROGRAM_ID, RAYDIUM_V4_PROGRAM_ID}; use crate::ports::fpga_feed::FpgaFeedError; @@ -319,6 +741,8 @@ mod tests { assert!(adapter.describe().contains("exanic")); assert!(adapter.verbose()); assert_eq!(adapter.vendor(), "exanic"); + assert!(adapter.describe().contains(adapter.direct_device_path())); + assert!(adapter.describe().contains(adapter.dma_socket_path())); } #[test] @@ -348,15 +772,46 @@ mod tests { } #[test] - fn non_mock_vendor_reports_unavailable_dma_ring() { - let adapter = FpgaFeedAdapter::new("exanic".to_owned(), false); + fn unsupported_vendor_reports_error() { + let adapter = FpgaFeedAdapter::new("unknown_vendor".to_owned(), false); let (sender, _receiver) = mpsc::unbounded_channel(); let result = adapter.spawn_stream(sender); assert!(result.is_err()); if let Err(error) = result { - assert!(matches!(error, FpgaFeedError::Unavailable(_))); + assert!(matches!(error, FpgaFeedError::UnsupportedVendor { .. })); + } + } + + #[test] + fn direct_vendor_requires_device_path() { + let adapter = FpgaFeedAdapter::new("exanic".to_owned(), false) + .with_ingress_mode(FpgaIngressMode::DirectDevice) + .with_direct_device_path("/tmp/slotstrike-missing-fpga-device".to_owned()); + + let ready = adapter.validate_ready(); + assert!(ready.is_err()); + + if let Err(error) = ready { + assert!(matches!( + error, + FpgaFeedError::DirectDevicePathMissing { .. } + )); + } + } + + #[test] + fn external_vendor_requires_socket_path() { + let adapter = FpgaFeedAdapter::new("exanic".to_owned(), false) + .with_ingress_mode(FpgaIngressMode::ExternalSocket) + .with_dma_socket_path("/tmp/slotstrike-missing-fpga.sock".to_owned()); + + let ready = adapter.validate_ready(); + assert!(ready.is_err()); + + if let Err(error) = ready { + assert!(matches!(error, FpgaFeedError::DmaSocketPathMissing { .. })); } } @@ -391,6 +846,108 @@ mod tests { } } + #[cfg(unix)] + #[tokio::test] + async fn external_socket_vendor_streams_events() { + let socket_path = unique_socket_path("slotstrike-fpga-test"); + let _remove_before = std::fs::remove_file(&socket_path); + + let listener = UnixListener::bind(&socket_path); + assert!(listener.is_ok()); + let listener = if let Ok(listener) = listener { + listener + } else { + return; + }; + + let frame_payload = format!( + "signature=external123\nhas_error=0\nlog=Program {}\nlog=Program log: initialize2", + RAYDIUM_V4_PROGRAM_ID + ); + let encoded_payload = BASE64_STANDARD.encode(frame_payload.as_bytes()); + let frame_json = format!( + "{{\"hardware_timestamp_ns\":123456789,\"payload_base64\":\"{}\"}}\n", + encoded_payload + ); + + let thread_frame_json = frame_json.clone(); + let server_thread = thread::spawn(move || { + for accept_attempt in 0..2 { + let accepted = listener.accept(); + if let Ok((mut stream, _address)) = accepted + && accept_attempt == 1 + { + let _write_result = stream.write_all(thread_frame_json.as_bytes()); + let _flush_result = stream.flush(); + } + } + }); + + let adapter = FpgaFeedAdapter::new("xilinx".to_owned(), false) + .with_ingress_mode(FpgaIngressMode::ExternalSocket) + .with_dma_socket_path(socket_path.to_string_lossy().to_string()); + let (sender, mut receiver) = mpsc::unbounded_channel(); + + let spawned = adapter.spawn_stream(sender); + assert!(spawned.is_ok()); + + let received = tokio::time::timeout(Duration::from_secs(2), receiver.recv()).await; + assert!(received.is_ok()); + + if let Ok(event) = received { + assert!(event.is_some()); + + if let Some(event) = event { + assert_eq!(event.signature, "external123"); + assert!(!event.has_error); + assert_eq!(event.ingress.source.as_str(), "fpga_dma"); + assert_eq!(event.ingress.hardware_timestamp_ns, Some(123456789)); + } + } + + let join_result = server_thread.join(); + assert!(join_result.is_ok()); + + let _remove_after = std::fs::remove_file(&socket_path); + } + + #[cfg(unix)] + #[tokio::test] + async fn direct_device_vendor_streams_events() { + let device_path = unique_device_file_path("slotstrike-fpga-direct-device-test"); + let payload = format!( + "signature=direct123\nhas_error=0\nlog=Program {}\nlog=Program log: initialize2", + RAYDIUM_V4_PROGRAM_ID + ); + let encoded_payload = BASE64_STANDARD.encode(payload.as_bytes()); + let frame_line = format!("{}\n", encoded_payload); + let write_result = std::fs::write(&device_path, frame_line.as_bytes()); + assert!(write_result.is_ok()); + + let adapter = FpgaFeedAdapter::new("xilinx".to_owned(), false) + .with_ingress_mode(FpgaIngressMode::DirectDevice) + .with_direct_device_path(device_path.to_string_lossy().to_string()); + let (sender, mut receiver) = mpsc::unbounded_channel(); + + let spawned = adapter.spawn_stream(sender); + assert!(spawned.is_ok()); + + let received = tokio::time::timeout(Duration::from_secs(2), receiver.recv()).await; + assert!(received.is_ok()); + + if let Ok(event) = received { + assert!(event.is_some()); + + if let Some(event) = event { + assert_eq!(event.signature, "direct123"); + assert!(!event.has_error); + assert_eq!(event.ingress.source.as_str(), "fpga_dma"); + } + } + + let _remove_after = std::fs::remove_file(&device_path); + } + #[test] fn deterministic_prefilter_accepts_pool_creation() { let payload = format!( @@ -408,4 +965,24 @@ mod tests { ); assert!(!is_pool_creation_dma_payload(payload.as_bytes())); } + + #[cfg(unix)] + fn unique_socket_path(prefix: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map_or(0, |value| value.as_nanos()); + let file_name = format!("{}-{}-{}.sock", prefix, std::process::id(), nanos); + PathBuf::from("/tmp").join(file_name) + } + + #[cfg(unix)] + fn unique_device_file_path(prefix: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map_or(0, |value| value.as_nanos()); + let file_name = format!("{}-{}-{}.txt", prefix, std::process::id(), nanos); + PathBuf::from("/tmp").join(file_name) + } } diff --git a/src/adapters/network_path.rs b/src/adapters/network_path.rs index 3cbc843..3ca1074 100644 --- a/src/adapters/network_path.rs +++ b/src/adapters/network_path.rs @@ -62,8 +62,8 @@ mod tests { domain::{ settings::{NetworkStackMode, RuntimeSettings}, value_objects::{ - KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, ReplayBurstSize, - ReplayEventCount, TxSubmissionMode, + FpgaIngressMode, KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, + ReplayBurstSize, ReplayEventCount, TxSubmissionMode, }, }, ports::network_path::NetworkPathPort, @@ -86,6 +86,11 @@ mod tests { fpga_enabled: mode == NetworkStackMode::Fpga, fpga_verbose: false, fpga_vendor: NonEmptyText::try_from("exanic".to_owned())?, + fpga_ingress_mode: FpgaIngressMode::DirectDevice, + fpga_direct_device_path: NonEmptyText::try_from("/dev/slotstrike-fpga0".to_owned())?, + fpga_dma_socket_path: NonEmptyText::try_from( + "/tmp/slotstrike-fpga-dma.sock".to_owned(), + )?, network_stack_mode: mode, run_replay_benchmark: false, replay_event_count: ReplayEventCount::new(50_000)?, diff --git a/src/adapters/solana_logs.rs b/src/adapters/solana_logs.rs index 45ef2b3..9ebab27 100644 --- a/src/adapters/solana_logs.rs +++ b/src/adapters/solana_logs.rs @@ -6,6 +6,8 @@ use std::{ #[cfg(unix)] use std::os::unix::net::UnixStream; +#[cfg(unix)] +use std::path::{Path, PathBuf}; use serde::Deserialize; use solana_client::{ @@ -67,23 +69,37 @@ impl SolanaPubsubLogStream { } } - fn validate_ready(&self) -> Result<(), LogStreamError> { + fn validate_startup_ready(&self) -> Result<(), LogStreamError> { if !self.wss_url.starts_with("wss://") && !self.wss_url.starts_with("ws://") { - return Err(LogStreamError::Unavailable(format!( - "invalid websocket url '{}' for {} path", - self.wss_url, self.path_name - ))); + return Err(LogStreamError::InvalidWebsocketUrl { + url: self.wss_url.clone(), + path: self.path_name, + }); } if self.source == IngressSource::KernelBypass && self.kernel_bypass_engine.is_none() { - return Err(LogStreamError::Unavailable( - "kernel bypass path missing engine selection".to_owned(), - )); + return Err(LogStreamError::MissingKernelBypassEngine); + } + + if self.requires_openonload_runtime() && !openonload_runtime_ready() { + return Err(LogStreamError::OpenOnloadRuntimeInactive); + } + + if self.source == IngressSource::KernelBypass && self.prefers_external_bypass_feed() { + self.validate_external_bypass_socket_ready()?; } Ok(()) } + const fn requires_openonload_runtime(&self) -> bool { + matches!(self.source, IngressSource::KernelBypass) + && matches!( + self.kernel_bypass_engine, + Some(KernelBypassEngine::OpenOnload) + ) + } + const fn prefers_external_bypass_feed(&self) -> bool { matches!( self.kernel_bypass_engine, @@ -95,6 +111,36 @@ impl SolanaPubsubLogStream { ) } + #[cfg(unix)] + fn validate_external_bypass_socket_ready(&self) -> Result<(), LogStreamError> { + let socket_path = self.kernel_bypass_socket_path()?; + let socket_path_ref = Path::new(&socket_path); + if !socket_path_ref.exists() { + return Err(LogStreamError::KernelBypassSocketUnavailable { + socket_path: PathBuf::from(socket_path_ref), + }); + } + + if UnixStream::connect(socket_path_ref).is_err() { + return Err(LogStreamError::KernelBypassSocketUnavailable { + socket_path: PathBuf::from(socket_path_ref), + }); + } + + Ok(()) + } + + #[cfg(not(unix))] + fn validate_external_bypass_socket_ready(&self) -> Result<(), LogStreamError> { + Err(LogStreamError::ExternalBypassRequiresUnixTarget) + } + + fn kernel_bypass_socket_path(&self) -> Result { + self.kernel_bypass_socket_path + .clone() + .ok_or(LogStreamError::MissingKernelBypassSocketPath) + } + fn spawn_websocket_stream(&self, sender: mpsc::UnboundedSender) { let wss_url = self.wss_url.clone(); let source = self.source; @@ -147,16 +193,8 @@ impl SolanaPubsubLogStream { &self, sender: mpsc::UnboundedSender, ) -> Result<(), LogStreamError> { - let socket_path = self.kernel_bypass_socket_path.clone().ok_or_else(|| { - LogStreamError::Unavailable("Missing kernel bypass socket path".to_owned()) - })?; - - if UnixStream::connect(&socket_path).is_err() { - return Err(LogStreamError::Unavailable(format!( - "kernel bypass socket unavailable at '{}' (expected external AF_XDP/DPDK bridge)", - socket_path - ))); - } + let socket_path = self.kernel_bypass_socket_path()?; + self.validate_external_bypass_socket_ready()?; thread::spawn(move || { loop { @@ -240,9 +278,7 @@ impl SolanaPubsubLogStream { &self, _sender: mpsc::UnboundedSender, ) -> Result<(), LogStreamError> { - Err(LogStreamError::Unavailable( - "kernel bypass external socket mode requires unix target".to_owned(), - )) + Err(LogStreamError::ExternalBypassRequiresUnixTarget) } } @@ -251,11 +287,15 @@ impl LogStreamPort for SolanaPubsubLogStream { self.path_name } + fn validate_ready(&self) -> Result<(), LogStreamError> { + self.validate_startup_ready() + } + fn spawn_stream( &self, sender: mpsc::UnboundedSender, ) -> Result<(), LogStreamError> { - self.validate_ready()?; + self.validate_startup_ready()?; if self.source == IngressSource::KernelBypass && self.prefers_external_bypass_feed() { return self.spawn_external_bypass_stream(sender); @@ -266,9 +306,32 @@ impl LogStreamPort for SolanaPubsubLogStream { } } +fn openonload_runtime_ready() -> bool { + openonload_runtime_ready_with(openonload_device_available(), openonload_preload_active()) +} + +const fn openonload_runtime_ready_with(device_available: bool, preload_active: bool) -> bool { + device_available && preload_active +} + +fn openonload_device_available() -> bool { + std::path::Path::new("/dev/onload").exists() +} + +fn openonload_preload_active() -> bool { + if let Ok(preload) = std::env::var("LD_PRELOAD") { + let normalized = preload.to_ascii_lowercase(); + if normalized.contains("libonload") || normalized.contains("libcitransport") { + return true; + } + } + + std::env::var("ONLOAD_PRELOAD").is_ok() +} + #[cfg(test)] mod tests { - use super::SolanaPubsubLogStream; + use super::{SolanaPubsubLogStream, openonload_runtime_ready_with}; use crate::domain::value_objects::KernelBypassEngine; use crate::ports::log_stream::{LogStreamError, LogStreamPort}; use tokio::sync::mpsc; @@ -285,7 +348,7 @@ mod tests { let started = stream.spawn_stream(sender); assert!(started.is_err()); if let Err(error) = started { - assert!(matches!(error, LogStreamError::Unavailable(_))); + assert!(matches!(error, LogStreamError::InvalidWebsocketUrl { .. })); } } @@ -301,4 +364,32 @@ mod tests { assert_eq!(kernel_stream.path_name(), "kernel_bypass"); assert_eq!(standard_stream.path_name(), "standard_tcp"); } + + #[test] + fn openonload_kernel_bypass_startup_reflects_runtime_state() { + let stream = SolanaPubsubLogStream::kernel_bypass( + "wss://example".to_owned(), + KernelBypassEngine::OpenOnload, + "/tmp/slotstrike-kernel-bypass.sock".to_owned(), + ); + let (sender, _receiver) = mpsc::unbounded_channel(); + + let started = stream.spawn_stream(sender); + if super::openonload_runtime_ready() { + assert!(started.is_ok()); + } else { + assert!(matches!( + started, + Err(LogStreamError::OpenOnloadRuntimeInactive) + )); + } + } + + #[test] + fn openonload_runtime_requires_device_and_preload() { + assert!(!openonload_runtime_ready_with(false, false)); + assert!(!openonload_runtime_ready_with(true, false)); + assert!(!openonload_runtime_ready_with(false, true)); + assert!(openonload_runtime_ready_with(true, true)); + } } diff --git a/src/adapters/toml_rules.rs b/src/adapters/toml_rules.rs index b8c552f..a93e99e 100644 --- a/src/adapters/toml_rules.rs +++ b/src/adapters/toml_rules.rs @@ -116,12 +116,8 @@ impl RuleRepository for TomlRuleRepository { file_type: &str, initial: bool, ) -> Result, io::Error> { - let config = load_sniper_config_file(&self.config_path).map_err(|message| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("config parse failed: {}", message), - ) - })?; + let config = load_sniper_config_file(&self.config_path) + .map_err(|source| io::Error::new(io::ErrorKind::InvalidData, source))?; let expected_kind = match file_type { "MINTS" => RuleKind::Mint, diff --git a/src/app/bootstrap.rs b/src/app/bootstrap.rs index df97227..bc77e25 100644 --- a/src/app/bootstrap.rs +++ b/src/app/bootstrap.rs @@ -1,4 +1,4 @@ -use std::{fmt::Write as _, io::IsTerminal, sync::Arc}; +use std::{fmt::Write as _, io::IsTerminal, path::PathBuf, sync::Arc}; use log::LevelFilter; use solana_client::nonblocking::rpc_client::RpcClient; @@ -15,9 +15,19 @@ use crate::{ solana_logs::SolanaPubsubLogStream, toml_rules::TomlRuleRepository, }, app::{ - context::ExecutionContext, logging::init_logging, systemd::maybe_handle_service_command, + context::ExecutionContext, + errors::{ + AppError, IngressStartupError, KeypairLoadError, RulebookLoadError, WalletBalanceError, + }, + logging::init_logging, + readiness::validate_ingress_readiness, + systemd::maybe_handle_service_command, + }, + domain::{ + events::RawLogEvent, + settings::{NetworkStackMode, RuntimeSettings}, + value_objects::sol_amount::Lamports, }, - domain::{settings::RuntimeSettings, value_objects::sol_amount::Lamports}, ports::{fpga_feed::FpgaFeedPort, log_stream::LogStreamPort, network_path::NetworkPathPort}, slices::{ config_sync::service::{ConfigSyncService, load_rulebook}, @@ -36,7 +46,7 @@ pub async fn run() { } } -async fn run_inner() -> Result<(), String> { +async fn run_inner() -> Result<(), AppError> { let args = std::env::args().skip(1).collect::>(); if maybe_handle_service_command(&args)? { return Ok(()); @@ -59,19 +69,36 @@ async fn run_inner() -> Result<(), String> { return Ok(()); } - let keypair = Arc::new(load_keypair(&settings.keypair_path).await?); - let rpc = Arc::new(RpcClient::new(settings.rpc_url.clone())); - let network_path = NetworkPathProfile::from_settings(&settings); let fpga_feed = FpgaFeedAdapter::new( settings.fpga_vendor.as_str().to_owned(), settings.fpga_verbose, + ) + .with_ingress_mode(settings.fpga_ingress_mode) + .with_direct_device_path(settings.fpga_direct_device_path.as_str().to_owned()) + .with_dma_socket_path(settings.fpga_dma_socket_path.as_str().to_owned()); + let kernel_bypass_stream = SolanaPubsubLogStream::kernel_bypass( + settings.wss_url.as_str().to_owned(), + settings.kernel_tcp_bypass_engine, + settings.kernel_bypass_socket_path.as_str().to_owned(), ); + let standard_tcp_stream = + SolanaPubsubLogStream::standard_tcp(settings.wss_url.as_str().to_owned()); + + validate_ingress_readiness( + &network_path, + &fpga_feed, + &kernel_bypass_stream, + &standard_tcp_stream, + )?; + + let keypair = Arc::new(load_keypair(&settings.keypair_path).await?); + let rpc = Arc::new(RpcClient::new(settings.rpc_url.clone())); let repository = Arc::new(TomlRuleRepository::new(settings.config_path.clone())); let initial_rulebook = load_rulebook(repository.as_ref(), true) .await - .map_err(|error| format!("Failed to read rules: {}", error))?; + .map_err(|source| RulebookLoadError::Read { source })?; let (rulebook_tx, rulebook_rx) = watch::channel(Arc::clone(&initial_rulebook)); @@ -86,13 +113,62 @@ async fn run_inner() -> Result<(), String> { .get_balance(&keypair.pubkey()) .await .map(|lamports| Lamports::new(lamports).as_sol_string()) - .map_err(|error| format!("Failed to read wallet balance: {}", error))?; + .map_err(|source| WalletBalanceError::Read { source })?; let mint_rules = initial_rulebook.mint_log_lines(); let deployer_rules = initial_rulebook.deployer_log_lines(); - let mints_string = format_rules(&mint_rules); - let deployers_string = format_rules(&deployer_rules); + log_runtime_settings( + &settings, + &network_path, + &keypair.pubkey(), + &balance, + &mint_rules, + &deployer_rules, + ); + maybe_log_fpga_feed(&settings, &fpga_feed); + let context = Arc::new(ExecutionContext { + priority_fees: settings.priority_fees.as_u64(), + rpc, + keypair, + tx_submission_mode: settings.tx_submission_mode, + jito_url: Arc::new(settings.jito_url.clone()), + }); + + let telemetry = Arc::new(if settings.telemetry_enabled { + LatencyTelemetry::new(settings.latency_sample_capacity, settings.latency_slo_ns) + } else { + LatencyTelemetry::disabled() + }); + Arc::clone(&telemetry).spawn_reporter(std::time::Duration::from_secs( + settings.latency_report_period_secs, + )); + + let (events_tx, events_rx) = mpsc::unbounded_channel(); + start_ingress_stream( + &network_path, + &fpga_feed, + &kernel_bypass_stream, + &standard_tcp_stream, + events_tx, + )?; + + let engine = SniperEngine::new(context, events_rx, rulebook_rx, telemetry); + engine.run().await; + + Ok(()) +} + +fn log_runtime_settings( + settings: &RuntimeSettings, + network_path: &NetworkPathProfile, + wallet: &solana_sdk::pubkey::Pubkey, + balance: &str, + mint_rules: &[String], + deployer_rules: &[String], +) { + let mints_string = format_rules(mint_rules); + let deployers_string = format_rules(deployer_rules); log::info!( "Settings: \ \n\tWallet: {}\ @@ -110,8 +186,11 @@ async fn run_inner() -> Result<(), String> { \n\tNETWORK_PATH: {}\ \n\tKERNEL_TCP_BYPASS: {}\ \n\tFPGA_ENABLED: {}\ +\n\tFPGA_INGRESS_MODE: {}\ +\n\tFPGA_DIRECT_DEVICE_PATH: {}\ +\n\tFPGA_DMA_SOCKET_PATH: {}\ \n\tTELEMETRY_ENABLED: {}", - keypair.pubkey(), + wallet, balance, settings.priority_fees.as_u64(), mints_string, @@ -124,9 +203,14 @@ async fn run_inner() -> Result<(), String> { network_path.describe(), network_path.kernel_bypass_enabled(), network_path.fpga_enabled(), + settings.fpga_ingress_mode.as_str(), + settings.fpga_direct_device_path.as_str(), + settings.fpga_dma_socket_path.as_str(), settings.telemetry_enabled, ); +} +fn maybe_log_fpga_feed(settings: &RuntimeSettings, fpga_feed: &FpgaFeedAdapter) { if settings.fpga_enabled { log::info!( "FPGA_FEED: {} (vendor={}, verbose={})", @@ -135,113 +219,76 @@ async fn run_inner() -> Result<(), String> { fpga_feed.verbose(), ); } +} - let context = Arc::new(ExecutionContext { - priority_fees: settings.priority_fees.as_u64(), - rpc, - keypair, - tx_submission_mode: settings.tx_submission_mode, - jito_url: Arc::new(settings.jito_url.clone()), - }); - - let telemetry = Arc::new(if settings.telemetry_enabled { - LatencyTelemetry::new(settings.latency_sample_capacity, settings.latency_slo_ns) - } else { - LatencyTelemetry::disabled() - }); - Arc::clone(&telemetry).spawn_reporter(std::time::Duration::from_secs( - settings.latency_report_period_secs, - )); - - let (events_tx, events_rx) = mpsc::unbounded_channel(); - let kernel_bypass_stream = SolanaPubsubLogStream::kernel_bypass( - settings.wss_url.as_str().to_owned(), - settings.kernel_tcp_bypass_engine, - settings.kernel_bypass_socket_path.as_str().to_owned(), - ); - let standard_tcp_stream = - SolanaPubsubLogStream::standard_tcp(settings.wss_url.as_str().to_owned()); - - let mut stream_started = false; - - if settings.fpga_enabled { - match fpga_feed.spawn_stream(events_tx.clone()) { - Ok(()) => { - log::info!( - "Ingress path selected: FPGA DMA ring -> strategy events (zero-copy frame parse)" - ); - stream_started = true; - } - Err(error) => { - log::warn!( - "FPGA ingress unavailable: {}. Continuing failover chain.", - error - ); - } +fn start_ingress_stream( + network_path: &NetworkPathProfile, + fpga_feed: &FpgaFeedAdapter, + kernel_bypass_stream: &SolanaPubsubLogStream, + standard_tcp_stream: &SolanaPubsubLogStream, + events_tx: mpsc::UnboundedSender, +) -> Result<(), IngressStartupError> { + match network_path.mode() { + NetworkStackMode::Fpga => { + fpga_feed + .spawn_stream(events_tx) + .map_err(|source| IngressStartupError::Fpga { source })?; + log::info!( + "Ingress path selected: FPGA DMA ring -> strategy events (zero-copy frame parse)" + ); } - } - - if !stream_started && network_path.kernel_bypass_enabled() { - match kernel_bypass_stream.spawn_stream(events_tx.clone()) { - Ok(()) => { - log::info!( - "Ingress path selected: {} -> strategy events", - kernel_bypass_stream.path_name() - ); - stream_started = true; - } - Err(error) => { - log::warn!( - "Kernel bypass ingress unavailable: {}. Falling back to standard tcp path.", - error - ); - } + NetworkStackMode::KernelBypass => { + kernel_bypass_stream + .spawn_stream(events_tx) + .map_err(|source| IngressStartupError::KernelBypass { source })?; + log::info!( + "Ingress path selected: {} -> strategy events", + kernel_bypass_stream.path_name() + ); + } + NetworkStackMode::StandardTcp => { + standard_tcp_stream + .spawn_stream(events_tx) + .map_err(|source| IngressStartupError::StandardTcp { source })?; + log::info!( + "Ingress path selected: {} -> strategy events", + standard_tcp_stream.path_name() + ); } } - if !stream_started { - standard_tcp_stream - .spawn_stream(events_tx) - .map_err(|error| { - format!( - "Failed to start {} ingress: {}", - standard_tcp_stream.path_name(), - error - ) - })?; - log::info!( - "Ingress path selected: {} -> strategy events", - standard_tcp_stream.path_name() - ); - stream_started = true; - } - - if !stream_started { - return Err("No ingress path could be started".to_owned()); - } - - let engine = SniperEngine::new(context, events_rx, rulebook_rx, telemetry); - engine.run().await; - Ok(()) } -async fn load_keypair(path: &str) -> Result { +async fn load_keypair(path: &str) -> Result { + let keypair_path = PathBuf::from(path); let mut keypair_file = File::open(path) .await - .map_err(|error| format!("Failed to open keypair file: {}", error))?; + .map_err(|source| KeypairLoadError::Open { + path: keypair_path.clone(), + source, + })?; let mut contents = String::new(); keypair_file .read_to_string(&mut contents) .await - .map_err(|error| format!("Failed to read keypair file: {}", error))?; - - let keypair_bytes = serde_json::from_str::>(&contents) - .map_err(|error| format!("Failed to parse keypair json: {}", error))?; + .map_err(|source| KeypairLoadError::Read { + path: keypair_path.clone(), + source, + })?; + + let keypair_bytes = serde_json::from_str::>(&contents).map_err(|source| { + KeypairLoadError::ParseJson { + path: keypair_path.clone(), + source, + } + })?; - Keypair::try_from(keypair_bytes.as_slice()) - .map_err(|error| format!("Invalid keypair bytes: {}", error)) + Keypair::try_from(keypair_bytes.as_slice()).map_err(|source| KeypairLoadError::InvalidBytes { + path: keypair_path, + source: Box::new(source), + }) } fn resolve_level_filter() -> LevelFilter { diff --git a/src/app/errors.rs b/src/app/errors.rs new file mode 100644 index 0000000..906fa82 --- /dev/null +++ b/src/app/errors.rs @@ -0,0 +1,115 @@ +use std::path::PathBuf; + +use solana_client::client_error::ClientError; +use thiserror::Error; + +use crate::{ + app::{logging::LoggingError, systemd::SystemdError}, + domain::settings::SettingsError, + ports::fpga_feed::FpgaFeedError, + ports::log_stream::LogStreamError, +}; + +#[derive(Debug, Error)] +pub enum AppError { + #[error(transparent)] + ServiceCommand(#[from] SystemdError), + #[error(transparent)] + Logging(#[from] LoggingError), + #[error(transparent)] + Settings(#[from] SettingsError), + #[error(transparent)] + Keypair(#[from] KeypairLoadError), + #[error(transparent)] + Rulebook(#[from] RulebookLoadError), + #[error(transparent)] + WalletBalance(#[from] WalletBalanceError), + #[error(transparent)] + IngressReadiness(#[from] IngressReadinessError), + #[error(transparent)] + IngressStartup(#[from] IngressStartupError), +} + +#[derive(Debug, Error)] +pub enum KeypairLoadError { + #[error("failed to open keypair file at {path}")] + Open { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("failed to read keypair file at {path}")] + Read { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("failed to parse keypair json at {path}")] + ParseJson { + path: PathBuf, + #[source] + source: serde_json::Error, + }, + #[error("invalid keypair bytes at {path}")] + InvalidBytes { + path: PathBuf, + #[source] + source: Box, + }, +} + +#[derive(Debug, Error)] +pub enum RulebookLoadError { + #[error("failed to read rules at startup")] + Read { + #[source] + source: std::io::Error, + }, +} + +#[derive(Debug, Error)] +pub enum WalletBalanceError { + #[error("failed to read wallet balance")] + Read { + #[source] + source: ClientError, + }, +} + +#[derive(Debug, Error)] +pub enum IngressReadinessError { + #[error("fpga ingress prerequisites are not satisfied")] + Fpga { + #[source] + source: FpgaFeedError, + }, + #[error("kernel bypass ingress prerequisites are not satisfied")] + KernelBypass { + #[source] + source: LogStreamError, + }, + #[error("standard tcp ingress prerequisites are not satisfied")] + StandardTcp { + #[source] + source: LogStreamError, + }, +} + +#[derive(Debug, Error)] +pub enum IngressStartupError { + #[error("failed to start fpga ingress path")] + Fpga { + #[source] + source: FpgaFeedError, + }, + #[error("failed to start kernel bypass ingress path")] + KernelBypass { + #[source] + source: LogStreamError, + }, + #[error("failed to start standard tcp ingress path")] + StandardTcp { + #[source] + source: LogStreamError, + }, +} diff --git a/src/app/logging.rs b/src/app/logging.rs index 56b43ad..1060f1e 100644 --- a/src/app/logging.rs +++ b/src/app/logging.rs @@ -9,6 +9,7 @@ use std::{ use chrono::Local; use colored::Colorize; use log::{Level, LevelFilter, Metadata, Record}; +use thiserror::Error; use tokio::fs; #[derive(Debug)] @@ -120,11 +121,20 @@ impl log::Log for AsyncLogger { fn flush(&self) {} } -pub async fn init_logging(level_filter: LevelFilter) -> Result<(), String> { +#[derive(Debug, Error)] +pub enum LoggingError { + #[error("failed to create log directory")] + CreateLogDirectory { + #[source] + source: std::io::Error, + }, +} + +pub async fn init_logging(level_filter: LevelFilter) -> Result<(), LoggingError> { let log_dir = PathBuf::from("log"); fs::create_dir_all(&log_dir) .await - .map_err(|error| format!("Failed to create log directory: {}", error))?; + .map_err(|source| LoggingError::CreateLogDirectory { source })?; let log_path = log_dir.join("output.ans"); let (sender, receiver) = mpsc::channel::(); diff --git a/src/app/mod.rs b/src/app/mod.rs index 0cfb66f..7f4d5f3 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -1,4 +1,6 @@ pub mod bootstrap; pub mod context; +pub mod errors; pub mod logging; +pub mod readiness; pub mod systemd; diff --git a/src/app/readiness.rs b/src/app/readiness.rs new file mode 100644 index 0000000..fa9e1d4 --- /dev/null +++ b/src/app/readiness.rs @@ -0,0 +1,166 @@ +use crate::{ + app::errors::IngressReadinessError, + ports::{fpga_feed::FpgaFeedPort, log_stream::LogStreamPort, network_path::NetworkPathPort}, +}; + +pub fn validate_ingress_readiness( + network_path: &dyn NetworkPathPort, + fpga_feed: &dyn FpgaFeedPort, + kernel_bypass_stream: &dyn LogStreamPort, + standard_tcp_stream: &dyn LogStreamPort, +) -> Result<(), IngressReadinessError> { + match network_path.mode() { + crate::domain::settings::NetworkStackMode::Fpga => fpga_feed + .validate_ready() + .map_err(|source| IngressReadinessError::Fpga { source }), + crate::domain::settings::NetworkStackMode::KernelBypass => kernel_bypass_stream + .validate_ready() + .map_err(|source| IngressReadinessError::KernelBypass { source }), + crate::domain::settings::NetworkStackMode::StandardTcp => standard_tcp_stream + .validate_ready() + .map_err(|source| IngressReadinessError::StandardTcp { source }), + } +} + +#[cfg(test)] +mod tests { + use crate::{ + app::readiness::validate_ingress_readiness, + domain::{events::RawLogEvent, settings::NetworkStackMode}, + ports::{ + fpga_feed::{FpgaFeedError, FpgaFeedPort}, + log_stream::{LogStreamError, LogStreamPort}, + network_path::NetworkPathPort, + }, + }; + use tokio::sync::mpsc; + + #[derive(Clone, Debug)] + struct FakeNetworkPath { + mode: NetworkStackMode, + } + + impl NetworkPathPort for FakeNetworkPath { + fn mode(&self) -> NetworkStackMode { + self.mode + } + + fn describe(&self) -> String { + "fake".to_owned() + } + + fn kernel_bypass_enabled(&self) -> bool { + false + } + + fn fpga_enabled(&self) -> bool { + self.mode == NetworkStackMode::Fpga + } + } + + #[derive(Clone, Debug)] + struct FakeFpgaFeed { + ready: Result<(), FpgaFeedError>, + } + + impl FpgaFeedPort for FakeFpgaFeed { + fn vendor(&self) -> &str { + "fake" + } + + fn verbose(&self) -> bool { + false + } + + fn describe(&self) -> String { + "fake".to_owned() + } + + fn validate_ready(&self) -> Result<(), FpgaFeedError> { + self.ready.clone() + } + + fn spawn_stream( + &self, + _sender: mpsc::UnboundedSender, + ) -> Result<(), FpgaFeedError> { + Ok(()) + } + } + + #[derive(Clone, Debug)] + struct FakeLogStream { + ready: Result<(), LogStreamError>, + } + + impl LogStreamPort for FakeLogStream { + fn path_name(&self) -> &'static str { + "fake" + } + + fn validate_ready(&self) -> Result<(), LogStreamError> { + self.ready.clone() + } + + fn spawn_stream( + &self, + _sender: mpsc::UnboundedSender, + ) -> Result<(), LogStreamError> { + Ok(()) + } + } + + #[test] + fn fpga_mode_returns_fpga_readiness_error() { + let network_path = FakeNetworkPath { + mode: NetworkStackMode::Fpga, + }; + let fpga_feed = FakeFpgaFeed { + ready: Err(FpgaFeedError::MissingMockPayloadEnv { + env_var: "FPGA_DMA_MOCK_FRAME", + }), + }; + let kernel_bypass_stream = FakeLogStream { ready: Ok(()) }; + let standard_tcp_stream = FakeLogStream { ready: Ok(()) }; + + let ready = validate_ingress_readiness( + &network_path, + &fpga_feed, + &kernel_bypass_stream, + &standard_tcp_stream, + ); + assert!(ready.is_err()); + if let Err(error) = ready { + assert!(matches!( + error, + crate::app::errors::IngressReadinessError::Fpga { .. } + )); + } + } + + #[test] + fn kernel_bypass_mode_returns_kernel_readiness_error() { + let network_path = FakeNetworkPath { + mode: NetworkStackMode::KernelBypass, + }; + let fpga_feed = FakeFpgaFeed { ready: Ok(()) }; + let kernel_bypass_stream = FakeLogStream { + ready: Err(LogStreamError::OpenOnloadRuntimeInactive), + }; + let standard_tcp_stream = FakeLogStream { ready: Ok(()) }; + + let ready = validate_ingress_readiness( + &network_path, + &fpga_feed, + &kernel_bypass_stream, + &standard_tcp_stream, + ); + assert!(ready.is_err()); + if let Err(error) = ready { + assert!(matches!( + error, + crate::app::errors::IngressReadinessError::KernelBypass { .. } + )); + } + } +} diff --git a/src/app/systemd.rs b/src/app/systemd.rs index b535a9e..5131f60 100644 --- a/src/app/systemd.rs +++ b/src/app/systemd.rs @@ -4,6 +4,8 @@ use std::{ process::Command, }; +use thiserror::Error; + const DEFAULT_SERVICE_NAME: &str = "slotstrike"; const DEFAULT_SYSTEMD_DIR: &str = "/etc/systemd/system"; const DEFAULT_CONFIG_PATH: &str = "slotstrike.toml"; @@ -26,7 +28,167 @@ enum ServiceAction { Uninstall, } -pub fn maybe_handle_service_command(args: &[String]) -> Result { +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum NameField { + Name, + User, + Group, +} + +impl NameField { + const fn as_str(self) -> &'static str { + match self { + Self::Name => "service name", + Self::User => "service user", + Self::Group => "service group", + } + } +} + +impl std::fmt::Display for NameField { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum PathField { + SystemdDir, + ConfigPath, + WorkingDir, + BinaryPath, +} + +impl PathField { + const fn as_str(self) -> &'static str { + match self { + Self::SystemdDir => "systemd dir", + Self::ConfigPath => "config path", + Self::WorkingDir => "working dir", + Self::BinaryPath => "binary path", + } + } +} + +impl std::fmt::Display for PathField { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SystemctlAction { + DaemonReload, + EnableNow, + DisableNow, + ResetFailed, +} + +impl SystemctlAction { + const fn as_str(self) -> &'static str { + match self { + Self::DaemonReload => "daemon-reload", + Self::EnableNow => "enable --now", + Self::DisableNow => "disable --now", + Self::ResetFailed => "reset-failed", + } + } +} + +impl std::fmt::Display for SystemctlAction { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Debug, Error)] +pub enum SystemdError { + #[error("use only one of --install-service or --uninstall-service")] + ConflictingServiceActions, + #[error(transparent)] + ServiceOptions(#[from] ServiceOptionsError), + #[error(transparent)] + ServiceInstall(#[from] ServiceInstallError), + #[error(transparent)] + ServiceUninstall(#[from] ServiceUninstallError), +} + +#[derive(Debug, Error)] +pub enum ServiceOptionsError { + #[error("{field} must not be empty")] + EmptyName { field: NameField }, + #[error("{field} must not contain whitespace or '/'")] + InvalidNameCharacters { field: NameField }, + #[error("failed to resolve absolute path for {field}")] + ResolveAbsolutePath { + field: PathField, + #[source] + source: std::io::Error, + }, + #[error("failed to resolve current dir")] + ResolveCurrentDir { + #[source] + source: std::io::Error, + }, + #[error("failed to resolve current binary path")] + ResolveCurrentBinaryPath { + #[source] + source: std::io::Error, + }, + #[error("{field} must not contain spaces for systemd compatibility")] + PathContainsSpaces { field: PathField }, +} + +#[derive(Debug, Error)] +pub enum ServiceInstallError { + #[error("binary not found at {path}")] + BinaryNotFound { path: PathBuf }, + #[error("config not found at {path}")] + ConfigNotFound { path: PathBuf }, + #[error("failed to create systemd dir at {path}")] + CreateSystemdDir { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("failed to write unit file at {path}")] + WriteUnitFile { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error(transparent)] + Systemctl(#[from] SystemctlError), +} + +#[derive(Debug, Error)] +pub enum ServiceUninstallError { + #[error("failed to remove unit file at {path}")] + RemoveUnitFile { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error(transparent)] + Systemctl(#[from] SystemctlError), +} + +#[derive(Debug, Error)] +pub enum SystemctlError { + #[error("failed to execute systemctl {action}")] + Execute { + action: SystemctlAction, + #[source] + source: std::io::Error, + }, + #[error("systemctl {action} failed with exit code {code:?}")] + Failed { + action: SystemctlAction, + code: Option, + }, +} + +pub fn maybe_handle_service_command(args: &[String]) -> Result { let install = arg_flag(args, "--install-service"); let uninstall = arg_flag(args, "--uninstall-service"); @@ -35,7 +197,7 @@ pub fn maybe_handle_service_command(args: &[String]) -> Result { } if install && uninstall { - return Err("Use only one of --install-service or --uninstall-service".to_owned()); + return Err(SystemdError::ConflictingServiceActions); } let action = if install { @@ -53,36 +215,39 @@ pub fn maybe_handle_service_command(args: &[String]) -> Result { Ok(true) } -fn build_options(args: &[String]) -> Result { +fn build_options(args: &[String]) -> Result { let service_name = arg_value(args, "--service-name").unwrap_or_else(|| DEFAULT_SERVICE_NAME.to_owned()); - validate_name(&service_name, "service name")?; + validate_name(&service_name, NameField::Name)?; let service_user = arg_value(args, "--service-user") .or_else(|| env::var("SUDO_USER").ok()) .or_else(|| env::var("USER").ok()) .unwrap_or_else(|| "root".to_owned()); - validate_name(&service_user, "service user")?; + validate_name(&service_user, NameField::User)?; let service_group = arg_value(args, "--service-group") .or_else(|| primary_group_for_user(&service_user)) .unwrap_or_else(|| service_user.clone()); - validate_name(&service_group, "service group")?; + validate_name(&service_group, NameField::Group)?; let systemd_dir = absolutize( arg_value(args, "--systemd-dir").unwrap_or_else(|| DEFAULT_SYSTEMD_DIR.to_owned()), + PathField::SystemdDir, + )?; + let config_path = absolutize( + arg_value(args, "--config").unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_owned()), + PathField::ConfigPath, )?; - let config_path = - absolutize(arg_value(args, "--config").unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_owned()))?; let working_dir = - env::current_dir().map_err(|error| format!("Failed to resolve current dir: {}", error))?; + env::current_dir().map_err(|source| ServiceOptionsError::ResolveCurrentDir { source })?; let bin_path = env::current_exe() - .map_err(|error| format!("Failed to resolve current binary path: {}", error))?; + .map_err(|source| ServiceOptionsError::ResolveCurrentBinaryPath { source })?; - ensure_no_spaces(&systemd_dir, "systemd dir")?; - ensure_no_spaces(&config_path, "config path")?; - ensure_no_spaces(&working_dir, "working dir")?; - ensure_no_spaces(&bin_path, "binary path")?; + ensure_no_spaces(&systemd_dir, PathField::SystemdDir)?; + ensure_no_spaces(&config_path, PathField::ConfigPath)?; + ensure_no_spaces(&working_dir, PathField::WorkingDir)?; + ensure_no_spaces(&bin_path, PathField::BinaryPath)?; Ok(ServiceOptions { service_name, @@ -96,24 +261,24 @@ fn build_options(args: &[String]) -> Result { }) } -fn install_service(options: &ServiceOptions) -> Result<(), String> { +fn install_service(options: &ServiceOptions) -> Result<(), ServiceInstallError> { if !options.bin_path.is_file() { - return Err(format!("Binary not found: {}", options.bin_path.display())); + return Err(ServiceInstallError::BinaryNotFound { + path: options.bin_path.clone(), + }); } if !options.config_path.is_file() { - return Err(format!( - "Config not found: {}", - options.config_path.display() - )); + return Err(ServiceInstallError::ConfigNotFound { + path: options.config_path.clone(), + }); } - fs::create_dir_all(&options.systemd_dir).map_err(|error| { - format!( - "Failed to create systemd dir '{}': {}", - options.systemd_dir.display(), - error - ) + fs::create_dir_all(&options.systemd_dir).map_err(|source| { + ServiceInstallError::CreateSystemdDir { + path: options.systemd_dir.clone(), + source, + } })?; let unit_file_name = format!("{}.service", options.service_name); @@ -121,18 +286,20 @@ fn install_service(options: &ServiceOptions) -> Result<(), String> { let log_dir = options.working_dir.join("log"); let unit_contents = render_unit(options, &log_dir); - fs::write(&unit_file_path, unit_contents).map_err(|error| { - format!( - "Failed to write unit file '{}': {}", - unit_file_path.display(), - error - ) + fs::write(&unit_file_path, unit_contents).map_err(|source| { + ServiceInstallError::WriteUnitFile { + path: unit_file_path.clone(), + source, + } })?; - run_systemctl(&["daemon-reload"])?; + run_systemctl(&["daemon-reload"], SystemctlAction::DaemonReload)?; if options.enable_now { - run_systemctl(&["enable", "--now", &unit_file_name])?; + run_systemctl( + &["enable", "--now", &unit_file_name], + SystemctlAction::EnableNow, + )?; println!( "Installed and started {} using config {}", unit_file_name, @@ -149,48 +316,48 @@ fn install_service(options: &ServiceOptions) -> Result<(), String> { Ok(()) } -fn uninstall_service(options: &ServiceOptions) -> Result<(), String> { +fn uninstall_service(options: &ServiceOptions) -> Result<(), ServiceUninstallError> { let unit_file_name = format!("{}.service", options.service_name); let unit_file_path = options.systemd_dir.join(&unit_file_name); - let _disable_result = run_systemctl(&["disable", "--now", &unit_file_name]); + let _disable_result = run_systemctl( + &["disable", "--now", &unit_file_name], + SystemctlAction::DisableNow, + ); if unit_file_path.exists() { - fs::remove_file(&unit_file_path).map_err(|error| { - format!( - "Failed to remove unit file '{}': {}", - unit_file_path.display(), - error - ) + fs::remove_file(&unit_file_path).map_err(|source| { + ServiceUninstallError::RemoveUnitFile { + path: unit_file_path.clone(), + source, + } })?; } - run_systemctl(&["daemon-reload"])?; - let _reset_failed = run_systemctl(&["reset-failed", &unit_file_name]); + run_systemctl(&["daemon-reload"], SystemctlAction::DaemonReload)?; + let _reset_failed = run_systemctl( + &["reset-failed", &unit_file_name], + SystemctlAction::ResetFailed, + ); println!("Uninstalled {}", unit_file_name); Ok(()) } -fn run_systemctl(args: &[&str]) -> Result<(), String> { +fn run_systemctl(args: &[&str], action: SystemctlAction) -> Result<(), SystemctlError> { let output = Command::new("systemctl") .args(args) .output() - .map_err(|error| format!("Failed to execute systemctl {:?}: {}", args, error))?; + .map_err(|source| SystemctlError::Execute { action, source })?; if output.status.success() { return Ok(()); } - let stderr = String::from_utf8_lossy(&output.stderr); - let stdout = String::from_utf8_lossy(&output.stdout); - Err(format!( - "systemctl {:?} failed (code {:?}). stdout='{}' stderr='{}'", - args, - output.status.code(), - stdout.trim(), - stderr.trim() - )) + Err(SystemctlError::Failed { + action, + code: output.status.code(), + }) } fn primary_group_for_user(user: &str) -> Option { @@ -248,7 +415,7 @@ fn arg_value(args: &[String], flag: &str) -> Option { .cloned() } -fn absolutize(value: String) -> Result { +fn absolutize(value: String, field: PathField) -> Result { let path = PathBuf::from(value); if path.is_absolute() { return Ok(path); @@ -256,29 +423,22 @@ fn absolutize(value: String) -> Result { env::current_dir() .map(|cwd| cwd.join(path)) - .map_err(|error| format!("Failed to resolve absolute path: {}", error)) + .map_err(|source| ServiceOptionsError::ResolveAbsolutePath { field, source }) } -fn validate_name(value: &str, field: &str) -> Result<(), String> { +fn validate_name(value: &str, field: NameField) -> Result<(), ServiceOptionsError> { if value.trim().is_empty() { - return Err(format!("{} must not be empty", field)); + return Err(ServiceOptionsError::EmptyName { field }); } if value.contains(char::is_whitespace) || value.contains('/') { - return Err(format!( - "{} must not contain whitespace or '/' (got '{}')", - field, value - )); + return Err(ServiceOptionsError::InvalidNameCharacters { field }); } Ok(()) } -fn ensure_no_spaces(path: &Path, field: &str) -> Result<(), String> { +fn ensure_no_spaces(path: &Path, field: PathField) -> Result<(), ServiceOptionsError> { if path.to_string_lossy().contains(' ') { - return Err(format!( - "{} must not contain spaces for systemd compatibility: {}", - field, - path.display() - )); + return Err(ServiceOptionsError::PathContainsSpaces { field }); } Ok(()) } @@ -320,16 +480,15 @@ mod tests { service_user: "slotstrike".to_owned(), service_group: "slotstrike".to_owned(), systemd_dir: PathBuf::from("/etc/systemd/system"), - config_path: PathBuf::from("/home/slotstrike/slotstrike/slotstrike.toml"), - working_dir: PathBuf::from("/home/slotstrike/slotstrike"), - bin_path: PathBuf::from("/home/slotstrike/slotstrike/target/release/slotstrike"), + config_path: PathBuf::from("/home/slotstrike/slotstrike.toml"), + working_dir: PathBuf::from("/home/slotstrike"), + bin_path: PathBuf::from("/usr/local/bin/slotstrike"), enable_now: true, }; - let log_dir = PathBuf::from("/home/slotstrike/slotstrike/log"); - let rendered = render_unit(&options, &log_dir); + let rendered = render_unit(&options, &PathBuf::from("/home/slotstrike/log")); assert!(rendered.contains( - "ExecStart=/home/slotstrike/slotstrike/target/release/slotstrike --config /home/slotstrike/slotstrike/slotstrike.toml" + "ExecStart=/usr/local/bin/slotstrike --config /home/slotstrike/slotstrike.toml" )); } } diff --git a/src/domain/config.rs b/src/domain/config.rs index 3758954..3c51eb4 100644 --- a/src/domain/config.rs +++ b/src/domain/config.rs @@ -1,4 +1,6 @@ use serde::Deserialize; +use std::path::PathBuf; +use thiserror::Error; #[derive(Clone, Debug, Deserialize)] pub struct SniperConfigFile { @@ -31,6 +33,12 @@ pub struct RuntimeConfigSection { pub fpga_verbose: bool, #[serde(default = "default_fpga_vendor")] pub fpga_vendor: String, + #[serde(default = "default_fpga_ingress_mode")] + pub fpga_ingress_mode: String, + #[serde(default = "default_fpga_direct_device_path")] + pub fpga_direct_device_path: String, + #[serde(default = "default_fpga_dma_socket_path")] + pub fpga_dma_socket_path: String, #[serde(default)] pub replay_benchmark: bool, #[serde(default = "default_replay_event_count")] @@ -78,15 +86,33 @@ impl Default for TelemetryConfigSection { } } -pub fn load_sniper_config_file(path: &str) -> Result { - let config_text = std::fs::read_to_string(path) - .map_err(|error| format!("Failed to read config file '{}': {}", path, error))?; +#[derive(Debug, Error)] +pub enum ConfigError { + #[error("failed to read config file at {path}")] + ReadConfigFile { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("invalid slotstrike.toml format")] + ParseToml { + #[source] + source: toml::de::Error, + }, +} + +pub fn load_sniper_config_file(path: &str) -> Result { + let config_text = + std::fs::read_to_string(path).map_err(|source| ConfigError::ReadConfigFile { + path: PathBuf::from(path), + source, + })?; parse_sniper_config_toml(&config_text) } -pub fn parse_sniper_config_toml(config_text: &str) -> Result { +pub fn parse_sniper_config_toml(config_text: &str) -> Result { toml::from_str::(config_text) - .map_err(|error| format!("Invalid slotstrike.toml format: {}", error)) + .map_err(|source| ConfigError::ParseToml { source }) } fn default_tx_submission_mode() -> String { @@ -109,6 +135,18 @@ fn default_fpga_vendor() -> String { "generic".to_owned() } +fn default_fpga_ingress_mode() -> String { + "auto".to_owned() +} + +fn default_fpga_direct_device_path() -> String { + "/dev/slotstrike-fpga0".to_owned() +} + +fn default_fpga_dma_socket_path() -> String { + "/tmp/slotstrike-fpga-dma.sock".to_owned() +} + const fn default_replay_event_count() -> usize { 50_000 } diff --git a/src/domain/settings.rs b/src/domain/settings.rs index 9b1409f..c0c7ead 100644 --- a/src/domain/settings.rs +++ b/src/domain/settings.rs @@ -1,13 +1,150 @@ use std::env; +use thiserror::Error; use crate::domain::{ - config::{SniperConfigFile, load_sniper_config_file}, + config::{ConfigError, SniperConfigFile, load_sniper_config_file}, value_objects::{ - KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, ReplayBurstSize, - ReplayEventCount, TxSubmissionMode, + FpgaIngressMode, KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, + ReplayBurstSize, ReplayEventCount, TxSubmissionMode, }, }; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum RequiredRuntimeField { + KeypairPath, + RpcUrl, + WssUrl, + JitoUrl, +} + +impl RequiredRuntimeField { + const fn as_str(self) -> &'static str { + match self { + Self::KeypairPath => "keypair_path", + Self::RpcUrl => "rpc_url", + Self::WssUrl => "wss_url", + Self::JitoUrl => "jito_url", + } + } +} + +impl std::fmt::Display for RequiredRuntimeField { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum NonEmptyRuntimeField { + KernelBypassSocketPath, + FpgaVendor, + FpgaDirectDevicePath, + FpgaDmaSocketPath, + WssUrl, +} + +impl NonEmptyRuntimeField { + const fn as_str(self) -> &'static str { + match self { + Self::KernelBypassSocketPath => "kernel_bypass_socket_path", + Self::FpgaVendor => "fpga_vendor", + Self::FpgaDirectDevicePath => "fpga_direct_device_path", + Self::FpgaDmaSocketPath => "fpga_dma_socket_path", + Self::WssUrl => "wss_url", + } + } +} + +impl std::fmt::Display for NonEmptyRuntimeField { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ReplayField { + ReplayEventCount, + ReplayBurstSize, +} + +impl ReplayField { + const fn as_str(self) -> &'static str { + match self { + Self::ReplayEventCount => "replay_event_count", + Self::ReplayBurstSize => "replay_burst_size", + } + } +} + +impl std::fmt::Display for ReplayField { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum TelemetryField { + SampleCapacity, + ReportPeriodSecs, +} + +impl TelemetryField { + const fn as_str(self) -> &'static str { + match self { + Self::SampleCapacity => "telemetry.sample_capacity", + Self::ReportPeriodSecs => "telemetry.report_period_secs", + } + } +} + +impl std::fmt::Display for TelemetryField { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Debug, Error)] +pub enum SettingsError { + #[error(transparent)] + Config(#[from] ConfigError), + #[error(transparent)] + Replay(#[from] ReplaySettingsError), + #[error(transparent)] + Runtime(#[from] RuntimeSettingsError), + #[error(transparent)] + Telemetry(#[from] TelemetrySettingsError), +} + +#[derive(Debug, Error)] +pub enum ReplaySettingsError { + #[error("{field} must be greater than 0")] + MustBeGreaterThanZero { field: ReplayField }, +} + +#[derive(Debug, Error)] +pub enum RuntimeSettingsError { + #[error( + "invalid kernel_tcp_bypass_engine; supported values: af_xdp, dpdk, openonload, af_xdp_or_dpdk_external" + )] + InvalidKernelBypassEngine, + #[error( + "invalid fpga_ingress_mode; supported values: auto, mock_dma, direct_device, external_socket" + )] + InvalidFpgaIngressMode, + #[error("invalid tx_submission_mode; supported values: jito, direct")] + InvalidTxSubmissionMode, + #[error("missing {field} in runtime config")] + MissingRuntimeField { field: RequiredRuntimeField }, + #[error("{field} must not be empty")] + EmptyRuntimeField { field: NonEmptyRuntimeField }, +} + +#[derive(Debug, Error)] +pub enum TelemetrySettingsError { + #[error("{field} must be greater than 0 when telemetry.enabled=true")] + InvalidEnabledValue { field: TelemetryField }, +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum NetworkStackMode { Fpga, @@ -41,6 +178,9 @@ pub struct RuntimeSettings { pub fpga_enabled: bool, pub fpga_verbose: bool, pub fpga_vendor: NonEmptyText, + pub fpga_ingress_mode: FpgaIngressMode, + pub fpga_direct_device_path: NonEmptyText, + pub fpga_dma_socket_path: NonEmptyText, pub network_stack_mode: NetworkStackMode, pub run_replay_benchmark: bool, pub replay_event_count: ReplayEventCount, @@ -52,12 +192,12 @@ pub struct RuntimeSettings { } impl RuntimeSettings { - pub fn from_args() -> Result { + pub fn from_args() -> Result { let args = env::args().skip(1).collect::>(); Self::from_cli_args(&args) } - pub(crate) fn from_cli_args(args: &[String]) -> Result { + pub(crate) fn from_cli_args(args: &[String]) -> Result { let config_path = arg_value(args, "--config").unwrap_or_else(|| "slotstrike.toml".to_owned()); let parsed_config = load_sniper_config_file(&config_path)?; @@ -68,40 +208,57 @@ impl RuntimeSettings { args: &[String], config_path: String, parsed_config: &SniperConfigFile, - ) -> Result { + ) -> Result { let runtime = &parsed_config.runtime; let telemetry = &parsed_config.telemetry; let run_replay_benchmark = arg_flag(args, "--replay-benchmark") || runtime.replay_benchmark; - let replay_event_count = ReplayEventCount::new(runtime.replay_event_count) - .map_err(|message| format!("replay_event_count {}", message))?; - let replay_burst_size = ReplayBurstSize::new(runtime.replay_burst_size) - .map_err(|message| format!("replay_burst_size {}", message))?; + let replay_event_count = + ReplayEventCount::new(runtime.replay_event_count).map_err(|_source| { + ReplaySettingsError::MustBeGreaterThanZero { + field: ReplayField::ReplayEventCount, + } + })?; + let replay_burst_size = + ReplayBurstSize::new(runtime.replay_burst_size).map_err(|_source| { + ReplaySettingsError::MustBeGreaterThanZero { + field: ReplayField::ReplayBurstSize, + } + })?; let kernel_tcp_bypass_enabled = runtime.kernel_tcp_bypass; let kernel_tcp_bypass_engine = KernelBypassEngine::parse(&runtime.kernel_tcp_bypass_engine) - .ok_or_else(|| { - format!( - "Invalid kernel_tcp_bypass_engine '{}'. Supported values: af_xdp, dpdk, openonload, af_xdp_or_dpdk_external", - runtime.kernel_tcp_bypass_engine - ) - })?; - let kernel_bypass_socket_path = - NonEmptyText::try_from(runtime.kernel_bypass_socket_path.clone()) - .map_err(|_error| "kernel_bypass_socket_path must not be empty".to_owned())?; - - let tx_submission_mode = - TxSubmissionMode::parse(&runtime.tx_submission_mode).ok_or_else(|| { - format!( - "Invalid tx_submission_mode '{}'", - runtime.tx_submission_mode - ) - })?; + .ok_or(RuntimeSettingsError::InvalidKernelBypassEngine)?; + let kernel_bypass_socket_path = NonEmptyText::try_from( + runtime.kernel_bypass_socket_path.clone(), + ) + .map_err(|_source| RuntimeSettingsError::EmptyRuntimeField { + field: NonEmptyRuntimeField::KernelBypassSocketPath, + })?; + + let tx_submission_mode = TxSubmissionMode::parse(&runtime.tx_submission_mode) + .ok_or(RuntimeSettingsError::InvalidTxSubmissionMode)?; let fpga_enabled = arg_flag(args, "--fpga") || runtime.fpga_enabled; let fpga_verbose = arg_flag(args, "--fpga-verbose") || runtime.fpga_verbose; - let fpga_vendor = NonEmptyText::try_from(runtime.fpga_vendor.clone()) - .map_err(|_error| "fpga_vendor must not be empty".to_owned())?; + let fpga_vendor = + NonEmptyText::try_from(runtime.fpga_vendor.clone()).map_err(|_source| { + RuntimeSettingsError::EmptyRuntimeField { + field: NonEmptyRuntimeField::FpgaVendor, + } + })?; + let fpga_ingress_mode = FpgaIngressMode::parse(&runtime.fpga_ingress_mode) + .ok_or(RuntimeSettingsError::InvalidFpgaIngressMode)?; + let fpga_direct_device_path = + NonEmptyText::try_from(runtime.fpga_direct_device_path.clone()).map_err(|_source| { + RuntimeSettingsError::EmptyRuntimeField { + field: NonEmptyRuntimeField::FpgaDirectDevicePath, + } + })?; + let fpga_dma_socket_path = NonEmptyText::try_from(runtime.fpga_dma_socket_path.clone()) + .map_err(|_source| RuntimeSettingsError::EmptyRuntimeField { + field: NonEmptyRuntimeField::FpgaDmaSocketPath, + })?; let network_stack_mode = if fpga_enabled { NetworkStackMode::Fpga @@ -113,13 +270,22 @@ impl RuntimeSettings { if !run_replay_benchmark { if runtime.keypair_path.trim().is_empty() { - return Err("Missing keypair_path in runtime config".to_owned()); + return Err(RuntimeSettingsError::MissingRuntimeField { + field: RequiredRuntimeField::KeypairPath, + } + .into()); } if runtime.rpc_url.trim().is_empty() { - return Err("Missing rpc_url in runtime config".to_owned()); + return Err(RuntimeSettingsError::MissingRuntimeField { + field: RequiredRuntimeField::RpcUrl, + } + .into()); } if runtime.wss_url.trim().is_empty() { - return Err("Missing wss_url in runtime config".to_owned()); + return Err(RuntimeSettingsError::MissingRuntimeField { + field: RequiredRuntimeField::WssUrl, + } + .into()); } } @@ -132,7 +298,9 @@ impl RuntimeSettings { .jito_url .clone() .filter(|value| !value.trim().is_empty()) - .ok_or_else(|| "Missing jito_url for jito submission mode".to_owned())? + .ok_or(RuntimeSettingsError::MissingRuntimeField { + field: RequiredRuntimeField::JitoUrl, + })? } else { runtime.jito_url.clone().unwrap_or_else(|| rpc_url.clone()) }; @@ -142,20 +310,23 @@ impl RuntimeSettings { } else { runtime.wss_url.clone() }; - let wss_url = NonEmptyText::try_from(wss_url_raw) - .map_err(|_error| "wss_url must not be empty".to_owned())?; + let wss_url = NonEmptyText::try_from(wss_url_raw).map_err(|_source| { + RuntimeSettingsError::EmptyRuntimeField { + field: NonEmptyRuntimeField::WssUrl, + } + })?; if telemetry.enabled && telemetry.sample_capacity == 0 { - return Err( - "telemetry.sample_capacity must be greater than 0 when telemetry.enabled=true" - .to_owned(), - ); + return Err(TelemetrySettingsError::InvalidEnabledValue { + field: TelemetryField::SampleCapacity, + } + .into()); } if telemetry.enabled && telemetry.report_period_secs == 0 { - return Err( - "telemetry.report_period_secs must be greater than 0 when telemetry.enabled=true" - .to_owned(), - ); + return Err(TelemetrySettingsError::InvalidEnabledValue { + field: TelemetryField::ReportPeriodSecs, + } + .into()); } Ok(Self { @@ -172,6 +343,9 @@ impl RuntimeSettings { fpga_enabled, fpga_verbose, fpga_vendor, + fpga_ingress_mode, + fpga_direct_device_path, + fpga_dma_socket_path, network_stack_mode, run_replay_benchmark, replay_event_count, @@ -199,11 +373,11 @@ fn arg_value(args: &[String], flag: &str) -> Option { mod tests { use super::{NetworkStackMode, RuntimeSettings}; use crate::domain::{ - config::{SniperConfigFile, parse_sniper_config_toml}, - value_objects::{KernelBypassEngine, TxSubmissionMode}, + config::{ConfigError, SniperConfigFile, parse_sniper_config_toml}, + value_objects::{FpgaIngressMode, KernelBypassEngine, TxSubmissionMode}, }; - fn minimal_config() -> Result { + fn minimal_config() -> Result { parse_sniper_config_toml( r#" [runtime] @@ -248,6 +422,7 @@ report_period_secs = 15 settings.kernel_tcp_bypass_engine, KernelBypassEngine::AfXdpOrDpdkExternal ); + assert_eq!(settings.fpga_ingress_mode, FpgaIngressMode::Auto); assert_eq!(settings.tx_submission_mode, TxSubmissionMode::Jito); } } @@ -379,6 +554,38 @@ replay_burst_size = 512 } } + #[test] + fn rejects_unknown_fpga_ingress_mode() { + let config = parse_sniper_config_toml( + r#" +[runtime] +keypair_path = "keypair.json" +rpc_url = "https://rpc.example" +wss_url = "wss://wss.example" +priority_fees = 1000 +tx_submission_mode = "direct" +kernel_tcp_bypass = true +kernel_tcp_bypass_engine = "af_xdp" +fpga_enabled = false +fpga_verbose = false +fpga_vendor = "generic" +fpga_ingress_mode = "invalid" +replay_benchmark = false +replay_event_count = 50000 +replay_burst_size = 512 +"#, + ); + assert!(config.is_ok()); + if let Ok(config) = config { + let settings = RuntimeSettings::from_parsed_config( + &Vec::new(), + "slotstrike.toml".to_owned(), + &config, + ); + assert!(settings.is_err()); + } + } + #[test] fn rejects_unknown_tx_submission_mode() { let config = parse_sniper_config_toml( diff --git a/src/domain/value_objects/mod.rs b/src/domain/value_objects/mod.rs index 2f7c7c4..d8f0997 100644 --- a/src/domain/value_objects/mod.rs +++ b/src/domain/value_objects/mod.rs @@ -4,6 +4,6 @@ pub mod sol_amount; pub use rule_primitives::{RuleAddress, RuleSlippageBps, RuleSolAmount}; pub use runtime::{ - KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, ReplayBurstSize, ReplayEventCount, - TxSubmissionMode, + FpgaIngressMode, KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, ReplayBurstSize, + ReplayEventCount, TxSubmissionMode, }; diff --git a/src/domain/value_objects/runtime.rs b/src/domain/value_objects/runtime.rs index b268b2e..5730115 100644 --- a/src/domain/value_objects/runtime.rs +++ b/src/domain/value_objects/runtime.rs @@ -42,6 +42,43 @@ impl Display for KernelBypassEngine { } } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum FpgaIngressMode { + Auto, + MockDma, + DirectDevice, + ExternalSocket, +} + +impl FpgaIngressMode { + pub fn parse(value: &str) -> Option { + let normalized = value.trim().to_ascii_lowercase(); + match normalized.as_str() { + "auto" => Some(Self::Auto), + "mock_dma" => Some(Self::MockDma), + "direct_device" => Some(Self::DirectDevice), + "external_socket" => Some(Self::ExternalSocket), + _ => None, + } + } + + #[inline(always)] + pub const fn as_str(self) -> &'static str { + match self { + Self::Auto => "auto", + Self::MockDma => "mock_dma", + Self::DirectDevice => "direct_device", + Self::ExternalSocket => "external_socket", + } + } +} + +impl Display for FpgaIngressMode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum TxSubmissionMode { Jito, @@ -178,8 +215,8 @@ impl TryFrom<&str> for NonEmptyText { #[cfg(test)] mod tests { use super::{ - KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, ReplayBurstSize, - ReplayEventCount, TxSubmissionMode, + FpgaIngressMode, KernelBypassEngine, NonEmptyText, PriorityFeesMicrolamports, + ReplayBurstSize, ReplayEventCount, TxSubmissionMode, }; #[test] @@ -211,6 +248,28 @@ mod tests { assert_eq!(KernelBypassEngine::parse("invalid"), None); } + #[test] + fn parses_fpga_ingress_mode() { + assert_eq!(FpgaIngressMode::parse("auto"), Some(FpgaIngressMode::Auto)); + assert_eq!( + FpgaIngressMode::parse("mock_dma"), + Some(FpgaIngressMode::MockDma) + ); + assert_eq!( + FpgaIngressMode::parse("direct_device"), + Some(FpgaIngressMode::DirectDevice) + ); + assert_eq!( + FpgaIngressMode::parse("EXTERNAL_SOCKET"), + Some(FpgaIngressMode::ExternalSocket) + ); + } + + #[test] + fn rejects_invalid_fpga_ingress_mode() { + assert_eq!(FpgaIngressMode::parse("invalid"), None); + } + #[test] fn parses_tx_submission_mode() { assert_eq!( diff --git a/src/ports/fpga_feed.rs b/src/ports/fpga_feed.rs index 71a27cc..edda421 100644 --- a/src/ports/fpga_feed.rs +++ b/src/ports/fpga_feed.rs @@ -1,33 +1,53 @@ -use std::{ - error::Error, - fmt::{Display, Formatter}, -}; +use std::path::PathBuf; +use thiserror::Error; use tokio::sync::mpsc; use crate::domain::events::RawLogEvent; -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Error, PartialEq)] pub enum FpgaFeedError { - Unavailable(String), - InvalidFrame(String), + #[error( + "unsupported fpga_vendor '{vendor}'; supported values: mock_dma, generic, exanic, xilinx, amd, solarflare, napatech" + )] + UnsupportedVendor { vendor: String }, + #[error("mock FPGA DMA ring requires '{env_var}' environment payload")] + MissingMockPayloadEnv { env_var: &'static str }, + #[error("configured FPGA DMA socket path '{socket_path}' does not exist")] + DmaSocketPathMissing { socket_path: PathBuf }, + #[error("failed to connect FPGA DMA socket at '{socket_path}'")] + DmaSocketUnavailable { socket_path: PathBuf }, + #[error("FPGA external DMA socket mode requires unix target")] + ExternalSocketRequiresUnixTarget, + #[error("configured FPGA direct device path '{device_path}' does not exist")] + DirectDevicePathMissing { device_path: PathBuf }, + #[error("failed to open FPGA direct device at '{device_path}'")] + DirectDeviceUnavailable { device_path: PathBuf }, + #[error("FPGA direct device mode requires unix target")] + DirectDeviceRequiresUnixTarget, + #[error("FPGA external frame is not valid JSON")] + ExternalFrameInvalidJson, + #[error("FPGA external frame must include either payload or payload_base64")] + ExternalFrameMissingPayload, + #[error("FPGA external frame payload_base64 is invalid")] + ExternalFrameInvalidBase64, + #[error("FPGA DMA payload is not valid UTF-8")] + InvalidPayloadUtf8, + #[error("FPGA DMA frame contains empty signature")] + EmptySignature, + #[error("FPGA DMA frame has invalid has_error flag")] + InvalidHasErrorFlag, + #[error("FPGA DMA frame missing signature field")] + MissingSignature, + #[error("FPGA DMA frame does not contain logs")] + MissingLogs, } -impl Display for FpgaFeedError { - fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Unavailable(message) => write!(formatter, "{}", message), - Self::InvalidFrame(message) => write!(formatter, "{}", message), - } - } -} - -impl Error for FpgaFeedError {} - pub trait FpgaFeedPort: Send + Sync { fn vendor(&self) -> &str; fn verbose(&self) -> bool; fn describe(&self) -> String; + fn validate_ready(&self) -> Result<(), FpgaFeedError>; fn spawn_stream(&self, sender: mpsc::UnboundedSender) -> Result<(), FpgaFeedError>; } diff --git a/src/ports/log_stream.rs b/src/ports/log_stream.rs index 2ee844d..01a4178 100644 --- a/src/ports/log_stream.rs +++ b/src/ports/log_stream.rs @@ -1,29 +1,35 @@ -use std::{ - error::Error, - fmt::{Display, Formatter}, -}; +use std::path::PathBuf; +use thiserror::Error; use tokio::sync::mpsc; use crate::domain::events::RawLogEvent; -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Error, PartialEq)] pub enum LogStreamError { + #[error("{0}")] Unavailable(String), + #[error("invalid websocket url '{url}' for {path} path")] + InvalidWebsocketUrl { url: String, path: &'static str }, + #[error("kernel bypass path missing engine selection")] + MissingKernelBypassEngine, + #[error( + "openonload engine selected but Onload runtime is inactive; ensure /dev/onload is present and launch via onload preload" + )] + OpenOnloadRuntimeInactive, + #[error("missing kernel bypass socket path")] + MissingKernelBypassSocketPath, + #[error( + "kernel bypass socket unavailable at '{socket_path}' (expected external AF_XDP/DPDK bridge)" + )] + KernelBypassSocketUnavailable { socket_path: PathBuf }, + #[error("kernel bypass external socket mode requires unix target")] + ExternalBypassRequiresUnixTarget, } -impl Display for LogStreamError { - fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Unavailable(message) => write!(formatter, "{}", message), - } - } -} - -impl Error for LogStreamError {} - pub trait LogStreamPort: Send + Sync { fn path_name(&self) -> &'static str; + fn validate_ready(&self) -> Result<(), LogStreamError>; fn spawn_stream( &self, sender: mpsc::UnboundedSender,