diff --git a/demo/perfetto/README.md b/demo/perfetto/README.md index 876182e..c27d020 100644 --- a/demo/perfetto/README.md +++ b/demo/perfetto/README.md @@ -22,4 +22,5 @@ Add to PATH or set `LJD_PERFETTO_TRACE_PROCESSOR` to point at ## Demos - [linux-data-record](linux-data-record/) — capture and inspect a system trace -- [perfetto-to-logjet](perfetto-to-logjet/) — full end-to-end: record ftrace, import via plugin, view in ljx +- [perfetto-to-logjet](perfetto-to-logjet/) — full end-to-end: record ftrace, import via plugin (SQLite export), view in ljx +- [perfetto-to-logjet-rpc](perfetto-to-logjet-rpc/) — same but using RPC stdio mode (no temp files) diff --git a/demo/perfetto/perfetto-to-logjet-rpc/README.md b/demo/perfetto/perfetto-to-logjet-rpc/README.md new file mode 100644 index 0000000..a8c08d8 --- /dev/null +++ b/demo/perfetto/perfetto-to-logjet-rpc/README.md @@ -0,0 +1,48 @@ +# Perfetto-to-logjet (RPC) + +Same as `perfetto-to-logjet/` but uses trace processor's `server stdio` RPC +mode instead of SQLite export. No temp files — queries go directly over stdin/stdout. + +## Build First + +```bash +make dev +./demo/perfetto/build-perfetto.sh +``` + +## Run + +```bash +cd demo/perfetto/perfetto-to-logjet-rpc +./run-demo.sh +``` + +## What Happens + +1. `tracebox` records 5s of scheduler events (CPU switches, process lifecycle, + CPU frequency, interrupts) via ftrace. +2. `ljd` loads the perfetto-ingest plugin with `LJD_PERFETTO_ACQUISITION=rpc` + set via `ingest.plugin-env` config. +3. The plugin spawns a fresh `trace_processor server stdio` for each query, + sends SQL, receives protobuf responses, maps rows to OTel log records, + and streams them into a `.logjet` spool — no temp SQLite files. +4. `ljx view` opens the spool. + +## What You Should See + +Same output as the SQLite demo — thousands of log lines across multiple types +(sched slices, thread states, ftrace events, spurious wakeups). 
+ +## SQLite vs RPC + +| | SQLite (default) | RPC | +|---|---|---| +| Config | `ingest.plugin-path: ...` | + `ingest.plugin-env: LJD_PERFETTO_ACQUISITION=rpc` | +| Temp files | Yes (SQLite export) | No | +| Speed | Faster (single trace load) | Slower (one load per query type) | +| Maturity | Stable | New | + +## Troubleshooting + +- **0 records**: Run with `sudo ./run-demo.sh` for ftrace access. +- **ljx view shows fewer records**: Delete stale index cache: `rm -rf ~/.cache/ljx`. diff --git a/demo/perfetto/perfetto-to-logjet-rpc/run-demo.sh b/demo/perfetto/perfetto-to-logjet-rpc/run-demo.sh new file mode 100755 index 0000000..7263f92 --- /dev/null +++ b/demo/perfetto/perfetto-to-logjet-rpc/run-demo.sh @@ -0,0 +1,157 @@ +#!/bin/sh +set -eu + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +ROOT_DIR="$SCRIPT_DIR/../../.." +BUILD_DIR="$ROOT_DIR/target/debug" +PERFETTO_OUT="${PERFETTO_OUT:-$ROOT_DIR/perfetto/out/linux_release}" +SPOOL_DIR="$SCRIPT_DIR/spool" +TRACE_FILE="$SCRIPT_DIR/trace.pftrace" + +LJD="$BUILD_DIR/ljd" +LJX="$BUILD_DIR/ljx" +PLUGIN="$BUILD_DIR/liblj_perfetto_ingest.so" +TRACED="$PERFETTO_OUT/traced" +TRACED_PROBES="$PERFETTO_OUT/traced_probes" +TRACEBOX="$PERFETTO_OUT/tracebox" +TP="$PERFETTO_OUT/trace_processor_shell" + +for bin in "$LJD" "$LJX" "$PLUGIN"; do + if [ ! -e "$bin" ]; then + echo "missing $bin" + echo "build first with: make dev" + exit 1 + fi +done + +for bin in "$TRACED" "$TRACED_PROBES" "$TRACEBOX" "$TP"; do + if [ ! -x "$bin" ]; then + echo "missing $bin" + echo "build perfetto first with: ./demo/perfetto/build-perfetto.sh" + exit 1 + fi +done + +# ── Record a trace ──────────────────────────────────────────────────────────── + +echo "Starting traced..." +"$TRACED" &>/dev/null & +TRACED_PID=$! + +echo "Starting traced_probes..." +"$TRACED_PROBES" &>/dev/null & +PROBES_PID=$! 
+ +cleanup_trace() { + kill "$TRACED_PID" 2>/dev/null || true + kill "$PROBES_PID" 2>/dev/null || true + wait "$TRACED_PID" 2>/dev/null || true + wait "$PROBES_PID" 2>/dev/null || true +} + +trap cleanup_trace EXIT INT TERM + +sleep 1 + +echo "Recording 5s of ftrace to $TRACE_FILE..." +CONFIG_FILE="$SCRIPT_DIR/trace-config.txt" +cat > "$CONFIG_FILE" <<'ENDCONFIG' +buffers: { + size_kb: 8192 + fill_policy: RING_BUFFER +} +data_sources: { + config { + name: "linux.ftrace" + ftrace_config { + ftrace_events: "sched/sched_switch" + ftrace_events: "sched/sched_waking" + ftrace_events: "sched/sched_process_exec" + ftrace_events: "sched/sched_process_fork" + ftrace_events: "sched/sched_process_exit" + ftrace_events: "power/cpu_frequency" + ftrace_events: "power/cpu_idle" + ftrace_events: "irq/irq_handler_entry" + ftrace_events: "irq/irq_handler_exit" + } + } +} +duration_ms: 5000 +ENDCONFIG + +if [ "$(id -u)" -eq 0 ]; then + "$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$TRACE_FILE" +else + sudo "$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$TRACE_FILE" + sudo chown "$(id -u):$(id -g)" "$TRACE_FILE" +fi + +rm -f "$CONFIG_FILE" + +cleanup_trace +trap - EXIT INT TERM + +if [ ! -f "$TRACE_FILE" ]; then + echo "Trace file not created." + exit 1 +fi + +SIZE=$(du -h "$TRACE_FILE" | cut -f1) +echo "Trace recorded: $TRACE_FILE ($SIZE)" +echo "" + +# ── Import via perfetto-ingest plugin ───────────────────────────────────────── + +echo "Importing into .logjet..." +rm -rf "$SPOOL_DIR" +mkdir -p "$SPOOL_DIR" + +# Use a temp config so we don't interfere with the user's config. +CONFIG_FILE="$SCRIPT_DIR/ljd-perfetto.conf" +cat > "$CONFIG_FILE" </dev/null || true + wait "$LJD_PID" 2>/dev/null || true + rm -f "$CONFIG_FILE" +} + +trap cleanup_ljd EXIT INT TERM + +# Poll until records appear (plugin finishes), up to 60s. +echo "Waiting for import..." 
+elapsed=0 +while [ "$elapsed" -lt 60 ]; do + if [ -f "$SPOOL_DIR/perfetto.logjet" ]; then + COUNT=$("$LJX" count "$SPOOL_DIR/perfetto.logjet" 2>/dev/null || echo "0") + if [ "$COUNT" -gt 0 ] 2>/dev/null; then + echo "Imported $COUNT records into $SPOOL_DIR/perfetto.logjet" + break + fi + fi + sleep 1 + elapsed=$((elapsed + 1)) +done + +kill "$LJD_PID" 2>/dev/null || true +wait "$LJD_PID" 2>/dev/null || true +trap - EXIT INT TERM + +# ── View the result ─────────────────────────────────────────────────────────── + +echo "Opening ljx view..." +"$LJX" view "$SPOOL_DIR/perfetto.logjet" diff --git a/doc/perfetto-ingest.md b/doc/perfetto-ingest.md index ebe2bf0..8ca12e8 100644 --- a/doc/perfetto-ingest.md +++ b/doc/perfetto-ingest.md @@ -5,17 +5,18 @@ ecosystem as OTel logs, traces, and metrics. ## Architecture +Two acquisition modes — selectable via `ingest.plugin-env` config: + ``` .pftrace ──→ trace_processor (spawned as subprocess) - ├── export sqlite ──→ sqlite_reader ──→ trace_mapper ──→ OTel spans - └── --run-metrics ──→ metrics_reader ──→ metric_mapper ──→ OTel metrics - log_mapper ──→ OTel logs - │ - ▼ - buffer & sort by ts - │ - ▼ - ljd spool (.logjet) + ├── SQLite (default): export sqlite → sqlite_reader → mappers + └── RPC: server stdio → rpc_reader → mappers + │ + ▼ + buffer & sort by ts + │ + ▼ + ljd spool (.logjet) ``` The plugin is an **active source** (`mode: 1`). ljd calls `lj_ingest_fetch()` once, @@ -34,11 +35,9 @@ to guarantee monotonic timestamps in the logjet block format. 
## Usage -```bash -# Build the plugin and ljd: -make dev +### SQLite path (default) -# Create a config file (ljd uses YAML config, not CLI flags): +```bash cat > /tmp/perfetto.conf < /tmp/perfetto-rpc.conf <) -> Resu } fn scan_entries_parallel( - entries: Vec, - predicate: &RecordPredicate, - service_filter: Option<&HashSet>, - severity_filter: Option<&HashSet>, + entries: Vec, predicate: &RecordPredicate, service_filter: Option<&HashSet>, severity_filter: Option<&HashSet>, workers: usize, ) -> Result> { if entries.is_empty() { @@ -106,10 +103,7 @@ fn scan_entries_parallel( } let worker_count = workers.max(1).min(entries.len()); if worker_count == 1 { - return entries - .iter() - .map(|entry| scan_entry(entry, predicate, service_filter, severity_filter)) - .collect(); + return entries.iter().map(|entry| scan_entry(entry, predicate, service_filter, severity_filter)).collect(); } let predicate = predicate.clone(); diff --git a/ljx/src/commands/export.rs b/ljx/src/commands/export.rs index a62ca6b..03d20a1 100644 --- a/ljx/src/commands/export.rs +++ b/ljx/src/commands/export.rs @@ -134,7 +134,10 @@ fn insert_otlp_log_fields_with_preview( ) { target.insert( "body".to_string(), - JsonValue::String(truncate_preview(&log_record.body.as_ref().map(|v| format_any_value(Some(v))).filter(|s| !s.is_empty()).unwrap_or_default(), preview_bytes)), + JsonValue::String(truncate_preview( + &log_record.body.as_ref().map(|v| format_any_value(Some(v))).filter(|s| !s.is_empty()).unwrap_or_default(), + preview_bytes, + )), ); target.insert("timestamp".to_string(), JsonValue::String(format_timestamp(log_record.time_unix_nano.max(fallback_ts_unix_ns)))); if log_record.observed_time_unix_nano > 0 { @@ -229,7 +232,9 @@ fn any_value_to_json(value: &AnyValue, preview_bytes: Option) -> Option Some(JsonValue::Number((*number).into())), Some(Value::DoubleValue(number)) => serde_json::Number::from_f64(*number).map(JsonValue::Number), Some(Value::BytesValue(bytes)) => 
Some(JsonValue::String(truncate_preview(&format!("<{} bytes>", bytes.len()), preview_bytes))), - Some(Value::ArrayValue(array)) => Some(JsonValue::Array(array.values.iter().filter_map(|value| any_value_to_json(value, preview_bytes)).collect())), + Some(Value::ArrayValue(array)) => { + Some(JsonValue::Array(array.values.iter().filter_map(|value| any_value_to_json(value, preview_bytes)).collect())) + } Some(Value::KvlistValue(map)) => Some(JsonValue::Object( map.values .iter() @@ -282,4 +287,3 @@ fn format_timestamp(ts_unix_ns: u64) -> String { #[cfg(test)] #[path = "../../tests/unit/commands/top_level_query_ut.rs"] mod top_level_query_ut; - diff --git a/ljx/src/commands/view/app.rs b/ljx/src/commands/view/app.rs index 86cd021..cf99bb0 100644 --- a/ljx/src/commands/view/app.rs +++ b/ljx/src/commands/view/app.rs @@ -43,13 +43,11 @@ impl ViewApp { let catalog_bg = Arc::clone(&catalog); let dataset_bg = dataset.clone(); let workers = crate::scan_workers::default_worker_count(); - thread::Builder::new() - .name(format!("ljx-field-catalog-{workers}")) - .spawn(move || { - if let Ok(cat) = scan_field_catalog(&dataset_bg, workers) { - *catalog_bg.lock().unwrap() = Some(cat); - } - })?; + thread::Builder::new().name(format!("ljx-field-catalog-{workers}")).spawn(move || { + if let Ok(cat) = scan_field_catalog(&dataset_bg, workers) { + *catalog_bg.lock().unwrap() = Some(cat); + } + })?; Ok(Self { input: dataset.primary_path().to_path_buf(), @@ -1202,24 +1200,18 @@ impl ViewApp { let _ = std::fs::remove_file(scan.spool_path); } self.input = path; - let options = if self.nfs_mode { - crate::dataset::DatasetOptions::nfs() - } else { - crate::dataset::DatasetOptions::default() - }; + let options = if self.nfs_mode { crate::dataset::DatasetOptions::nfs() } else { crate::dataset::DatasetOptions::default() }; self.dataset = Dataset::from_inputs_with_options(std::slice::from_ref(&self.input), options)?; let catalog_bg = Arc::clone(&self.field_catalog); let dataset_bg = 
self.dataset.clone(); *self.field_catalog.lock().unwrap() = None; let workers = crate::scan_workers::default_worker_count(); - thread::Builder::new() - .name(format!("ljx-field-catalog-{workers}")) - .spawn(move || { - if let Ok(cat) = scan_field_catalog(&dataset_bg, workers) { - *catalog_bg.lock().unwrap() = Some(cat); - } - })?; + thread::Builder::new().name(format!("ljx-field-catalog-{workers}")).spawn(move || { + if let Ok(cat) = scan_field_catalog(&dataset_bg, workers) { + *catalog_bg.lock().unwrap() = Some(cat); + } + })?; self.query_input.clear(); self.apply_filter() diff --git a/ljx/src/commands/view/scan.rs b/ljx/src/commands/view/scan.rs index 2161d7f..1b2c832 100644 --- a/ljx/src/commands/view/scan.rs +++ b/ljx/src/commands/view/scan.rs @@ -39,14 +39,16 @@ pub(super) fn scan_field_catalog(dataset: &Dataset, workers: usize) -> Result= paths.len() { - break; + handles.push(thread::spawn(move || { + loop { + let idx = cursor.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if idx >= paths.len() { + break; + } + let path = &paths[idx]; + let result = scan_field_catalog_file(path); + let _ = tx.send(result); } - let path = &paths[idx]; - let result = scan_field_catalog_file(path); - let _ = tx.send(result); })); } drop(tx); @@ -351,7 +353,8 @@ struct IndexedScanState<'a> { } fn scan_indexed_entry( - entry: &crate::dataset::DatasetEntry, index: &crate::dataset_index::DatasetIndex, predicate: &crate::predicate::RecordPredicate, state: &mut IndexedScanState<'_>, + entry: &crate::dataset::DatasetEntry, index: &crate::dataset_index::DatasetIndex, predicate: &crate::predicate::RecordPredicate, + state: &mut IndexedScanState<'_>, ) -> Result<()> { for block in &index.blocks { if state.cancel.load(Ordering::Relaxed) { diff --git a/ljx/src/dataset.rs b/ljx/src/dataset.rs index 3e16629..a3c9802 100644 --- a/ljx/src/dataset.rs +++ b/ljx/src/dataset.rs @@ -89,12 +89,7 @@ impl Dataset { paths.sort_by(|a, b| a.as_os_str().cmp(b.as_os_str())); paths.dedup(); - Ok(Self 
{ - entries: paths - .into_iter() - .map(|path| DatasetEntry::from_path(path, options)) - .collect::>>()?, - }) + Ok(Self { entries: paths.into_iter().map(|path| DatasetEntry::from_path(path, options)).collect::>>()? }) } pub(crate) fn len(&self) -> usize { @@ -172,10 +167,7 @@ fn collect_dir_entries(dir: &Path, out: &mut Vec) -> Result<()> { } } if found == 0 { - return Err(Error::Usage(format!( - "input {} is a directory with no .logjet files", - dir.display() - ))); + return Err(Error::Usage(format!("input {} is a directory with no .logjet files", dir.display()))); } Ok(()) } @@ -185,10 +177,7 @@ fn looks_like_glob(path: &Path) -> bool { } fn modified_ns(meta: &Metadata) -> Option { - meta.modified() - .ok() - .and_then(|ts| ts.duration_since(UNIX_EPOCH).ok()) - .and_then(|dur| u64::try_from(dur.as_nanos()).ok()) + meta.modified().ok().and_then(|ts| ts.duration_since(UNIX_EPOCH).ok()).and_then(|dur| u64::try_from(dur.as_nanos()).ok()) } #[cfg(test)] diff --git a/ljx/src/dataset_index.rs b/ljx/src/dataset_index.rs index 2cacd15..03bdba7 100644 --- a/ljx/src/dataset_index.rs +++ b/ljx/src/dataset_index.rs @@ -112,9 +112,8 @@ pub(crate) fn sidecar_path(path: &Path) -> PathBuf { } fn cache_root_dir() -> Option { - let base = std::env::var_os("XDG_CACHE_HOME").map(PathBuf::from).or_else(|| { - std::env::var_os("HOME").map(|home| PathBuf::from(home).join(".cache")) - })?; + let base = + std::env::var_os("XDG_CACHE_HOME").map(PathBuf::from).or_else(|| std::env::var_os("HOME").map(|home| PathBuf::from(home).join(".cache")))?; let dir = base.join("ljx"); if std::fs::create_dir_all(&dir).is_err() { return None; @@ -217,7 +216,11 @@ impl IndexBlock { fn load_fresh(path: &Path, size: u64, modified_ns: Option, sidecar_path: &Path) -> Option { let bytes = std::fs::read(sidecar_path).ok()?; let disk: DiskIndex = serde_json::from_slice(&bytes).ok()?; - if disk.version != INDEX_VERSION || disk.source_path != path.display().to_string() || disk.source_size != size || 
disk.source_modified_ns != modified_ns { + if disk.version != INDEX_VERSION + || disk.source_path != path.display().to_string() + || disk.source_size != size + || disk.source_modified_ns != modified_ns + { return None; } Some(from_disk(disk)) @@ -473,7 +476,13 @@ impl SummaryBuilder { self.last_ts_unix_ns = Some(record.ts_unix_ns); self.record_types |= record_type_bit(record.record_type); if record.record_type == RecordType::Logs { - collect_log_summaries(&record.payload, &mut self.services, &mut self.services_complete, &mut self.severities, &mut self.severities_complete); + collect_log_summaries( + &record.payload, + &mut self.services, + &mut self.services_complete, + &mut self.severities, + &mut self.severities_complete, + ); } } diff --git a/ljx/src/main.rs b/ljx/src/main.rs index dd5fef0..b9c1045 100644 --- a/ljx/src/main.rs +++ b/ljx/src/main.rs @@ -35,9 +35,12 @@ fn run() -> Result<()> { let mut matches = command.get_matches_mut(); let cli = Cli::from_arg_matches_mut(&mut matches).map_err(|err| crate::error::Error::Usage(err.to_string()))?; if let Some(format) = cli.export.as_deref() { - let input = cli.input.as_deref().and_then(|inputs| inputs.first()).cloned().ok_or_else(|| { - Error::Usage("missing export input; use `ljx --export -o `".to_string()) - })?; + let input = cli + .input + .as_deref() + .and_then(|inputs| inputs.first()) + .cloned() + .ok_or_else(|| Error::Usage("missing export input; use `ljx --export -o `".to_string()))?; let output = cli.output.ok_or_else(|| Error::Usage("missing export output; use `ljx --export -o `".to_string()))?; return commands::export::run(format, &input, &output, cli.force, &cli.fields); } @@ -46,9 +49,7 @@ fn run() -> Result<()> { predicate.field_filter.services = crate::predicate::parse_string_filter(cli.services, "service")?; predicate.field_filter.severities = crate::predicate::parse_string_filter(cli.severities, "severity")?; return match cli.format.unwrap_or(ExportFormat::Ndjson) { - ExportFormat::Ndjson => { - 
commands::export::run_query_ndjson_multi(inputs, &predicate, &cli.fields, cli.preview_bytes) - } + ExportFormat::Ndjson => commands::export::run_query_ndjson_multi(inputs, &predicate, &cli.fields, cli.preview_bytes), }; } if cli.format.is_some() || cli.predicate.has_filters() { diff --git a/ljx/src/predicate.rs b/ljx/src/predicate.rs index 6af41ed..ba071fa 100644 --- a/ljx/src/predicate.rs +++ b/ljx/src/predicate.rs @@ -137,7 +137,6 @@ pub fn parse_string_filter(values: Vec, label: &str) -> Result Result { let mut payload_matchers = Vec::with_capacity(self.grep.len() + self.fixed_string.len()); for pattern in self.grep { diff --git a/ljx/src/scan_workers.rs b/ljx/src/scan_workers.rs index 721108a..c31a794 100644 --- a/ljx/src/scan_workers.rs +++ b/ljx/src/scan_workers.rs @@ -1,8 +1,5 @@ pub(crate) const MAX_SCAN_WORKERS: usize = 8; pub(crate) fn default_worker_count() -> usize { - std::thread::available_parallelism() - .map(|count| count.get()) - .unwrap_or(1) - .clamp(1, MAX_SCAN_WORKERS) + std::thread::available_parallelism().map(|count| count.get()).unwrap_or(1).clamp(1, MAX_SCAN_WORKERS) } diff --git a/ljx/tests/unit/cli_utst.rs b/ljx/tests/unit/cli_utst.rs index 996ed06..d66f2b7 100644 --- a/ljx/tests/unit/cli_utst.rs +++ b/ljx/tests/unit/cli_utst.rs @@ -23,7 +23,7 @@ fn dedup_accepts_collapse_and_full_matcher() { fn top_level_export_parses() { let cli = Cli::try_parse_from(["ljx", "--export", "ndjson", "input.logjet", "-o", "out.ndjson"]).expect("cli parses"); assert_eq!(cli.export.as_deref(), Some("ndjson")); - assert_eq!(cli.input, Some(vec![PathBuf::from("input.logjet") ])); + assert_eq!(cli.input, Some(vec![PathBuf::from("input.logjet")])); assert_eq!(cli.output, Some(PathBuf::from("out.ndjson"))); assert!(!cli.force); assert!(cli.command.is_none()); @@ -38,7 +38,7 @@ fn top_level_export_force_parses() { #[test] fn top_level_query_parses_literal_filter() { let cli = Cli::try_parse_from(["ljx", "input.logjet", "-F", "error"]).expect("cli parses"); - 
assert_eq!(cli.input, Some(vec![PathBuf::from("input.logjet") ])); + assert_eq!(cli.input, Some(vec![PathBuf::from("input.logjet")])); assert!(cli.format.is_none()); assert_eq!(cli.predicate.fixed_string, vec!["error".to_string()]); } @@ -46,7 +46,7 @@ fn top_level_query_parses_literal_filter() { #[test] fn top_level_query_accepts_explicit_ndjson_format() { let cli = Cli::try_parse_from(["ljx", "--format", "ndjson", "input.logjet", "-e", "error|panic"]).expect("cli parses"); - assert_eq!(cli.input, Some(vec![PathBuf::from("input.logjet") ])); + assert_eq!(cli.input, Some(vec![PathBuf::from("input.logjet")])); assert!(matches!(cli.format, Some(crate::cli::ExportFormat::Ndjson))); assert_eq!(cli.predicate.grep, vec!["error|panic".to_string()]); } diff --git a/ljx/tests/unit/commands/top_level_query_ut.rs b/ljx/tests/unit/commands/top_level_query_ut.rs index 4d56d8f..4d074c8 100644 --- a/ljx/tests/unit/commands/top_level_query_ut.rs +++ b/ljx/tests/unit/commands/top_level_query_ut.rs @@ -69,14 +69,7 @@ fn top_level_query_grep_scans_multiple_inputs() { let predicate = PredicateArgs { grep: vec!["timeout|bad".to_string()], ..PredicateArgs::default() }.build().unwrap(); let mut output = Vec::new(); - run_query_ndjson_multi_with_writer( - &[PathBuf::from(&a), PathBuf::from(&b)], - &predicate, - &[], - None, - &mut output, - ) - .unwrap(); + run_query_ndjson_multi_with_writer(&[PathBuf::from(&a), PathBuf::from(&b)], &predicate, &[], None, &mut output).unwrap(); let text = String::from_utf8(output).unwrap(); let rows = text.lines().map(|line| serde_json::from_str::(line).unwrap()).collect::>(); diff --git a/ljx/tests/unit/commands/view_ut.rs b/ljx/tests/unit/commands/view_ut.rs index 50f313c..28194e1 100644 --- a/ljx/tests/unit/commands/view_ut.rs +++ b/ljx/tests/unit/commands/view_ut.rs @@ -252,8 +252,7 @@ fn make_view_app(input: std::path::PathBuf) -> ViewApp { } fn make_view_app_inputs(inputs: Vec) -> ViewApp { - ViewApp::new(ViewArgs { inputs, dataset_order: 
ViewOrderArg::Concat, nfs: false, hex_payload: false, tail: false }) - .expect("view app") + ViewApp::new(ViewArgs { inputs, dataset_order: ViewOrderArg::Concat, nfs: false, hex_payload: false, tail: false }).expect("view app") } fn make_view_app_inputs_order(inputs: Vec, dataset_order: ViewOrderArg) -> ViewApp { diff --git a/ljx/tests/unit/dataset_index_ut.rs b/ljx/tests/unit/dataset_index_ut.rs index 51dc223..00462b7 100644 --- a/ljx/tests/unit/dataset_index_ut.rs +++ b/ljx/tests/unit/dataset_index_ut.rs @@ -64,10 +64,7 @@ fn sidecar_index_builds_bounds_and_log_summaries() { let path = std::env::temp_dir().join(format!("endor-sidecar-{}.logjet", std::process::id())); write_star_wars_logjet( &path, - &[ - (4, 1_000, "rebel-fleet", "WARN", "Endor shield pulse"), - (9, 5_000, "rebel-fleet", "ERROR", "Death Star reactor breach"), - ], + &[(4, 1_000, "rebel-fleet", "WARN", "Endor shield pulse"), (9, 5_000, "rebel-fleet", "ERROR", "Death Star reactor breach")], 32, ); @@ -96,10 +93,7 @@ fn sidecar_index_rebuilds_when_source_changes() { write_star_wars_logjet( &path, - &[ - (3, 300, "empire", "INFO", "Vader arrives with probe droids"), - (8, 800, "empire", "ERROR", "Mustafar lava surge alert"), - ], + &[(3, 300, "empire", "INFO", "Vader arrives with probe droids"), (8, 800, "empire", "ERROR", "Mustafar lava surge alert")], 128, ); let second = Dataset::from_inputs(std::slice::from_ref(&path)).unwrap(); diff --git a/logjetd/src/config.rs b/logjetd/src/config.rs index 92a1f09..3cd240d 100644 --- a/logjetd/src/config.rs +++ b/logjetd/src/config.rs @@ -16,6 +16,8 @@ pub struct Config { pub ingest_plugin_dir: Option, /// Optional descriptor name selected from `ingest_plugin_dir` and system plugin roots. pub ingest_plugin_name: Option, + /// Environment variables passed to the ingest plugin (KEY=VALUE pairs). 
+ pub ingest_plugin_env: Vec, pub replay_addr: String, pub replay_max_clients: usize, pub replay_client_timeout_ms: u64, @@ -226,6 +228,8 @@ struct RawConfig { ingest_plugin: Option, #[serde(rename = "ingest.use")] ingest_use: Option, + #[serde(rename = "ingest.plugin-env", default)] + ingest_plugin_env: Vec, #[serde(rename = "replay.listen")] replay_addr: Option, #[serde(rename = "replay.max-clients")] @@ -313,6 +317,7 @@ impl Config { ingest_plugin_dir: None, ingest_plugin: None, ingest_use: None, + ingest_plugin_env: Vec::new(), replay_addr: None, replay_max_clients: None, replay_client_timeout_ms: None, @@ -497,6 +502,7 @@ impl Config { ingest_plugin_path, ingest_plugin_dir, ingest_plugin_name, + ingest_plugin_env: raw.ingest_plugin_env, replay_addr, replay_max_clients, replay_client_timeout_ms, diff --git a/logjetd/src/daemon.rs b/logjetd/src/daemon.rs index 60cc77a..b0af3d5 100644 --- a/logjetd/src/daemon.rs +++ b/logjetd/src/daemon.rs @@ -214,6 +214,7 @@ pub fn serve(config: DaemonConfig) -> io::Result<()> { config.config.ingest_plugin_path, config.config.ingest_plugin_dir, config.config.ingest_plugin_name, + config.config.ingest_plugin_env, ingest_policy, spool, next_seq, @@ -224,15 +225,15 @@ pub fn serve(config: DaemonConfig) -> io::Result<()> { #[allow(clippy::too_many_arguments)] fn ingest_loop( bind_addr: String, protocol: IngestProtocol, ingest_tls: IngestTlsConfig, ingest_limits: IngestLimits, plugin_path: Option, - plugin_dir: Option, plugin_name: Option, ingest_policy: Arc, spool: Arc, - next_seq: Arc, + plugin_dir: Option, plugin_name: Option, plugin_env: Vec, ingest_policy: Arc, + spool: Arc, next_seq: Arc, ) -> io::Result<()> { let limiter = Arc::new(ConnectionLimiter::new(ingest_limits.max_clients)); match protocol { IngestProtocol::Plugin => { let path = crate::plugin::resolve_ingest_plugin(plugin_path.as_deref(), plugin_dir.as_deref(), plugin_name.as_deref())?; eprintln!("ljd ingest plugin selected name={} path={}", 
crate::plugin::ingest_plugin_label(&path), path.display()); - return crate::plugin::plugin_ingest_loop(&bind_addr, &path, spool, next_seq); + return crate::plugin::plugin_ingest_loop(&bind_addr, &path, &plugin_env, spool, next_seq); } IngestProtocol::Wire => { let listener = TcpListener::bind(&bind_addr)?; diff --git a/logjetd/src/plugin.rs b/logjetd/src/plugin.rs index afb37db..e10b7ae 100644 --- a/logjetd/src/plugin.rs +++ b/logjetd/src/plugin.rs @@ -500,9 +500,9 @@ struct LjLogRecord { #[repr(C)] struct LjIngestRecordV1 { struct_size: u32, - record_type: u32, // LJ_INGEST_RECORD_TYPE_* + record_type: u32, // LJ_INGEST_RECORD_TYPE_* timestamp_unix_ns: u64, - payload: *const u8, // pre-encoded OTLP protobuf bytes + payload: *const u8, // pre-encoded OTLP protobuf bytes payload_len: usize, flags: u32, reserved: [u64; 4], @@ -557,21 +557,11 @@ impl PluginHandle { let fetch: Option = lib.get::(b"lj_ingest_fetch\0").ok().map(|sym| *sym); let set_generic_callback: Option = lib.get::(b"lj_ingest_set_generic_callback\0").ok().map(|sym| *sym); - let last_error: Option = - lib.get::(b"lj_ingest_last_error\0").ok().map(|sym| *sym); + let last_error: Option = lib.get::(b"lj_ingest_last_error\0").ok().map(|sym| *sym); let free: libloading::Symbol = lib.get(b"lj_ingest_free\0").map_err(|err| io::Error::other(format!("symbol lj_ingest_free: {err}")))?; - Ok(Self { - create: *create, - set_callback: *set_callback, - feed: *feed, - fetch, - set_generic_callback, - last_error, - free: *free, - _lib: lib, - }) + Ok(Self { create: *create, set_callback: *set_callback, feed: *feed, fetch, set_generic_callback, last_error, free: *free, _lib: lib }) } } @@ -789,35 +779,59 @@ pub(crate) fn build_otlp_payload(rec: OtlpRecord<'_>) -> Vec { /// Runs the plugin ingest loop: loads the .so, then either calls /// `lj_ingest_fetch` (active plugin) or binds TCP and feeds bytes (passive). 
-pub fn plugin_ingest_loop(bind_addr: &str, plugin_path: &Path, spool: Arc, next_seq: Arc) -> io::Result<()> { +pub fn plugin_ingest_loop( + bind_addr: &str, plugin_path: &Path, plugin_env: &[String], spool: Arc, next_seq: Arc, +) -> io::Result<()> { + // Set plugin-specific environment variables before loading. + let prev_vars: Vec<_> = plugin_env + .iter() + .filter_map(|env| { + let (k, v) = env.split_once('=')?; + let prev = std::env::var(k).ok(); + unsafe { std::env::set_var(k, v) }; + Some((k.to_string(), prev)) + }) + .collect(); + let handle = Arc::new(PluginHandle::load(plugin_path)?); - if handle.is_active() { + let result = if handle.is_active() { eprintln!("ljd ingest using active plugin {}", plugin_path.display()); - return run_active_plugin(&handle, spool, next_seq); - } - - let listener = TcpListener::bind(bind_addr)?; - eprintln!("ljd ingest listening on {bind_addr} using passive plugin {}", plugin_path.display()); - - for stream in listener.incoming() { - let stream = stream?; - let peer = stream.peer_addr().ok(); - let handle = Arc::clone(&handle); - let spool = Arc::clone(&spool); - let next_seq = Arc::clone(&next_seq); + run_active_plugin(&handle, spool, next_seq) + } else { + let listener = TcpListener::bind(bind_addr)?; + eprintln!("ljd ingest listening on {bind_addr} using passive plugin {}", plugin_path.display()); + let mut first = true; + for stream in listener.incoming() { + let stream = stream?; + let peer = stream.peer_addr().ok(); + let handle = Arc::clone(&handle); + let spool = Arc::clone(&spool); + let next_seq = Arc::clone(&next_seq); + + thread::Builder::new().name("ljd-plugin-client".to_string()).spawn(move || { + if let Err(err) = handle_plugin_client(stream, &handle, spool, next_seq) { + eprintln!("ljd plugin client error: {err}"); + } + if let Some(peer) = peer { + eprintln!("ljd plugin client disconnected: {peer}"); + } + })?; + first = false; + } + #[allow(unreachable_code)] + if first { Ok(()) } else { unreachable!() } + }; 
- thread::Builder::new().name("ljd-plugin-client".to_string()).spawn(move || { - if let Err(err) = handle_plugin_client(stream, &handle, spool, next_seq) { - eprintln!("ljd plugin client error: {err}"); - } - if let Some(peer) = peer { - eprintln!("ljd plugin client disconnected: {peer}"); - } - })?; + // Restore previous environment variables. + for (k, prev) in prev_vars { + match prev { + Some(val) => unsafe { std::env::set_var(&k, val) }, + None => unsafe { std::env::remove_var(&k) }, + } } - Ok(()) + result } /// Handles a single TCP client through the plugin parser. @@ -879,13 +893,14 @@ fn run_active_plugin(handle: &PluginHandle, spool: Arc io::Result<()> { )?; let bridge_config = dir.write( "bridge.conf", - &format!("collector.url: http://127.0.0.1:{}/v1/logs\nupstream.replay: 127.0.0.1:{replay_port}\nupstream.mode: drain\n", reserved_port_addr(&collector_port)), + &format!( + "collector.url: http://127.0.0.1:{}/v1/logs\nupstream.replay: 127.0.0.1:{replay_port}\nupstream.mode: drain\n", + reserved_port_addr(&collector_port) + ), )?; let _appliance = ChildGuard::spawn({ diff --git a/logjetd/tests/common/mod.rs b/logjetd/tests/common/mod.rs index 2691751..e7675cf 100644 --- a/logjetd/tests/common/mod.rs +++ b/logjetd/tests/common/mod.rs @@ -16,8 +16,8 @@ use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs, S use opentelemetry_proto::tonic::resource::v1::Resource; use prost::Message; use rcgen::{BasicConstraints, Certificate, CertificateParams, DistinguishedName, DnType, ExtendedKeyUsagePurpose, IsCa, SanType}; -use tokio::runtime::Builder; use tokio::net::TcpListener as TokioTcpListener; +use tokio::runtime::Builder; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::{Identity, ServerTlsConfig}; use tonic::{Request, Response, Status}; @@ -269,9 +269,7 @@ impl MockGrpcCollector { GrpcTlsMode::Tls { tls } => { let server_cert = fs::read(&tls.server_cert).expect("gRPC TLS server cert"); let server_key = 
fs::read(&tls.server_key).expect("gRPC TLS server key"); - builder - .tls_config(ServerTlsConfig::new().identity(Identity::from_pem(server_cert, server_key))) - .expect("gRPC TLS config") + builder.tls_config(ServerTlsConfig::new().identity(Identity::from_pem(server_cert, server_key))).expect("gRPC TLS config") } GrpcTlsMode::Mtls { tls } => { let ca = fs::read(&tls.ca).expect("gRPC mTLS CA"); @@ -289,11 +287,7 @@ impl MockGrpcCollector { let listener = TokioTcpListener::from_std(listener).expect("gRPC listener"); let incoming = TcpListenerStream::new(listener); - builder - .add_service(LogsServiceServer::new(service)) - .serve_with_incoming(incoming) - .await - .expect("gRPC collector server"); + builder.add_service(LogsServiceServer::new(service)).serve_with_incoming(incoming).await.expect("gRPC collector server"); }); }); Ok(Self { received, _thread: thread }) diff --git a/logjetd/tests/unit/spool_utst.rs b/logjetd/tests/unit/spool_utst.rs index a8cd94d..092d459 100644 --- a/logjetd/tests/unit/spool_utst.rs +++ b/logjetd/tests/unit/spool_utst.rs @@ -583,16 +583,14 @@ fn verify_otlp_payload_rejects_invalid_log_payload() { #[test] fn verify_otlp_payload_rejects_invalid_metrics_payload() { - let record = - logjet::OwnedRecord { record_type: RecordType::Metrics, seq: 1, ts_unix_ns: 1, payload: b"not metrics protobuf".to_vec() }; + let record = logjet::OwnedRecord { record_type: RecordType::Metrics, seq: 1, ts_unix_ns: 1, payload: b"not metrics protobuf".to_vec() }; let err = super::verify_otlp_payload(&record).unwrap_err(); assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); } #[test] fn verify_otlp_payload_rejects_invalid_traces_payload() { - let record = - logjet::OwnedRecord { record_type: RecordType::Traces, seq: 1, ts_unix_ns: 1, payload: b"not traces protobuf".to_vec() }; + let record = logjet::OwnedRecord { record_type: RecordType::Traces, seq: 1, ts_unix_ns: 1, payload: b"not traces protobuf".to_vec() }; let err = 
super::verify_otlp_payload(&record).unwrap_err(); assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); } @@ -617,8 +615,7 @@ fn verify_otlp_payload_accepts_events_as_log_payload() { #[test] fn verify_otlp_payload_rejects_invalid_events_payload() { - let record = - logjet::OwnedRecord { record_type: RecordType::Events, seq: 1, ts_unix_ns: 1, payload: b"not protobuf".to_vec() }; + let record = logjet::OwnedRecord { record_type: RecordType::Events, seq: 1, ts_unix_ns: 1, payload: b"not protobuf".to_vec() }; let err = super::verify_otlp_payload(&record).unwrap_err(); assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); } diff --git a/plugins/perfetto-ingest/src/lib.rs b/plugins/perfetto-ingest/src/lib.rs index 58cc1ab..4dacaa7 100644 --- a/plugins/perfetto-ingest/src/lib.rs +++ b/plugins/perfetto-ingest/src/lib.rs @@ -9,6 +9,8 @@ mod log_mapper; mod metric_mapper; mod metrics_reader; mod perfetto_invoke; +mod rpc_client; +mod rpc_reader; mod sqlite_reader; mod timestamp; mod trace_mapper; @@ -232,20 +234,102 @@ pub unsafe extern "C" fn lj_ingest_fetch(ctx: *mut PerfettoPlugin) -> c_int { } fn run_pipeline(plugin: &mut PerfettoPlugin, trace_file: &std::path::Path) -> Result<(), String> { - let tp_path = perfetto_invoke::find_trace_processor() - .map_err(|err| format!("trace_processor not found: {err}"))?; + let tp_path = perfetto_invoke::find_trace_processor().map_err(|err| format!("trace_processor not found: {err}"))?; eprintln!("perfetto-ingest: using trace_processor {}", tp_path.display()); - eprintln!("perfetto-ingest: exporting SQLite from {}", trace_file.display()); - let sqlite_path = perfetto_invoke::export_sqlite(trace_file, &tp_path) - .map_err(|err| format!("SQLite export failed: {err}"))?; + let use_rpc = std::env::var("LJD_PERFETTO_ACQUISITION").as_deref() == Ok("rpc"); - let db = sqlite_reader::PerfettoDb::open(&sqlite_path) - .map_err(|err| format!("failed to open exported DB: {err}"))?; + if use_rpc { + eprintln!("perfetto-ingest: RPC acquisition 
mode"); + let mut reader = rpc_reader::RpcReader::new(&tp_path, trace_file); + run_pipeline_impl(plugin, &mut reader, trace_file, &tp_path) + } else { + eprintln!("perfetto-ingest: exporting SQLite from {}", trace_file.display()); + let sqlite_path = perfetto_invoke::export_sqlite(trace_file, &tp_path).map_err(|err| format!("SQLite export failed: {err}"))?; + let mut db = sqlite_reader::PerfettoDb::open(&sqlite_path).map_err(|err| format!("failed to open exported DB: {err}"))?; + let result = run_pipeline_impl(plugin, &mut db, trace_file, &tp_path); + let _ = std::fs::remove_file(&sqlite_path); + result + } +} + +pub(crate) trait Reader { + fn read_clock_snapshots(&mut self) -> Result, String>; + fn read_slices(&mut self) -> Result, String>; + fn read_sched_slices(&mut self) -> Result, String>; + fn read_thread_states(&mut self) -> Result, String>; + fn read_ftrace_events(&mut self) -> Result, String>; + fn read_spurious_wakeups(&mut self) -> Result, String>; + fn read_instants(&mut self) -> Result, String>; + fn read_threads(&mut self) -> Result, String>; + fn read_processes(&mut self) -> Result, String>; +} - let snaps = db.read_clock_snapshots() - .map_err(|err| format!("failed to read clock snapshots: {err}"))?; +impl Reader for sqlite_reader::PerfettoDb { + fn read_clock_snapshots(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_clock_snapshots(self) + } + fn read_slices(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_slices(self) + } + fn read_sched_slices(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_sched_slices(self) + } + fn read_thread_states(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_thread_states(self) + } + fn read_ftrace_events(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_ftrace_events(self) + } + fn read_spurious_wakeups(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_spurious_wakeups(self) + } + fn read_instants(&mut self) -> Result, 
String> { + sqlite_reader::PerfettoDb::read_instants(self) + } + fn read_threads(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_threads(self) + } + fn read_processes(&mut self) -> Result, String> { + sqlite_reader::PerfettoDb::read_processes(self) + } +} + +impl Reader for rpc_reader::RpcReader { + fn read_clock_snapshots(&mut self) -> Result, String> { + self.read_clock_snapshots().map_err(|e| e.to_string()) + } + fn read_slices(&mut self) -> Result, String> { + self.read_slices().map_err(|e| e.to_string()) + } + fn read_sched_slices(&mut self) -> Result, String> { + self.read_sched_slices().map_err(|e| e.to_string()) + } + fn read_thread_states(&mut self) -> Result, String> { + self.read_thread_states().map_err(|e| e.to_string()) + } + fn read_ftrace_events(&mut self) -> Result, String> { + self.read_ftrace_events().map_err(|e| e.to_string()) + } + fn read_spurious_wakeups(&mut self) -> Result, String> { + self.read_spurious_wakeups().map_err(|e| e.to_string()) + } + fn read_instants(&mut self) -> Result, String> { + self.read_instants().map_err(|e| e.to_string()) + } + fn read_threads(&mut self) -> Result, String> { + self.read_threads().map_err(|e| e.to_string()) + } + fn read_processes(&mut self) -> Result, String> { + self.read_processes().map_err(|e| e.to_string()) + } +} + +fn run_pipeline_impl( + plugin: &mut PerfettoPlugin, reader: &mut impl Reader, _trace_file: &std::path::Path, _tp_path: &std::path::Path, +) -> Result<(), String> { + let snaps = reader.read_clock_snapshots()?; let policy = match std::env::var("LJD_PERFETTO_TIMESTAMP_POLICY").as_deref() { Ok("require-realtime") => timestamp::TimestampPolicy::RequireRealtime, @@ -260,32 +344,10 @@ fn run_pipeline(plugin: &mut PerfettoPlugin, trace_file: &std::path::Path) -> Re eprintln!("perfetto-ingest: no realtime clock snapshots — timestamps will be unavailable"); } - // Buffer all emissions through a thread-local buffer, sort by timestamp, - // then flush through the real callback. 
This guarantees monotonicity. EMIT_BUF.with(|buf| buf.borrow_mut().clear()); - // Skip trace mapping for now — ljx view doesn't decode ExportTraceServiceRequest - // yet, so trace records render as binary garbage. - // eprintln!("perfetto-ingest: mapping traces..."); - // trace_mapper::map_traces(&db, &converter, buffer_emit, plugin)?; - eprintln!("perfetto-ingest: mapping logs..."); - log_mapper::map_logs(&db, &converter, buffer_emit, plugin)?; - - // Optional metrics - let metrics_names: Vec = std::env::var("LJD_PERFETTO_METRICS") - .ok() - .map(|s| s.split(',').map(|s| s.trim().to_string()).collect::>()) - .unwrap_or_default(); - let metrics_refs: Vec<&str> = metrics_names.iter().map(|s| s.as_str()).collect(); - if !metrics_refs.is_empty() - && let Ok(Some(metrics_path)) = perfetto_invoke::export_metrics(trace_file, &tp_path, &metrics_refs) { - eprintln!("perfetto-ingest: mapping metrics..."); - let metrics = metrics_reader::parse_metrics_json(&metrics_path) - .map_err(|err| format!("failed to parse metrics JSON: {err}"))?; - metric_mapper::map_metrics(&metrics, &converter, buffer_emit, plugin)?; - let _ = std::fs::remove_file(&metrics_path); - } + log_mapper::map_logs(reader, &converter, buffer_emit, plugin)?; let mut all: Vec<(u32, u64, Vec)> = Vec::new(); EMIT_BUF.with(|buf| all = std::mem::take(&mut *buf.borrow_mut())); @@ -295,8 +357,6 @@ fn run_pipeline(plugin: &mut PerfettoPlugin, trace_file: &std::path::Path) -> Re unsafe { emit_generic(plugin, *rt, *ts, payload) }; } - let _ = std::fs::remove_file(&sqlite_path); - eprintln!("perfetto-ingest: done"); Ok(()) } @@ -322,12 +382,7 @@ pub unsafe extern "C" fn lj_ingest_free(ctx: *mut PerfettoPlugin) { /// /// `ctx` must have a generic callback set. 
#[allow(dead_code)] -pub(crate) unsafe fn emit_generic( - ctx: &PerfettoPlugin, - record_type: u32, - ts_unix_ns: u64, - payload: &[u8], -) { +pub(crate) unsafe fn emit_generic(ctx: &PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]) { let Some(cb) = ctx.generic_callback else { return; }; diff --git a/plugins/perfetto-ingest/src/log_mapper.rs b/plugins/perfetto-ingest/src/log_mapper.rs index 3e77257..4e50fe1 100644 --- a/plugins/perfetto-ingest/src/log_mapper.rs +++ b/plugins/perfetto-ingest/src/log_mapper.rs @@ -7,7 +7,7 @@ use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; use opentelemetry_proto::tonic::resource::v1::Resource; use prost::Message; -use crate::sqlite_reader::{PerfettoDb, PerfettoFtraceEvent, PerfettoInstant, PerfettoSchedSlice, PerfettoSlice, PerfettoSpuriousWakeup, PerfettoThreadState}; +use crate::sqlite_reader::{PerfettoFtraceEvent, PerfettoInstant, PerfettoSchedSlice, PerfettoSlice, PerfettoSpuriousWakeup, PerfettoThreadState}; use crate::timestamp::TimestampConverter; const SEVERITY_INFO: i32 = 9; @@ -17,30 +17,40 @@ fn dur_str(ns: i64) -> String { } pub fn map_logs( - db: &PerfettoDb, - converter: &TimestampConverter, - emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), - plugin: &crate::PerfettoPlugin, + db: &mut impl crate::Reader, converter: &TimestampConverter, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), plugin: &crate::PerfettoPlugin, ) -> Result<(), String> { let mut all: Vec = Vec::new(); for slice in &db.read_slices()? { - if let Some(rec) = maybe_slice_to_log(slice, converter) { all.push(rec); } + if let Some(rec) = maybe_slice_to_log(slice, converter) { + all.push(rec); + } } for s in &db.read_sched_slices()? 
{ - if let Some(rec) = maybe_sched_slice_to_log(s, converter) { all.push(rec); } + if let Some(rec) = maybe_sched_slice_to_log(s, converter) { + all.push(rec); + } } for ts in &db.read_thread_states()? { - if let Some(rec) = maybe_thread_state_to_log(ts, converter) { all.push(rec); } + if let Some(rec) = maybe_thread_state_to_log(ts, converter) { + all.push(rec); + } } for ev in &db.read_ftrace_events()? { - if let Some(rec) = maybe_ftrace_event_to_log(ev, converter) { all.push(rec); } + if let Some(rec) = maybe_ftrace_event_to_log(ev, converter) { + all.push(rec); + } } for w in &db.read_spurious_wakeups()? { - if let Some(rec) = maybe_spurious_wakeup_to_log(w, converter) { all.push(rec); } + if let Some(rec) = maybe_spurious_wakeup_to_log(w, converter) { + all.push(rec); + } } for inst in &db.read_instants()? { - if let Some(rec) = maybe_instant_to_log(inst, converter) { all.push(rec); } + if let Some(rec) = maybe_instant_to_log(inst, converter) { + all.push(rec); + } } all.sort_by_key(|r| r.time_unix_nano); @@ -87,8 +97,10 @@ fn maybe_slice_to_log(slice: &PerfettoSlice, converter: &TimestampConverter) -> let name = slice.name.as_deref().unwrap_or("(unnamed)"); let body = format!("{name} dur={dur} depth={}", slice.depth); Some(LogRecord { - time_unix_nano: ts, observed_time_unix_nano: ts, - severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + time_unix_nano: ts, + observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: vec![ KeyValue { key: "perfetto.slice.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(slice.id)) }) }, @@ -96,8 +108,11 @@ fn maybe_slice_to_log(slice: &PerfettoSlice, converter: &TimestampConverter) -> KeyValue { key: "perfetto.slice.dur_ns".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(slice.dur)) }) }, KeyValue { key: "perfetto.slice.depth".to_string(), value: Some(AnyValue 
{ value: Some(Value::IntValue(slice.depth as i64)) }) }, ], - dropped_attributes_count: 0, flags: 0, - trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), }) } @@ -107,8 +122,10 @@ fn maybe_sched_slice_to_log(s: &PerfettoSchedSlice, converter: &TimestampConvert let dur = dur_str(s.dur); let body = format!("cpu={} state={end} utid={} dur={dur}", s.cpu, s.utid); Some(LogRecord { - time_unix_nano: ts, observed_time_unix_nano: ts, - severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + time_unix_nano: ts, + observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: vec![ KeyValue { key: "perfetto.sched.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(s.id)) }) }, @@ -116,8 +133,11 @@ fn maybe_sched_slice_to_log(s: &PerfettoSchedSlice, converter: &TimestampConvert KeyValue { key: "perfetto.sched.dur_ns".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(s.dur)) }) }, KeyValue { key: "perfetto.sched.end_state".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(end.to_string())) }) }, ], - dropped_attributes_count: 0, flags: 0, - trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), }) } @@ -126,10 +146,18 @@ fn maybe_thread_state_to_log(ts: &PerfettoThreadState, converter: &TimestampConv let state = ts.state.as_deref().unwrap_or("?"); let dur = dur_str(ts.dur); let mut body = format!("state={state} dur={dur} utid={}", ts.utid); - if let Some(cpu) = ts.cpu { body.push_str(&format!(" cpu={cpu}")); } - if ts.io_wait == Some(true) { body.push_str(" io_wait"); } - if let Some(ref blocked) = ts.blocked_function { 
body.push_str(&format!(" blocked={blocked}")); } - if let Some(waker) = ts.waker_utid { body.push_str(&format!(" waker={waker}")); } + if let Some(cpu) = ts.cpu { + body.push_str(&format!(" cpu={cpu}")); + } + if ts.io_wait == Some(true) { + body.push_str(" io_wait"); + } + if let Some(ref blocked) = ts.blocked_function { + body.push_str(&format!(" blocked={blocked}")); + } + if let Some(waker) = ts.waker_utid { + body.push_str(&format!(" waker={waker}")); + } let mut attrs = vec![ KeyValue { key: "perfetto.ts.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.id)) }) }, @@ -137,17 +165,31 @@ fn maybe_thread_state_to_log(ts: &PerfettoThreadState, converter: &TimestampConv KeyValue { key: "perfetto.ts.dur_ns".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.dur)) }) }, KeyValue { key: "perfetto.ts.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.utid)) }) }, ]; - if let Some(cpu) = ts.cpu { attrs.push(KeyValue { key: "perfetto.ts.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }); } - if let Some(io) = ts.io_wait { attrs.push(KeyValue { key: "perfetto.ts.io_wait".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(i64::from(io))) }) }); } - if let Some(ref blocked) = ts.blocked_function { attrs.push(KeyValue { key: "perfetto.ts.blocked_function".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(blocked.clone())) }) }); } + if let Some(cpu) = ts.cpu { + attrs.push(KeyValue { key: "perfetto.ts.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }); + } + if let Some(io) = ts.io_wait { + attrs.push(KeyValue { key: "perfetto.ts.io_wait".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(i64::from(io))) }) }); + } + if let Some(ref blocked) = ts.blocked_function { + attrs.push(KeyValue { + key: "perfetto.ts.blocked_function".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue(blocked.clone())) }), 
+ }); + } Some(LogRecord { - time_unix_nano: t, observed_time_unix_nano: t, - severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + time_unix_nano: t, + observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: attrs, - dropped_attributes_count: 0, flags: 0, - trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), }) } @@ -161,14 +203,21 @@ fn maybe_ftrace_event_to_log(ev: &PerfettoFtraceEvent, converter: &TimestampConv KeyValue { key: "perfetto.ftrace.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }, KeyValue { key: "perfetto.ftrace.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }, ]; - if let Some(utid) = ev.utid { attrs.push(KeyValue { key: "perfetto.ftrace.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); } + if let Some(utid) = ev.utid { + attrs.push(KeyValue { key: "perfetto.ftrace.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); + } Some(LogRecord { - time_unix_nano: t, observed_time_unix_nano: t, - severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + time_unix_nano: t, + observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: attrs, - dropped_attributes_count: 0, flags: 0, - trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), }) } @@ -176,15 +225,24 @@ fn maybe_spurious_wakeup_to_log(w: &PerfettoSpuriousWakeup, converter: &Timestam let t = 
converter.to_realtime(w.ts).ok().flatten().unwrap_or(0); let body = format!("spurious_wakeup utid={}", w.utid.unwrap_or(-1)); let mut attrs = vec![KeyValue { key: "perfetto.sw.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(w.id)) }) }]; - if let Some(utid) = w.utid { attrs.push(KeyValue { key: "perfetto.sw.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); } - if let Some(waker) = w.waker_utid { attrs.push(KeyValue { key: "perfetto.sw.waker_utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(waker)) }) }); } + if let Some(utid) = w.utid { + attrs.push(KeyValue { key: "perfetto.sw.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); + } + if let Some(waker) = w.waker_utid { + attrs.push(KeyValue { key: "perfetto.sw.waker_utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(waker)) }) }); + } Some(LogRecord { - time_unix_nano: t, observed_time_unix_nano: t, - severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + time_unix_nano: t, + observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: attrs, - dropped_attributes_count: 0, flags: 0, - trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), }) } @@ -193,41 +251,65 @@ fn maybe_instant_to_log(inst: &PerfettoInstant, converter: &TimestampConverter) let name = inst.name.as_deref().unwrap_or("?"); let body = name.to_string(); Some(LogRecord { - time_unix_nano: t, observed_time_unix_nano: t, - severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + time_unix_nano: t, + observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), 
attributes: vec![ KeyValue { key: "perfetto.instant.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }, KeyValue { key: "perfetto.instant.track_id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(inst.track_id)) }) }, ], - dropped_attributes_count: 0, flags: 0, - trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), }) } fn emit_summary( count_slices: usize, count_threads: usize, count_processes: usize, - emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), - plugin: &crate::PerfettoPlugin, - ts: u64, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), plugin: &crate::PerfettoPlugin, ts: u64, ) { let body = format!("Perfetto trace analysis complete: {} slices, {} threads, {} processes", count_slices, count_threads, count_processes); let record = LogRecord { - time_unix_nano: ts, observed_time_unix_nano: ts, - severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + time_unix_nano: ts, + observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: vec![ KeyValue { key: "perfetto.slices".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(count_slices as i64)) }) }, KeyValue { key: "perfetto.threads".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(count_threads as i64)) }) }, KeyValue { key: "perfetto.processes".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(count_processes as i64)) }) }, ], - dropped_attributes_count: 0, flags: 0, - trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: 
String::new(), }; let request = ExportLogsServiceRequest { resource_logs: vec![ResourceLogs { - resource: Some(Resource { attributes: vec![KeyValue { key: "service.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }) }], dropped_attributes_count: 0, entity_refs: Vec::new() }), - scope_logs: vec![ScopeLogs { scope: Some(InstrumentationScope { name: "perfetto-ingest".to_string(), version: String::new(), attributes: Vec::new(), dropped_attributes_count: 0 }), log_records: vec![record], schema_url: String::new() }], + resource: Some(Resource { + attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }), + }], + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "perfetto-ingest".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: vec![record], + schema_url: String::new(), + }], schema_url: String::new(), }], }; diff --git a/plugins/perfetto-ingest/src/metric_mapper.rs b/plugins/perfetto-ingest/src/metric_mapper.rs index 207e635..da3ea54 100644 --- a/plugins/perfetto-ingest/src/metric_mapper.rs +++ b/plugins/perfetto-ingest/src/metric_mapper.rs @@ -1,12 +1,11 @@ //! Maps Perfetto metrics to OTel metrics. 
+#![allow(dead_code)] use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::common::v1::any_value::Value; use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as DataPointValue; -use opentelemetry_proto::tonic::metrics::v1::{ - Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, -}; +use opentelemetry_proto::tonic::metrics::v1::{Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics}; use opentelemetry_proto::tonic::resource::v1::Resource; use prost::Message; @@ -14,15 +13,10 @@ use crate::metrics_reader::PerfettoMetric; use crate::timestamp::TimestampConverter; pub fn map_metrics( - metrics: &[PerfettoMetric], - _converter: &TimestampConverter, - emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), - plugin: &crate::PerfettoPlugin, + metrics: &[PerfettoMetric], _converter: &TimestampConverter, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), plugin: &crate::PerfettoPlugin, ) -> Result<(), String> { - let now_ns = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as u64; + let now_ns = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos() as u64; let mut otel_metrics: Vec = Vec::new(); @@ -55,11 +49,7 @@ pub fn map_metrics( }; let request = ExportMetricsServiceRequest { - resource_metrics: vec![ResourceMetrics { - resource: Some(resource), - scope_metrics: vec![scope_metrics], - schema_url: String::new(), - }], + resource_metrics: vec![ResourceMetrics { resource: Some(resource), scope_metrics: vec![scope_metrics], schema_url: String::new() }], }; let payload = request.encode_to_vec(); @@ -68,11 +58,7 @@ pub fn map_metrics( Ok(()) } -fn flatten_metrics( - metric: &PerfettoMetric, - out: &mut Vec, - prefix: &mut Vec, -) { +fn 
flatten_metrics(metric: &PerfettoMetric, out: &mut Vec, prefix: &mut Vec) { prefix.push(metric.name.clone()); let full_name = prefix.join("."); @@ -80,28 +66,23 @@ fn flatten_metrics( let attrs: Vec = metric .labels .iter() - .map(|(k, v)| KeyValue { - key: k.clone(), - value: Some(AnyValue { value: Some(Value::StringValue(v.clone())) }), - }) + .map(|(k, v)| KeyValue { key: k.clone(), value: Some(AnyValue { value: Some(Value::StringValue(v.clone())) }) }) .collect(); out.push(Metric { name: full_name, description: metric.description.clone().unwrap_or_default(), unit: metric.unit.clone().unwrap_or_default(), - data: Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge( - opentelemetry_proto::tonic::metrics::v1::Gauge { - data_points: vec![NumberDataPoint { - attributes: attrs, - start_time_unix_nano: 0, - time_unix_nano: 0, - value: Some(DataPointValue::AsDouble(scalar)), - flags: 0, - exemplars: Vec::new(), - }], - }, - )), + data: Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(opentelemetry_proto::tonic::metrics::v1::Gauge { + data_points: vec![NumberDataPoint { + attributes: attrs, + start_time_unix_nano: 0, + time_unix_nano: 0, + value: Some(DataPointValue::AsDouble(scalar)), + flags: 0, + exemplars: Vec::new(), + }], + })), metadata: Vec::new(), }); } diff --git a/plugins/perfetto-ingest/src/metrics_reader.rs b/plugins/perfetto-ingest/src/metrics_reader.rs index 74196b5..fd690b9 100644 --- a/plugins/perfetto-ingest/src/metrics_reader.rs +++ b/plugins/perfetto-ingest/src/metrics_reader.rs @@ -1,7 +1,5 @@ //! Parses Perfetto metrics JSON output. -//! -//! The `trace_processor metrics --run NAMES --output json` command writes -//! JSON to stdout. Each metric is a top-level key in the JSON object. +#![allow(dead_code)] use std::path::Path; @@ -36,11 +34,9 @@ pub struct PerfettoMetric { /// ``` /// or for structured metrics with nested entries. 
pub fn parse_metrics_json(path: &Path) -> Result, String> { - let bytes = std::fs::read(path) - .map_err(|err| format!("failed to read metrics JSON file {}: {err}", path.display()))?; + let bytes = std::fs::read(path).map_err(|err| format!("failed to read metrics JSON file {}: {err}", path.display()))?; - let root: serde_json::Value = serde_json::from_slice(&bytes) - .map_err(|err| format!("failed to parse metrics JSON: {err}"))?; + let root: serde_json::Value = serde_json::from_slice(&bytes).map_err(|err| format!("failed to parse metrics JSON: {err}"))?; let obj = root.as_object().ok_or_else(|| format!("metrics JSON root is not an object: {}", path.display()))?; @@ -55,10 +51,7 @@ pub fn parse_metrics_json(path: &Path) -> Result, String> { fn parse_metric(name: &str, value: &serde_json::Value) -> PerfettoMetric { let obj = value.as_object(); - let scalar_value = obj - .and_then(|o| o.get("value")) - .and_then(|v| v.as_f64()) - .or_else(|| value.as_f64()); + let scalar_value = obj.and_then(|o| o.get("value")).and_then(|v| v.as_f64()).or_else(|| value.as_f64()); let description = obj.and_then(|o| o.get("description")).and_then(|v| v.as_str()).map(String::from); @@ -73,14 +66,7 @@ fn parse_metric(name: &str, value: &serde_json::Value) -> PerfettoMetric { }) .unwrap_or_default(); - let children = obj - .map(|o| { - o.iter() - .filter(|(_, v)| v.is_object()) - .map(|(k, v)| parse_metric(k, v)) - .collect() - }) - .unwrap_or_default(); + let children = obj.map(|o| o.iter().filter(|(_, v)| v.is_object()).map(|(k, v)| parse_metric(k, v)).collect()).unwrap_or_default(); PerfettoMetric { name: name.to_string(), description, unit, scalar_value, labels, children } } diff --git a/plugins/perfetto-ingest/src/perfetto_invoke.rs b/plugins/perfetto-ingest/src/perfetto_invoke.rs index 031500a..c7619fe 100644 --- a/plugins/perfetto-ingest/src/perfetto_invoke.rs +++ b/plugins/perfetto-ingest/src/perfetto_invoke.rs @@ -16,10 +16,7 @@ pub fn find_trace_processor() -> io::Result { if 
path.is_file() { return Ok(path); } - return Err(io::Error::other(format!( - "LJD_PERFETTO_TRACE_PROCESSOR is set but not a file: {}", - path.display() - ))); + return Err(io::Error::other(format!("LJD_PERFETTO_TRACE_PROCESSOR is set but not a file: {}", path.display()))); } for name in ["trace_processor", "trace_processor_shell"] { @@ -28,9 +25,7 @@ pub fn find_trace_processor() -> io::Result { } } - Err(io::Error::other( - "trace_processor not found. Set LJD_PERFETTO_TRACE_PROCESSOR or install Perfetto tools on PATH.", - )) + Err(io::Error::other("trace_processor not found. Set LJD_PERFETTO_TRACE_PROCESSOR or install Perfetto tools on PATH.")) } #[allow(dead_code)] @@ -107,12 +102,15 @@ pub fn export_metrics(trace_file: &Path, tp_path: &Path, metrics: &[&str]) -> io } #[allow(dead_code)] +/// Runs `trace_processor server stdio ` and returns a connected +/// `RpcClient`. The process stays alive until the client is shut down. +pub fn start_server(trace_file: &Path, tp_path: &Path) -> io::Result { + crate::rpc_client::RpcClient::connect(tp_path, trace_file) +} + fn temp_file_path(prefix: &str, suffix: &str) -> io::Result { let pid = std::process::id(); - let ts = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_micros(); + let ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_micros(); let name = format!("{prefix}-{ts}-{pid}.{suffix}"); Ok(env::temp_dir().join(name)) } diff --git a/plugins/perfetto-ingest/src/rpc_client.rs b/plugins/perfetto-ingest/src/rpc_client.rs new file mode 100644 index 0000000..e79aca6 --- /dev/null +++ b/plugins/perfetto-ingest/src/rpc_client.rs @@ -0,0 +1,357 @@ +//! RPC client for Perfetto trace_processor `server stdio` mode. +//! 
Wire format: `[0x0a] [varint len] [serialized TraceProcessorRpc]` + +#![allow(dead_code)] + +use std::io::Write; + +// Varint + +fn read_varint(buf: &[u8], pos: &mut usize) -> Option { + pub fn read_varint_public(buf: &[u8], pos: &mut usize) -> Option { + read_varint(buf, pos) + } + let (mut value, mut shift) = (0u64, 0u32); + loop { + if *pos >= buf.len() { + return None; + } + let byte = buf[*pos]; + *pos += 1; + value |= ((byte & 0x7F) as u64) << shift; + if byte & 0x80 == 0 { + return Some(value); + } + shift += 7; + if shift >= 64 { + return None; + } + } +} + +fn write_varint(mut value: u64) -> Vec { + let mut out = Vec::with_capacity(10); + while value >= 0x80 { + out.push((value as u8 & 0x7F) | 0x80); + value >>= 7; + } + out.push(value as u8); + out +} + +// Wire type helpers + +fn write_tagged_varint(field: u64, value: u64) -> Vec { + let mut out = write_varint(field << 3); + out.extend(write_varint(value)); + out +} + +fn write_tagged_bytes(field: u64, data: &[u8]) -> Vec { + let mut out = write_varint((field << 3) | 2); + out.extend(write_varint(data.len() as u64)); + out.extend_from_slice(data); + out +} + +fn write_tagged_str(field: u64, value: &str) -> Vec { + write_tagged_bytes(field, value.as_bytes()) +} + +// Protobuf field reader + +#[derive(Debug, Clone)] +enum FieldValue<'a> { + Varint(u64), + LengthDelimited(&'a [u8]), +} + +fn read_next_tag<'a>(buf: &'a [u8], pos: &mut usize) -> Option<(u64, FieldValue<'a>)> { + let tag_raw = read_varint(buf, pos)?; + match (tag_raw & 0x07) as u8 { + 0 => read_varint(buf, pos).map(|v| (tag_raw >> 3, FieldValue::Varint(v))), + 2 => { + let len = read_varint(buf, pos)? 
as usize; + (*pos + len <= buf.len()).then(|| { + let data = &buf[*pos..*pos + len]; + *pos += len; + (tag_raw >> 3, FieldValue::LengthDelimited(data)) + }) + } + _ => { + let wire = (tag_raw & 0x07) as u8; + match wire { + 0 => { + let _ = read_varint(buf, pos); + } + // Skip length-delimited: varint length + bytes + _ => { + if let Some(len) = read_varint(buf, pos) { + *pos = (*pos + len as usize).min(buf.len()); + } + } + } + Some((tag_raw >> 3, FieldValue::Varint(0))) + } + } +} + +// Messages + +const TPM_QUERY_STREAMING: u64 = 3; +const CELL_NULL: i32 = 1; +const CELL_VARINT: i32 = 2; +const CELL_FLOAT64: i32 = 3; +const CELL_STRING: i32 = 4; + +/// Decoded query result from trace_processor, containing column names, error +/// message, and rows of `CellValue` data. +#[derive(Debug, Clone)] +pub struct QueryResult { + pub column_names: Vec, + pub error: Option, + pub rows: Vec>, + pub is_last: bool, +} + +/// One cell in a query result row. +#[derive(Debug, Clone)] +pub enum CellValue { + Null, + Varint(i64), + Float64(f64), + String(String), +} + +/// Builds a length-delimited `TraceProcessorRpc` query request frame ready to +/// write to the trace_processor's stdin. +pub fn build_query_request(seq: u64, sql: &str) -> Vec { + let query_args = write_tagged_str(1, sql); + let rpc = [write_tagged_varint(1, seq), write_tagged_varint(2, TPM_QUERY_STREAMING), write_tagged_bytes(103, &query_args)].concat(); + [&[0x0au8][..], &write_varint(rpc.len() as u64), &rpc].concat() +} + +/// Parses a length-delimited `TraceProcessorRpc` response frame. Returns +/// `Some(QueryResult)` if the frame is a `TPM_QUERY_STREAMING` response, +/// or `None` otherwise. 
+pub fn parse_response(rpc_bytes: &[u8], fallback_ncols: Option) -> Option { + let (mut pos, mut qr_bytes, mut is_resp) = (0, None, false); + while pos < rpc_bytes.len() { + let (field, value) = read_next_tag(rpc_bytes, &mut pos)?; + match field { + 3 => is_resp = matches!(value, FieldValue::Varint(v) if v == TPM_QUERY_STREAMING), + 203 => { + if let FieldValue::LengthDelimited(data) = value { + qr_bytes = Some(data); + } + } + _ => {} + } + } + is_resp.then_some(()).and_then(|_| decode_query_result(qr_bytes?, fallback_ncols)) +} + +fn decode_query_result(bytes: &[u8], fallback_ncols: Option) -> Option { + let (mut pos, mut column_names, mut error, mut batches, mut is_last) = (0, Vec::new(), None, Vec::new(), false); + while pos < bytes.len() { + let (field, value) = read_next_tag(bytes, &mut pos)?; + match (field, value) { + (1, FieldValue::LengthDelimited(d)) => column_names.push(String::from_utf8_lossy(d).to_string()), + (2, FieldValue::LengthDelimited(d)) => error = Some(String::from_utf8_lossy(d).to_string()), + (3, FieldValue::LengthDelimited(d)) => { + batches.push(d.to_vec()); + } + _ => {} + } + } + let mut all_rows = Vec::new(); + let ncols = if column_names.is_empty() { fallback_ncols.unwrap_or(0) } else { column_names.len() }; + for batch in batches { + let (rows, batch_is_last) = decode_cells_batch(&batch, ncols)?; + all_rows.extend(rows); + is_last |= batch_is_last; + } + Some(QueryResult { column_names, error, rows: all_rows, is_last }) +} + +fn decode_cells_batch(data: &[u8], ncols: usize) -> Option<(Vec>, bool)> { + let (mut pos, mut cell_types, mut varint_cells, mut float64_cells, mut string_cells, mut is_last_batch) = + (0, Vec::new(), Vec::new(), Vec::new(), Vec::new(), false); + while pos < data.len() { + let (field, value) = read_next_tag(data, &mut pos)?; + match (field, value) { + (1, FieldValue::LengthDelimited(d)) => cell_types = read_packed_varints(d).into_iter().map(|v| v as i32).collect(), + (2, FieldValue::LengthDelimited(d)) => 
varint_cells = read_packed_varints(d).into_iter().map(decode_zigzag).collect(), + (3, FieldValue::LengthDelimited(d)) => { + float64_cells = d.chunks(8).filter_map(|c| <[u8; 8]>::try_from(c).ok()).map(f64::from_le_bytes).collect() + } + (5, FieldValue::LengthDelimited(d)) => string_cells = String::from_utf8_lossy(d).split('\0').map(String::from).collect(), + (6, FieldValue::Varint(v)) => is_last_batch = v != 0, + _ => {} + } + } + if ncols == 0 { + return Some((Vec::new(), is_last_batch)); + } + if cell_types.len() % ncols != 0 { + return None; + } + let nrows = cell_types.len() / ncols; + let (mut rows, mut vi, mut fi, mut si) = (Vec::with_capacity(nrows), 0usize, 0usize, 0usize); + for ri in 0..nrows { + let mut row = Vec::with_capacity(ncols); + for ci in 0..ncols { + row.push(match cell_types[ri * ncols + ci] { + CELL_VARINT => { + vi += 1; + varint_cells.get(vi - 1).map(|&v| CellValue::Varint(v)).unwrap_or(CellValue::Null) + } + CELL_FLOAT64 => { + fi += 1; + float64_cells.get(fi - 1).map(|&v| CellValue::Float64(v)).unwrap_or(CellValue::Null) + } + CELL_STRING => { + si += 1; + string_cells.get(si - 1).map(|s| CellValue::String(s.clone())).unwrap_or(CellValue::Null) + } + _ => CellValue::Null, + }); + } + rows.push(row); + } + Some((rows, is_last_batch)) +} + +fn decode_zigzag(v: u64) -> i64 { + ((v >> 1) as i64) ^ -((v & 1) as i64) +} + +fn read_packed_varints(data: &[u8]) -> Vec { + let (mut p, mut out) = (0, Vec::new()); + while p < data.len() { + if let Some(v) = read_varint(data, &mut p) { + out.push(v); + } else { + break; + } + } + out +} + +// RPC Client + +/// Connected trace_processor child process in `server stdio` mode. +pub struct RpcClient { + child: std::process::Child, + stdin: std::process::ChildStdin, + stdout: std::io::BufReader, + next_seq: u64, +} + +impl RpcClient { + /// Spawns `trace_processor server stdio ` and returns a client + /// for sending SQL queries over stdin/stdout. 
+ pub fn connect(tp_path: &std::path::Path, trace_file: &std::path::Path) -> std::io::Result { + let mut child = std::process::Command::new(tp_path) + .args(["server", "stdio"]) + .arg(trace_file) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .spawn()?; + let stdin = child.stdin.take().expect("stdin"); + let stdout = std::io::BufReader::new(child.stdout.take().expect("stdout")); + Ok(Self { child, stdin, stdout, next_seq: 0 }) + } + + /// Sends a SQL query and collects all response batches into a single + /// `QueryResult`. Each batch is parsed from the length-delimited framing. + pub fn query(&mut self, sql: &str) -> std::io::Result { + let seq = self.next_seq; + self.next_seq += 1; + self.stdin.write_all(&build_query_request(seq, sql))?; + self.stdin.flush()?; + let (mut result, mut got_columns) = (QueryResult { column_names: vec![], error: None, rows: vec![], is_last: false }, false); + loop { + let frame = self.read_frame()?; + if let Some(qr) = parse_response(&frame, (!result.column_names.is_empty()).then_some(result.column_names.len())) { + if !qr.column_names.is_empty() && !got_columns { + result.column_names = qr.column_names.clone(); + got_columns = true; + } + if let Some(ref err) = qr.error + && !err.is_empty() + { + result.error = Some(err.clone()); + return Ok(result); + } + result.rows.extend(qr.rows); + if !qr.column_names.is_empty() { + result.column_names = qr.column_names.clone(); + } + if qr.is_last { + result.is_last = true; + break; + } + } + } + Ok(result) + } + + fn read_frame(&mut self) -> std::io::Result> { + Self::read_frame_raw(&mut self.stdout) + } + + /// Reads one length-delimited `TraceProcessorRpc` frame from a raw reader. + /// Useful for one-shot subprocess queries that don't need a full `RpcClient`. 
+ pub fn read_frame_raw(r: &mut impl std::io::Read) -> std::io::Result> { + let mut tag = [0u8; 1]; + r.read_exact(&mut tag)?; + if tag[0] != 0x0a { + return Err(std::io::Error::other(format!("bad frame tag: {:#04x}", tag[0]))); + } + let len = { + let (mut value, mut shift) = (0u64, 0u32); + loop { + let mut byte = [0u8; 1]; + r.read_exact(&mut byte)?; + value |= ((byte[0] & 0x7F) as u64) << shift; + if byte[0] & 0x80 == 0 { + break value as usize; + } + shift += 7; + if shift >= 64 { + return Err(std::io::Error::other("varint overflow")); + } + } + }; + let mut body = vec![0u8; len]; + r.read_exact(&mut body)?; + Ok(body) + } + + fn read_stream_varint(&mut self) -> std::io::Result { + use std::io::Read; + let (mut value, mut shift) = (0u64, 0u32); + loop { + let mut byte = [0u8; 1]; + self.stdout.read_exact(&mut byte)?; + value |= ((byte[0] & 0x7F) as u64) << shift; + if byte[0] & 0x80 == 0 { + return Ok(value); + } + shift += 7; + if shift >= 64 { + return Err(std::io::Error::other("varint overflow")); + } + } + } + + /// Kills the trace_processor process and waits for it to exit. + pub fn shutdown(mut self) -> std::io::Result<()> { + drop(self.stdin); + drop(self.stdout); + let _ = self.child.kill(); + self.child.wait().map(|_| ()) + } +} diff --git a/plugins/perfetto-ingest/src/rpc_reader.rs b/plugins/perfetto-ingest/src/rpc_reader.rs new file mode 100644 index 0000000..1abd0a1 --- /dev/null +++ b/plugins/perfetto-ingest/src/rpc_reader.rs @@ -0,0 +1,266 @@ +//! Reads Perfetto data via RPC instead of SQLite export. +//! +//! Simpler first cut: one trace_processor process per query. This avoids the +//! current multi-query persistent-connection issue while still eliminating the +//! SQLite temp-file round trip. 

#![allow(dead_code)]

use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};

use crate::rpc_client::{CellValue, QueryResult, RpcClient, build_query_request, parse_response};
use crate::sqlite_reader::{
    PerfettoClockSnapshot, PerfettoFtraceEvent, PerfettoInstant, PerfettoProcess, PerfettoSchedSlice, PerfettoSlice, PerfettoSpuriousWakeup,
    PerfettoThread, PerfettoThreadState,
};

/// Reads Perfetto tables by running one `trace_processor server stdio`
/// process per SQL query.
///
/// NOTE(review): the `read_*` methods use only the rows of a query result and
/// never inspect `QueryResult::error`, so a query the server rejects yields
/// an empty list rather than an `Err` — confirm this is intended.
pub struct RpcReader {
    tp_path: PathBuf,
    trace_file: PathBuf,
}

impl RpcReader {
    /// Stores the trace_processor binary path and the trace file to load.
    /// Nothing is spawned until a query is issued.
    pub fn new(tp_path: &Path, trace_file: &Path) -> Self {
        Self { tp_path: tp_path.to_path_buf(), trace_file: trace_file.to_path_buf() }
    }

    /// Returns at most the first `max_bytes` bytes of `text`, shortened
    /// further if needed so the cut falls on a character boundary.
    fn log_prefix(text: &str, max_bytes: usize) -> &str {
        let mut end = text.len().min(max_bytes);
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        &text[..end]
    }

    /// Runs `sql` in a freshly spawned trace_processor and collects the
    /// streamed result.
    ///
    /// The child is always reaped before returning; if the exchange fails it
    /// is killed first so that waiting cannot block.
    ///
    /// # Errors
    ///
    /// Returns the spawn error, or any I/O error from writing the request or
    /// reading a frame other than end-of-stream.
    fn query(&self, sql: &str) -> std::io::Result<QueryResult> {
        eprintln!("perfetto-rpc: query {}", Self::log_prefix(sql, 80));
        let mut child = Command::new(&self.tp_path)
            .args(["server", "stdio"])
            .arg(&self.trace_file)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()?;
        let outcome = Self::exchange(&mut child, sql);
        if outcome.is_err() {
            // Best effort: the process may already have exited.
            let _ = child.kill();
        }
        let _ = child.wait();
        outcome
    }

    /// Sends the single request to `child` and reads frames until the result
    /// is complete, an error frame arrives, or the child closes its stdout.
    ///
    /// Both pipe handles are dropped when this returns, so a child still
    /// trying to write is not left blocked on a full pipe.
    fn exchange(child: &mut Child, sql: &str) -> std::io::Result<QueryResult> {
        // Both pipes were requested as `piped()` by the caller.
        let mut stdin = child.stdin.take().expect("stdin");
        let mut stdout = child.stdout.take().expect("stdout");
        stdin.write_all(&build_query_request(0, sql))?;
        // Only one request is sent per process, so the write end is closed
        // straight away.
        drop(stdin);

        let mut result = QueryResult { column_names: Vec::new(), error: None, rows: Vec::new(), is_last: false };
        loop {
            eprintln!("perfetto-rpc: request sent, waiting frame");
            let frame = match RpcClient::read_frame_raw(&mut stdout) {
                Ok(frame) => frame,
                // End of stream ends the result; `is_last` stays false if the
                // final batch was never seen.
                Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(err) => return Err(err),
            };
            eprintln!("perfetto-rpc: got frame {} bytes", frame.len());

            let known_cols = (!result.column_names.is_empty()).then_some(result.column_names.len());
            let Some(QueryResult { column_names, error, rows, is_last }) = parse_response(&frame, known_cols) else {
                continue;
            };

            let failure = error.filter(|msg| !msg.is_empty());
            // A frame's column names replace the stored ones, except that an
            // error frame only supplies them when none were seen before.
            if !column_names.is_empty() && (failure.is_none() || result.column_names.is_empty()) {
                result.column_names = column_names;
            }
            if let Some(msg) = failure {
                result.error = Some(msg);
                break;
            }

            result.rows.extend(rows);
            if is_last {
                result.is_last = true;
                break;
            }
        }
        eprintln!("perfetto-rpc: parsed rows={} cols={}", result.rows.len(), result.column_names.len());
        Ok(result)
    }

    /// Position of the column called `name`, if the result has one.
    fn col_idx(columns: &[String], name: &str) -> Option<usize> {
        columns.iter().position(|c| c == name)
    }

    /// Reads the `slice` table ordered by `ts`.
    pub fn read_slices(&mut self) -> std::io::Result<Vec<PerfettoSlice>> {
        let r = self.query("SELECT id, ts, dur, name, parent_id, track_id, arg_set_id, depth FROM slice ORDER BY ts")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (id, ts, dur, name, pid, tid, aid, depth) =
            (col("id"), col("ts"), col("dur"), col("name"), col("parent_id"), col("track_id"), col("arg_set_id"), col("depth"));
        Ok(r.rows
            .iter()
            .map(|row| PerfettoSlice {
                id: i64_val(row, id),
                ts: i64_val(row, ts),
                dur: i64_val(row, dur),
                name: str_val(row, name),
                parent_id: opt_i64_val(row, pid),
                track_id: i64_val(row, tid),
                arg_set_id: opt_i64_val(row, aid),
                depth: i32_val(row, depth),
            })
            .collect())
    }

    /// Reads the `sched_slice` table ordered by `ts`; the `ucpu` column is
    /// stored in the `cpu` field.
    pub fn read_sched_slices(&mut self) -> std::io::Result<Vec<PerfettoSchedSlice>> {
        let r = self.query("SELECT id, ts, dur, utid, ucpu, end_state FROM sched_slice ORDER BY ts")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (id, ts, dur, utid, cpu, es) = (col("id"), col("ts"), col("dur"), col("utid"), col("ucpu"), col("end_state"));
        Ok(r.rows
            .iter()
            .map(|row| PerfettoSchedSlice {
                id: i64_val(row, id),
                ts: i64_val(row, ts),
                dur: i64_val(row, dur),
                utid: i64_val(row, utid),
                cpu: i64_val(row, cpu),
                end_state: str_val(row, es),
            })
            .collect())
    }

    /// Reads the `thread_state` table ordered by `ts`; `io_wait` is mapped to
    /// a boolean (non-zero is `true`) when present.
    pub fn read_thread_states(&mut self) -> std::io::Result<Vec<PerfettoThreadState>> {
        let r = self.query("SELECT id, ts, dur, utid, state, io_wait, blocked_function, waker_utid, cpu FROM thread_state ORDER BY ts")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (id, ts, dur, utid, st, iw, bf, wu, cpu) = (
            col("id"),
            col("ts"),
            col("dur"),
            col("utid"),
            col("state"),
            col("io_wait"),
            col("blocked_function"),
            col("waker_utid"),
            col("cpu"),
        );
        Ok(r.rows
            .iter()
            .map(|row| PerfettoThreadState {
                id: i64_val(row, id),
                ts: i64_val(row, ts),
                dur: i64_val(row, dur),
                utid: i64_val(row, utid),
                state: str_val(row, st),
                io_wait: opt_i64_val(row, iw).map(|v| v != 0),
                blocked_function: str_val(row, bf),
                waker_utid: opt_i64_val(row, wu),
                cpu: opt_i64_val(row, cpu),
            })
            .collect())
    }

    /// Reads the `ftrace_event` table ordered by `ts`.
    pub fn read_ftrace_events(&mut self) -> std::io::Result<Vec<PerfettoFtraceEvent>> {
        let r = self.query("SELECT id, ts, name, cpu, utid FROM ftrace_event ORDER BY ts")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (id, ts, name, cpu, utid) = (col("id"), col("ts"), col("name"), col("cpu"), col("utid"));
        Ok(r.rows
            .iter()
            .map(|row| PerfettoFtraceEvent {
                id: i64_val(row, id),
                ts: i64_val(row, ts),
                name: str_val(row, name),
                cpu: opt_i64_val(row, cpu),
                utid: opt_i64_val(row, utid),
            })
            .collect())
    }

    /// Reads the `spurious_sched_wakeup` table ordered by `ts`.
    pub fn read_spurious_wakeups(&mut self) -> std::io::Result<Vec<PerfettoSpuriousWakeup>> {
        let r = self.query("SELECT id, ts, utid, waker_utid FROM spurious_sched_wakeup ORDER BY ts")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (id, ts, utid, wu) = (col("id"), col("ts"), col("utid"), col("waker_utid"));
        Ok(r.rows
            .iter()
            .map(|row| PerfettoSpuriousWakeup {
                id: i64_val(row, id),
                ts: i64_val(row, ts),
                utid: opt_i64_val(row, utid),
                waker_utid: opt_i64_val(row, wu),
            })
            .collect())
    }

    /// Reads the `instant` table ordered by `ts`.
    pub fn read_instants(&mut self) -> std::io::Result<Vec<PerfettoInstant>> {
        let r = self.query("SELECT ts, track_id, name FROM instant ORDER BY ts")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (ts, tid, name) = (col("ts"), col("track_id"), col("name"));
        Ok(r.rows.iter().map(|row| PerfettoInstant { ts: i64_val(row, ts), track_id: i64_val(row, tid), name: str_val(row, name) }).collect())
    }

    /// Reads the `thread` table ordered by `utid`; a missing `is_main_thread`
    /// value is treated as `false`.
    pub fn read_threads(&mut self) -> std::io::Result<Vec<PerfettoThread>> {
        let r = self.query("SELECT utid, name, tid, upid, is_main_thread FROM thread ORDER BY utid")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (utid, name, tid, upid, imt) = (col("utid"), col("name"), col("tid"), col("upid"), col("is_main_thread"));
        Ok(r.rows
            .iter()
            .map(|row| PerfettoThread {
                utid: i64_val(row, utid),
                name: str_val(row, name),
                tid: opt_i64_val(row, tid),
                upid: opt_i64_val(row, upid),
                is_main_thread: opt_i64_val(row, imt).unwrap_or(0) != 0,
            })
            .collect())
    }

    /// Reads the `process` table ordered by `upid`.
    pub fn read_processes(&mut self) -> std::io::Result<Vec<PerfettoProcess>> {
        let r = self.query("SELECT upid, name, pid FROM process ORDER BY upid")?;
        let col = |name: &str| Self::col_idx(&r.column_names, name);
        let (upid, name, pid) = (col("upid"), col("name"), col("pid"));
        Ok(r.rows.iter().map(|row| PerfettoProcess { upid: i64_val(row, upid), name: str_val(row, name), pid: opt_i64_val(row, pid) }).collect())
    }

    /// Reads the `REALTIME` rows of `clock_snapshot` ordered by `ts`.
    pub fn read_clock_snapshots(&mut self) -> std::io::Result<Vec<PerfettoClockSnapshot>> {
        let r = self.query("SELECT ts, clock_value FROM clock_snapshot WHERE clock_name = 'REALTIME' ORDER BY ts")?;
        let (ts, cv) = (Self::col_idx(&r.column_names, "ts"), Self::col_idx(&r.column_names, "clock_value"));
Ok(r.rows.iter().map(|row| PerfettoClockSnapshot { ts: i64_val(row, ts), clock_value: i64_val(row, cv) }).collect()) + } +} + +fn i64_val(row: &[CellValue], idx: Option) -> i64 { + idx.and_then(|i| row.get(i)).and_then(cell_i64).unwrap_or(0) +} +fn i32_val(row: &[CellValue], idx: Option) -> i32 { + i64_val(row, idx) as i32 +} +fn opt_i64_val(row: &[CellValue], idx: Option) -> Option { + idx.and_then(|i| row.get(i)).and_then(cell_i64) +} +fn str_val(row: &[CellValue], idx: Option) -> Option { + idx.and_then(|i| row.get(i)).and_then(cell_str) +} +fn cell_i64(cv: &CellValue) -> Option { + match cv { + CellValue::Varint(v) => Some(*v), + CellValue::Float64(f) => Some(*f as i64), + _ => None, + } +} +fn cell_str(cv: &CellValue) -> Option { + match cv { + CellValue::String(s) => Some(s.clone()), + _ => None, + } +} diff --git a/plugins/perfetto-ingest/src/sqlite_reader.rs b/plugins/perfetto-ingest/src/sqlite_reader.rs index 10ced8f..07f6b61 100644 --- a/plugins/perfetto-ingest/src/sqlite_reader.rs +++ b/plugins/perfetto-ingest/src/sqlite_reader.rs @@ -218,8 +218,7 @@ pub struct PerfettoDb { impl PerfettoDb { /// Opens an exported Perfetto SQLite database. pub fn open(path: &Path) -> Result { - let conn = rusqlite::Connection::open(path) - .map_err(|err| format!("failed to open exported SQLite DB {}: {err}", path.display()))?; + let conn = rusqlite::Connection::open(path).map_err(|err| format!("failed to open exported SQLite DB {}: {err}", path.display()))?; // Enable WAL mode for better read concurrency. let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA read_uncommitted=1;"); @@ -268,9 +267,7 @@ impl PerfettoDb { .map_err(|err| format!("failed to prepare flow query: {err}"))?; let rows = stmt - .query_map([], |row| { - Ok(PerfettoFlow { id: row.get(0)?, slice_out: row.get(1)?, slice_in: row.get(2)? }) - }) + .query_map([], |row| Ok(PerfettoFlow { id: row.get(0)?, slice_out: row.get(1)?, slice_in: row.get(2)? 
})) .map_err(|err| format!("failed to query flows: {err}"))?; let mut out = Vec::new(); @@ -288,9 +285,7 @@ impl PerfettoDb { .map_err(|err| format!("failed to prepare process query: {err}"))?; let rows = stmt - .query_map([], |row| { - Ok(PerfettoProcess { upid: row.get(0)?, name: row.get(1)?, pid: row.get(2)? }) - }) + .query_map([], |row| Ok(PerfettoProcess { upid: row.get(0)?, name: row.get(1)?, pid: row.get(2)? })) .map_err(|err| format!("failed to query processes: {err}"))?; let mut out = Vec::new(); @@ -336,13 +331,7 @@ impl PerfettoDb { let rows = stmt .query_map([], |row| { - Ok(PerfettoTrack { - id: row.get(0)?, - name: row.get(1)?, - track_type: row.get(2)?, - utid: row.get(3)?, - upid: row.get(4)?, - }) + Ok(PerfettoTrack { id: row.get(0)?, name: row.get(1)?, track_type: row.get(2)?, utid: row.get(3)?, upid: row.get(4)? }) }) .map_err(|err| format!("failed to query tracks: {err}"))?; @@ -394,10 +383,7 @@ impl PerfettoDb { let mut stmt = self.conn.prepare(&sql).map_err(|err| format!("failed to prepare args IN query: {err}"))?; - let params: Vec<&dyn rusqlite::types::ToSql> = arg_set_ids - .iter() - .map(|id| id as &dyn rusqlite::types::ToSql) - .collect(); + let params: Vec<&dyn rusqlite::types::ToSql> = arg_set_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql).collect(); let rows = stmt .query_map(params.as_slice(), |row| { @@ -521,9 +507,7 @@ impl PerfettoDb { .map_err(|err| format!("failed to prepare ftrace_event query: {err}"))?; let rows = stmt - .query_map([], |row| { - Ok(PerfettoFtraceEvent { id: row.get(0)?, ts: row.get(1)?, name: row.get(2)?, cpu: row.get(3)?, utid: row.get(4)? }) - }) + .query_map([], |row| Ok(PerfettoFtraceEvent { id: row.get(0)?, ts: row.get(1)?, name: row.get(2)?, cpu: row.get(3)?, utid: row.get(4)? 
})) .map_err(|err| format!("failed to query ftrace_event: {err}"))?; let mut out = Vec::new(); @@ -545,9 +529,7 @@ impl PerfettoDb { .map_err(|err| format!("failed to prepare spurious_sched_wakeup query: {err}"))?; let rows = stmt - .query_map([], |row| { - Ok(PerfettoSpuriousWakeup { id: row.get(0)?, ts: row.get(1)?, utid: row.get(2)?, waker_utid: row.get(3)? }) - }) + .query_map([], |row| Ok(PerfettoSpuriousWakeup { id: row.get(0)?, ts: row.get(1)?, utid: row.get(2)?, waker_utid: row.get(3)? })) .map_err(|err| format!("failed to query spurious_sched_wakeup: {err}"))?; let mut out = Vec::new(); @@ -559,56 +541,83 @@ impl PerfettoDb { /// Reads instant events ordered by ts. pub fn read_instants(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT ts, track_id, name FROM instant ORDER BY ts") + let mut stmt = self + .conn + .prepare("SELECT ts, track_id, name FROM instant ORDER BY ts") .map_err(|err| format!("failed to prepare instant query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoInstant { ts: row.get(0)?, track_id: row.get(1)?, name: row.get(2)? })) + let rows = stmt + .query_map([], |row| Ok(PerfettoInstant { ts: row.get(0)?, track_id: row.get(1)?, name: row.get(2)? })) .map_err(|err| format!("failed to query instant: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read instant row: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read instant row: {err}"))?); + } Ok(out) } /// Reads counter values ordered by ts. 
pub fn read_counters(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, ts, track_id, value FROM counter ORDER BY ts") + let mut stmt = self + .conn + .prepare("SELECT id, ts, track_id, value FROM counter ORDER BY ts") .map_err(|err| format!("failed to prepare counter query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoCounter { id: row.get(0)?, ts: row.get(1)?, track_id: row.get(2)?, value: row.get(3)? })) + let rows = stmt + .query_map([], |row| Ok(PerfettoCounter { id: row.get(0)?, ts: row.get(1)?, track_id: row.get(2)?, value: row.get(3)? })) .map_err(|err| format!("failed to query counter: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read counter row: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read counter row: {err}"))?); + } Ok(out) } /// Reads CPU topology. pub fn read_cpus(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, cpu, cluster_id, processor FROM cpu ORDER BY id") + let mut stmt = self + .conn + .prepare("SELECT id, cpu, cluster_id, processor FROM cpu ORDER BY id") .map_err(|err| format!("failed to prepare cpu query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoCpu { id: row.get(0)?, cpu: row.get(1)?, cluster_id: row.get(2)?, processor: row.get(3)? })) + let rows = stmt + .query_map([], |row| Ok(PerfettoCpu { id: row.get(0)?, cpu: row.get(1)?, cluster_id: row.get(2)?, processor: row.get(3)? })) .map_err(|err| format!("failed to query cpu: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read cpu row: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read cpu row: {err}"))?); + } Ok(out) } /// Reads machine info. 
pub fn read_machines(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, arch, num_cpus, sysname, release FROM machine ORDER BY id") + let mut stmt = self + .conn + .prepare("SELECT id, arch, num_cpus, sysname, release FROM machine ORDER BY id") .map_err(|err| format!("failed to prepare machine query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoMachine { id: row.get(0)?, arch: row.get(1)?, num_cpus: row.get(2)?, sysname: row.get(3)?, release: row.get(4)? })) + let rows = stmt + .query_map([], |row| { + Ok(PerfettoMachine { id: row.get(0)?, arch: row.get(1)?, num_cpus: row.get(2)?, sysname: row.get(3)?, release: row.get(4)? }) + }) .map_err(|err| format!("failed to query machine: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read machine row: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read machine row: {err}"))?); + } Ok(out) } /// Reads trace metadata entries. pub fn read_metadata(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT name, int_value, str_value FROM metadata ORDER BY name") + let mut stmt = self + .conn + .prepare("SELECT name, int_value, str_value FROM metadata ORDER BY name") .map_err(|err| format!("failed to prepare metadata query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoMetadata { name: row.get(0)?, int_value: row.get(1)?, str_value: row.get(2)? })) + let rows = stmt + .query_map([], |row| Ok(PerfettoMetadata { name: row.get(0)?, int_value: row.get(1)?, str_value: row.get(2)? })) .map_err(|err| format!("failed to query metadata: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read metadata row: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read metadata row: {err}"))?); + } Ok(out) } @@ -616,78 +625,130 @@ impl PerfettoDb { /// Reads memory snapshots. 
pub fn read_memory_snapshots(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, timestamp, track_id, detail_level FROM memory_snapshot ORDER BY id") + let mut stmt = self + .conn + .prepare("SELECT id, timestamp, track_id, detail_level FROM memory_snapshot ORDER BY id") .map_err(|err| format!("failed to prepare memory_snapshot query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoMemorySnapshot { id: row.get(0)?, timestamp: row.get(1)?, track_id: row.get(2)?, detail_level: row.get(3)? })) + let rows = stmt + .query_map([], |row| { + Ok(PerfettoMemorySnapshot { id: row.get(0)?, timestamp: row.get(1)?, track_id: row.get(2)?, detail_level: row.get(3)? }) + }) .map_err(|err| format!("failed to query memory_snapshot: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read memory_snapshot: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read memory_snapshot: {err}"))?); + } Ok(out) } /// Reads CPU profile stack samples. pub fn read_cpu_profile_samples(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, ts, callsite_id, utid FROM cpu_profile_stack_sample ORDER BY ts") + let mut stmt = self + .conn + .prepare("SELECT id, ts, callsite_id, utid FROM cpu_profile_stack_sample ORDER BY ts") .map_err(|err| format!("failed to prepare cpu_profile_stack_sample query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoCpuProfileSample { id: row.get(0)?, ts: row.get(1)?, callsite_id: row.get(2)?, utid: row.get(3)? })) + let rows = stmt + .query_map([], |row| Ok(PerfettoCpuProfileSample { id: row.get(0)?, ts: row.get(1)?, callsite_id: row.get(2)?, utid: row.get(3)? 
})) .map_err(|err| format!("failed to query cpu_profile_stack_sample: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read cpu_profile_stack_sample: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read cpu_profile_stack_sample: {err}"))?); + } Ok(out) } /// Reads stack profile frames. pub fn read_stack_frames(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, name, mapping_id FROM stack_profile_frame ORDER BY id") + let mut stmt = self + .conn + .prepare("SELECT id, name, mapping_id FROM stack_profile_frame ORDER BY id") .map_err(|err| format!("failed to prepare stack_profile_frame query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoStackFrame { id: row.get(0)?, name: row.get(1)?, mapping_id: row.get(2)? })) + let rows = stmt + .query_map([], |row| Ok(PerfettoStackFrame { id: row.get(0)?, name: row.get(1)?, mapping_id: row.get(2)? })) .map_err(|err| format!("failed to query stack_profile_frame: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read stack_profile_frame: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read stack_profile_frame: {err}"))?); + } Ok(out) } /// Reads heap profile allocations. pub fn read_heap_allocations(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, ts, upid, size, count FROM heap_profile_allocation ORDER BY ts") + let mut stmt = self + .conn + .prepare("SELECT id, ts, upid, size, count FROM heap_profile_allocation ORDER BY ts") .map_err(|err| format!("failed to prepare heap_profile_allocation query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoHeapAllocation { id: row.get(0)?, ts: row.get(1)?, upid: row.get(2)?, size: row.get(3)?, count: row.get(4)? 
})) + let rows = stmt + .query_map([], |row| { + Ok(PerfettoHeapAllocation { id: row.get(0)?, ts: row.get(1)?, upid: row.get(2)?, size: row.get(3)?, count: row.get(4)? }) + }) .map_err(|err| format!("failed to query heap_profile_allocation: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read heap_profile_allocation: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read heap_profile_allocation: {err}"))?); + } Ok(out) } /// Reads protolog entries. pub fn read_protologs(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, ts, level, tag, message FROM protolog ORDER BY ts") + let mut stmt = self + .conn + .prepare("SELECT id, ts, level, tag, message FROM protolog ORDER BY ts") .map_err(|err| format!("failed to prepare protolog query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoProtolog { id: row.get(0)?, ts: row.get(1)?, level: row.get(2)?, tag: row.get(3)?, message: row.get(4)? })) + let rows = stmt + .query_map([], |row| { + Ok(PerfettoProtolog { id: row.get(0)?, ts: row.get(1)?, level: row.get(2)?, tag: row.get(3)?, message: row.get(4)? }) + }) .map_err(|err| format!("failed to query protolog: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read protolog: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read protolog: {err}"))?); + } Ok(out) } /// Reads Android log entries. 
pub fn read_android_logs(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, ts, utid, prio, tag, msg FROM android_logs ORDER BY ts") + let mut stmt = self + .conn + .prepare("SELECT id, ts, utid, prio, tag, msg FROM android_logs ORDER BY ts") .map_err(|err| format!("failed to prepare android_logs query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoAndroidLog { id: row.get(0)?, ts: row.get(1)?, utid: row.get(2)?, prio: row.get(3)?, tag: row.get(4)?, msg: row.get(5)? })) + let rows = stmt + .query_map([], |row| { + Ok(PerfettoAndroidLog { id: row.get(0)?, ts: row.get(1)?, utid: row.get(2)?, prio: row.get(3)?, tag: row.get(4)?, msg: row.get(5)? }) + }) .map_err(|err| format!("failed to query android_logs: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read android_logs: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read android_logs: {err}"))?); + } Ok(out) } /// Reads file descriptor events. pub fn read_filedescriptors(&self) -> Result, String> { - let mut stmt = self.conn.prepare("SELECT id, ufd, fd, ts, upid, path FROM filedescriptor ORDER BY ts") + let mut stmt = self + .conn + .prepare("SELECT id, ufd, fd, ts, upid, path FROM filedescriptor ORDER BY ts") .map_err(|err| format!("failed to prepare filedescriptor query: {err}"))?; - let rows = stmt.query_map([], |row| Ok(PerfettoFileDescriptor { id: row.get(0)?, ufd: row.get(1)?, fd: row.get(2)?, ts: row.get(3)?, upid: row.get(4)?, path: row.get(5)? 
})) + let rows = stmt + .query_map([], |row| { + Ok(PerfettoFileDescriptor { + id: row.get(0)?, + ufd: row.get(1)?, + fd: row.get(2)?, + ts: row.get(3)?, + upid: row.get(4)?, + path: row.get(5)?, + }) + }) .map_err(|err| format!("failed to query filedescriptor: {err}"))?; let mut out = Vec::new(); - for row in rows { out.push(row.map_err(|err| format!("failed to read filedescriptor: {err}"))?); } + for row in rows { + out.push(row.map_err(|err| format!("failed to read filedescriptor: {err}"))?); + } Ok(out) } } diff --git a/plugins/perfetto-ingest/src/timestamp.rs b/plugins/perfetto-ingest/src/timestamp.rs index 27a7167..b0dadd5 100644 --- a/plugins/perfetto-ingest/src/timestamp.rs +++ b/plugins/perfetto-ingest/src/timestamp.rs @@ -54,10 +54,7 @@ impl TimestampConverter { let first = &self.snapshots[0]; match self.policy { TimestampPolicy::RequireRealtime => { - return Err(format!( - "trace timestamp {trace_ts} is before first REALTIME snapshot at {}", - first.ts - )); + return Err(format!("trace timestamp {trace_ts} is before first REALTIME snapshot at {}", first.ts)); } TimestampPolicy::BestEffort => { let delta = first.ts - trace_ts; @@ -84,8 +81,7 @@ impl TimestampConverter { Ok(Some(prev.clock_value as u64)) } else { let offset = trace_ts - prev.ts; - let realtime = (prev.clock_value as i128) - + (range_realtime as i128 * offset as i128) / range_ts as i128; + let realtime = (prev.clock_value as i128) + (range_realtime as i128 * offset as i128) / range_ts as i128; Ok(Some(realtime.max(0) as u64)) } } diff --git a/plugins/perfetto-ingest/src/trace_mapper.rs b/plugins/perfetto-ingest/src/trace_mapper.rs index 81eee3e..a872532 100644 --- a/plugins/perfetto-ingest/src/trace_mapper.rs +++ b/plugins/perfetto-ingest/src/trace_mapper.rs @@ -14,8 +14,8 @@ use opentelemetry_proto::tonic::common::v1::any_value::Value; use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; use opentelemetry_proto::tonic::resource::v1::Resource; use 
opentelemetry_proto::tonic::trace::v1::span::SpanKind; -use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status}; use opentelemetry_proto::tonic::trace::v1::status::StatusCode; +use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status}; use prost::Message; use crate::sqlite_reader::{PerfettoDb, PerfettoSlice}; @@ -40,14 +40,8 @@ impl TraceContext { let _tracks = db.read_tracks()?; let processes = db.read_processes()?; - let thread_names: HashMap = threads - .iter() - .filter_map(|t| t.name.clone().map(|n| (t.utid, n))) - .collect(); - let process_names: HashMap = processes - .iter() - .filter_map(|p| p.name.clone().map(|n| (p.upid, n))) - .collect(); + let thread_names: HashMap = threads.iter().filter_map(|t| t.name.clone().map(|n| (t.utid, n))).collect(); + let process_names: HashMap = processes.iter().filter_map(|p| p.name.clone().map(|n| (p.upid, n))).collect(); let thread_process: HashMap = threads.iter().filter_map(|t| t.upid.map(|u| (t.utid, u))).collect(); Ok(Self { thread_names, process_names, thread_process }) @@ -56,9 +50,7 @@ impl TraceContext { /// Maps all slices in the DB to OTel spans and streams them through `emit`. 
pub fn map_traces( - db: &PerfettoDb, - converter: &TimestampConverter, - emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), + db: &PerfettoDb, converter: &TimestampConverter, emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), plugin: &crate::PerfettoPlugin, ) -> Result<(), String> { let ctx = TraceContext::build(db)?; @@ -96,12 +88,7 @@ pub fn map_traces( Ok(()) } -fn build_span( - slice: &PerfettoSlice, - trace_id: &[u8; 16], - _ctx: &TraceContext, - converter: &TimestampConverter, -) -> Result { +fn build_span(slice: &PerfettoSlice, trace_id: &[u8; 16], _ctx: &TraceContext, converter: &TimestampConverter) -> Result { let start_time = converter.to_realtime(slice.ts)?.unwrap_or(0); let end_time = converter.to_realtime(slice.ts.saturating_add(slice.dur))?.unwrap_or_else(|| start_time.saturating_add(slice.dur.max(0) as u64)); @@ -145,11 +132,8 @@ fn build_span( } fn flush_batch( - batch: &mut Vec, - batch_min_ts: &mut Option, - _trace_id: &[u8; 16], - emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), - plugin: &crate::PerfettoPlugin, + batch: &mut Vec, batch_min_ts: &mut Option, _trace_id: &[u8; 16], + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), plugin: &crate::PerfettoPlugin, ) -> Result<(), String> { if batch.is_empty() { return Ok(()); @@ -159,13 +143,8 @@ fn flush_batch( let ts = batch_min_ts.unwrap_or(0); *batch_min_ts = None; - let resource = Resource { - attributes: vec![ - key_value("service.name", any_string("perfetto")), - ], - dropped_attributes_count: 0, - entity_refs: Vec::new(), - }; + let resource = + Resource { attributes: vec![key_value("service.name", any_string("perfetto"))], dropped_attributes_count: 0, entity_refs: Vec::new() }; let scope_spans = ScopeSpans { scope: Some(opentelemetry_proto::tonic::common::v1::InstrumentationScope { @@ -179,11 +158,7 @@ fn 
flush_batch( }; let request = ExportTraceServiceRequest { - resource_spans: vec![ResourceSpans { - resource: Some(resource), - scope_spans: vec![scope_spans], - schema_url: String::new(), - }], + resource_spans: vec![ResourceSpans { resource: Some(resource), scope_spans: vec![scope_spans], schema_url: String::new() }], }; let payload = request.encode_to_vec(); @@ -196,10 +171,7 @@ fn flush_batch( fn make_trace_id() -> [u8; 16] { let mut id = [0u8; 16]; - let ts = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as u64; + let ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos() as u64; id[..8].copy_from_slice(&ts.to_le_bytes()); id[8..16].copy_from_slice(&(std::process::id() as u64).to_le_bytes()); id diff --git a/plugins/perfetto-ingest/tests/fixtures/clock_snapshot.bin b/plugins/perfetto-ingest/tests/fixtures/clock_snapshot.bin new file mode 100644 index 0000000..a546917 Binary files /dev/null and b/plugins/perfetto-ingest/tests/fixtures/clock_snapshot.bin differ diff --git a/plugins/perfetto-ingest/tests/fixtures/ftrace_event.bin b/plugins/perfetto-ingest/tests/fixtures/ftrace_event.bin new file mode 100644 index 0000000..158557b Binary files /dev/null and b/plugins/perfetto-ingest/tests/fixtures/ftrace_event.bin differ diff --git a/plugins/perfetto-ingest/tests/fixtures/process.bin b/plugins/perfetto-ingest/tests/fixtures/process.bin new file mode 100644 index 0000000..f886beb Binary files /dev/null and b/plugins/perfetto-ingest/tests/fixtures/process.bin differ diff --git a/plugins/perfetto-ingest/tests/fixtures/sched_slice.bin b/plugins/perfetto-ingest/tests/fixtures/sched_slice.bin new file mode 100644 index 0000000..eb42650 Binary files /dev/null and b/plugins/perfetto-ingest/tests/fixtures/sched_slice.bin differ diff --git a/plugins/perfetto-ingest/tests/fixtures/spurious_wakeup.bin b/plugins/perfetto-ingest/tests/fixtures/spurious_wakeup.bin new file 
mode 100644 index 0000000..bb4b9a4 Binary files /dev/null and b/plugins/perfetto-ingest/tests/fixtures/spurious_wakeup.bin differ diff --git a/plugins/perfetto-ingest/tests/fixtures/thread.bin b/plugins/perfetto-ingest/tests/fixtures/thread.bin new file mode 100644 index 0000000..838b7ea Binary files /dev/null and b/plugins/perfetto-ingest/tests/fixtures/thread.bin differ diff --git a/plugins/perfetto-ingest/tests/fixtures/thread_state.bin b/plugins/perfetto-ingest/tests/fixtures/thread_state.bin new file mode 100644 index 0000000..0aad9b0 Binary files /dev/null and b/plugins/perfetto-ingest/tests/fixtures/thread_state.bin differ diff --git a/plugins/perfetto-ingest/tests/unit/tests.rs b/plugins/perfetto-ingest/tests/unit/tests.rs index f8bdfe1..0ab4f29 100644 --- a/plugins/perfetto-ingest/tests/unit/tests.rs +++ b/plugins/perfetto-ingest/tests/unit/tests.rs @@ -200,13 +200,13 @@ fn realistic_db() -> super::sqlite_reader::PerfettoDb { #[test] fn log_mapper_produces_monotonic_timestamps_with_realistic_data() { - let db = realistic_db(); + let mut db = realistic_db(); let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); // Collect emits into a buffer, sort, then verify monotonic. 
EMIT_BUF.with(|buf| buf.borrow_mut().clear()); - log_mapper::map_logs(&db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)).unwrap(); + log_mapper::map_logs(&mut db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)).unwrap(); let mut all: Vec<(u32, u64, Vec)> = Vec::new(); EMIT_BUF.with(|buf| all = std::mem::take(&mut *buf.borrow_mut())); @@ -222,7 +222,7 @@ fn log_mapper_produces_monotonic_timestamps_with_realistic_data() { #[test] fn full_pipeline_sorts_across_mappers_monotonically() { - let db = realistic_db(); + let mut db = realistic_db(); let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); @@ -231,7 +231,7 @@ fn full_pipeline_sorts_across_mappers_monotonically() { // Run mappers in varying order — traces first, then logs (opposite of monotonic order). // The buffer should sort everything before emitting. let _ = trace_mapper::map_traces(&db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)); - let _ = log_mapper::map_logs(&db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)); + let _ = log_mapper::map_logs(&mut db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)); let mut all: Vec<(u32, u64, Vec)> = Vec::new(); EMIT_BUF.with(|buf| all = std::mem::take(&mut *buf.borrow_mut())); @@ -413,10 +413,7 @@ fn timestamp_has_realtime() { #[test] fn trace_mapper_produces_spans_from_slices() { let db = test_db(); - let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { - ts: 0, - clock_value: 1_700_000_000_000_000_000, - }]; + let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); let emitted = run_trace_mapper(&db, &converter); @@ -485,13 +482,8 @@ fn metric_mapper_flattens_nested_metrics() { let emitted 
= run_metric_mapper(&metrics); let payload = &emitted[0].2; - let req = opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest::decode(payload.as_slice()) - .unwrap(); - let names: Vec<&str> = req.resource_metrics[0].scope_metrics[0] - .metrics - .iter() - .map(|m| m.name.as_str()) - .collect(); + let req = opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest::decode(payload.as_slice()).unwrap(); + let names: Vec<&str> = req.resource_metrics[0].scope_metrics[0].metrics.iter().map(|m| m.name.as_str()).collect(); assert!(names.contains(&"parent")); assert!(names.contains(&"parent.child")); } @@ -500,8 +492,8 @@ fn metric_mapper_flattens_nested_metrics() { #[test] fn log_mapper_produces_per_slice_and_summary_logs() { - let db = test_db(); - let emitted = run_log_mapper(&db); + let mut db = test_db(); + let emitted = run_log_mapper(&mut db); // 3 slices + 1 summary = 4 log records assert!(emitted.len() >= 2, "expected per-slice logs + summary"); assert!(emitted.iter().all(|(rt, _, _)| *rt == crate::LJ_INGEST_RECORD_TYPE_LOGS)); @@ -528,17 +520,11 @@ fn run_pipeline_integration_with_sqlite() { // test helpers for mapper tests -fn run_trace_mapper( - db: &super::sqlite_reader::PerfettoDb, - converter: ×tamp::TimestampConverter, -) -> Vec { +fn run_trace_mapper(db: &super::sqlite_reader::PerfettoDb, converter: ×tamp::TimestampConverter) -> Vec { run_trace_mapper_result(db, converter).unwrap() } -fn run_trace_mapper_result( - db: &super::sqlite_reader::PerfettoDb, - converter: ×tamp::TimestampConverter, -) -> Result, String> { +fn run_trace_mapper_result(db: &super::sqlite_reader::PerfettoDb, converter: ×tamp::TimestampConverter) -> Result, String> { let plugin = dummy_plugin(dummy_emit); trace_mapper::map_traces(db, converter, super::emit_generic, &plugin)?; Ok(take_records(&plugin)) @@ -551,7 +537,7 @@ fn run_metric_mapper(metrics: &[crate::metrics_reader::PerfettoMetric]) -> Vec Vec { +fn run_log_mapper(db: &mut 
super::sqlite_reader::PerfettoDb) -> Vec { let plugin = dummy_plugin(dummy_emit); let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); @@ -599,8 +585,8 @@ fn sqlite_reader_reads_instants() { #[test] fn log_mapper_produces_thread_state_records() { - let db = test_db(); - let emitted = run_log_mapper(&db); + let mut db = test_db(); + let emitted = run_log_mapper(&mut db); // Should include 2 thread_state records + slices + sched + ftrace + spurious + instant + summary assert!(emitted.iter().any(|(rt, _, _)| *rt == crate::LJ_INGEST_RECORD_TYPE_LOGS)); // Verify thread_state attributes in payload @@ -623,8 +609,8 @@ fn log_mapper_produces_thread_state_records() { #[test] fn log_mapper_produces_ftrace_event_records() { - let db = test_db(); - let emitted = run_log_mapper(&db); + let mut db = test_db(); + let emitted = run_log_mapper(&mut db); let has_ftrace = emitted.iter().any(|(_, _, payload)| { if let Ok(req) = prost::Message::decode(payload.as_slice()) { let req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest = req; @@ -644,18 +630,14 @@ fn log_mapper_produces_ftrace_event_records() { #[test] fn log_mapper_produces_spurious_wakeup_records() { - let db = test_db(); - let emitted = run_log_mapper(&db); + let mut db = test_db(); + let emitted = run_log_mapper(&mut db); let has_sw = emitted.iter().any(|(_, _, payload)| { if let Ok(req) = prost::Message::decode(payload.as_slice()) { let req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest = req; - req.resource_logs.iter().any(|rl| { - rl.scope_logs.iter().any(|sl| { - sl.log_records.iter().any(|lr| { - lr.attributes.iter().any(|kv| kv.key == "perfetto.sw.id") - }) - }) - }) + req.resource_logs + .iter() + .any(|rl| rl.scope_logs.iter().any(|sl| sl.log_records.iter().any(|lr| lr.attributes.iter().any(|kv| kv.key 
== "perfetto.sw.id")))) } else { false } @@ -665,8 +647,8 @@ fn log_mapper_produces_spurious_wakeup_records() { #[test] fn log_mapper_produces_instant_records() { - let db = test_db(); - let emitted = run_log_mapper(&db); + let mut db = test_db(); + let emitted = run_log_mapper(&mut db); let has_instant = emitted.iter().any(|(_, _, payload)| { if let Ok(req) = prost::Message::decode(payload.as_slice()) { let req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest = req; @@ -778,11 +760,11 @@ fn sqlite_reader_reads_filedescriptors() { fn run_core_pipeline(sqlite_path: &std::path::Path) -> Vec { let plugin = dummy_plugin(dummy_emit); - let db = super::sqlite_reader::PerfettoDb::open(sqlite_path).unwrap(); + let mut db = super::sqlite_reader::PerfettoDb::open(sqlite_path).unwrap(); let snaps = db.read_clock_snapshots().unwrap(); let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); let _ = trace_mapper::map_traces(&db, &converter, super::emit_generic, &plugin); - let _ = log_mapper::map_logs(&db, &converter, super::emit_generic, &plugin); + let _ = log_mapper::map_logs(&mut db, &converter, super::emit_generic, &plugin); take_records(&plugin) } @@ -842,3 +824,56 @@ fn temp_sqlite_file() -> std::path::PathBuf { ).unwrap(); path } + +fn strip_rpc_frame(data: &[u8]) -> Vec { + if data.first() != Some(&0x0a) { + return data.to_vec(); + } + let mut p = 1usize; + let _len = { + let (mut v, mut s) = (0u64, 0u32); + loop { + let b = data[p]; + p += 1; + v |= ((b & 0x7F) as u64) << s; + if b & 0x80 == 0 { + break v; + } + s += 7; + } + }; + data[p..].to_vec() +} + +fn parse_bytes(raw: &[u8]) -> crate::rpc_client::QueryResult { + let rpc = strip_rpc_frame(raw); + crate::rpc_client::parse_response(&rpc, None).expect("parse_response returned None") +} + +macro_rules! 
fixture_test { + ($name:ident, $file:literal, $cols:expr, $rows:expr, $col_count:expr) => { + #[test] + fn $name() { + let qr = parse_bytes(include_bytes!(concat!("../../tests/fixtures/", $file))); + assert_eq!(qr.column_names.as_slice(), $cols); + assert_eq!(qr.rows.len(), $rows); + if !qr.rows.is_empty() { + assert_eq!(qr.rows[0].len(), $col_count); + } + } + }; +} + +fixture_test!(rpc_fixture_clock_snapshot, "clock_snapshot.bin", &["ts", "clock_value"], 5, 2); +fixture_test!(rpc_fixture_sched_slice, "sched_slice.bin", &["id", "ts", "dur", "utid", "ucpu", "end_state"], 5, 6); +fixture_test!( + rpc_fixture_thread_state, + "thread_state.bin", + &["id", "ts", "dur", "utid", "state", "io_wait", "blocked_function", "waker_utid", "cpu"], + 5, + 9 +); +fixture_test!(rpc_fixture_ftrace_event, "ftrace_event.bin", &["id", "ts", "name", "cpu", "utid"], 5, 5); +fixture_test!(rpc_fixture_spurious_wakeup, "spurious_wakeup.bin", &["id", "ts", "utid", "waker_utid"], 3, 4); +fixture_test!(rpc_fixture_process, "process.bin", &["upid", "name", "pid"], 1, 3); +fixture_test!(rpc_fixture_thread, "thread.bin", &["utid", "name", "tid", "upid", "is_main_thread"], 5, 5);