diff --git a/Cargo.lock b/Cargo.lock index aaee4cd..dbf5aac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,6 +695,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fancy-regex" version = "0.11.0" @@ -928,6 +940,15 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -1218,6 +1239,17 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" +[[package]] +name = "libsqlite3-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb8270bb4060bd76c6e96f20c52d80620f1d82a3470885694e41e0f81ef6fe7" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "line-clipping" version = "0.3.7" @@ -1249,6 +1281,16 @@ checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" name = "lj-logcat-ingest" version = "0.8.0" +[[package]] +name = "lj-perfetto-ingest" +version = "0.8.0" +dependencies = [ + "opentelemetry-proto", + "prost", + "rusqlite", + "serde_json", +] + [[package]] name = "lj-stress-ingest" version = "0.8.0" @@ -2100,6 +2142,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rusqlite" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e34486da88d8e051c7c0e23c3f15fd806ea8546260aa2fec247e97242ec143" +dependencies = [ + "bitflags 2.11.1", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc_version" version = "0.4.1" @@ -2832,6 +2888,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index df945ea..c3440fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "plugins/syslog-ingest", "plugins/logcat-ingest", "plugins/stress-ingest", + "plugins/perfetto-ingest", "plugins/parquet-exporter", ] @@ -30,6 +31,7 @@ default-members = [ "plugins/syslog-ingest", "plugins/logcat-ingest", "plugins/stress-ingest", + "plugins/perfetto-ingest", "plugins/parquet-exporter", ] diff --git a/Makefile b/Makefile index 69d0c1b..67166cb 100644 --- a/Makefile +++ b/Makefile @@ -27,8 +27,8 @@ test: setup cargo nextest run $(CORE_WORKSPACE) test-unit: setup - cargo build -p ljx-parquet-exporter - cargo nextest run -p logjet --lib -p ljd --bins -p ljx --bin ljx + cargo build -p ljx-parquet-exporter -p lj-perfetto-ingest + cargo nextest run -p logjet --lib -p ljd --bins -p ljx --bin ljx -p lj-perfetto-ingest test-integration: setup cargo build -p ljd -p ljx -p ljx-parquet-exporter diff --git a/demo/perfetto/README.md b/demo/perfetto/README.md new file mode 100644 index 0000000..0f60ede --- /dev/null +++ b/demo/perfetto/README.md @@ -0,0 +1,25 @@ +# Perfetto demos + +## Building Perfetto + +```bash +# From workspace root — downloads deps and builds all needed tools. +./scripts/build-perfetto.sh +``` + +Or pass a custom source path: +```bash +./scripts/build-perfetto.sh /path/to/perfetto +``` + +The script installs missing system packages (git, python3, curl, tar), downloads +hermetic GN/Ninja/clang toolchain, and builds `trace_processor_shell`, `traced`, +`traced_probes`, and `tracebox` into `perfetto/out/linux_release/`. + +Add to PATH or set `LJD_PERFETTO_TRACE_PROCESSOR` to point at +`trace_processor_shell`. + +## Demos + +- [linux-data-record](linux-data-record/) — capture and inspect a system trace +- [perfetto-to-logjet](perfetto-to-logjet/) — full end-to-end: record ftrace, import via plugin, view in ljx diff --git a/demo/perfetto/linux-data-record/README.md b/demo/perfetto/linux-data-record/README.md new file mode 100644 index 0000000..5667729 --- /dev/null +++ b/demo/perfetto/linux-data-record/README.md @@ -0,0 +1,53 @@ +# linux-data-record + +Record a short ftrace-based Perfetto trace on a Linux desktop, then open it in +the trace processor for interactive inspection. + +## Prerequisites + +Build the Perfetto tools (from workspace root): + +```bash +./scripts/build-perfetto.sh +``` + +The script finds tools via `PERFETTO_OUT` (default: `perfetto/out/linux_release`). +Set `PERFETTO_TRACE_OUT` to override the output trace path. + +## Permissions + +ftrace requires root. The script runs `tracebox` with `sudo`. If passwordless +sudo is not configured, run the script with `sudo`: + +```bash +sudo ./run-demo.sh +``` + +Or set up passwordless ftrace: `sudo chown -R $USER /sys/kernel/tracing`. + +## Run + +```bash +./run-demo.sh +``` + +## What happens + +1. `traced` and `traced_probes` are started in the background. +2. `tracebox` captures 5 seconds of ftrace (scheduler, syscalls) via sudo into a `.pftrace` file. +3. Tools are stopped. +4. `trace_processor_shell` opens the trace in interactive SQL mode. +5. Type `.q` to quit. + +## Example queries (in trace processor) + +```sql +-- List process names +SELECT DISTINCT name FROM process WHERE name IS NOT NULL; + +-- Show thread scheduling slices +SELECT ts, dur, utid, cpu, end_state FROM sched ORDER BY ts LIMIT 20; + +-- Count kernel functions seen in ftrace +SELECT name, count(*) FROM ftrace_event GROUP BY name ORDER BY count(*) DESC LIMIT 10; +``` diff --git a/demo/perfetto/linux-data-record/run-demo.sh b/demo/perfetto/linux-data-record/run-demo.sh new file mode 100755 index 0000000..b545933 --- /dev/null +++ b/demo/perfetto/linux-data-record/run-demo.sh @@ -0,0 +1,84 @@ +#!/bin/sh +set -eu + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +ROOT_DIR="$SCRIPT_DIR/../../.." +PERFETTO_OUT="${PERFETTO_OUT:-$ROOT_DIR/perfetto/out/linux_release}" +PERFETTO_TRACE_OUT="${PERFETTO_TRACE_OUT:-$SCRIPT_DIR/trace.pftrace}" + +TRACED="$PERFETTO_OUT/traced" +TRACED_PROBES="$PERFETTO_OUT/traced_probes" +TRACEBOX="$PERFETTO_OUT/tracebox" +TP="$PERFETTO_OUT/trace_processor_shell" + +for bin in "$TRACED" "$TRACED_PROBES" "$TRACEBOX" "$TP"; do + if [ ! -x "$bin" ]; then + echo "missing $bin" + echo "build first from workspace root:" + echo " cd perfetto && gn gen out/linux --args='is_debug=false'" + echo " ninja -C out/linux trace_processor_shell traced traced_probes tracebox" + exit 1 + fi +done + +echo "Starting traced..." +"$TRACED" &>/dev/null & +TRACED_PID=$! + +echo "Starting traced_probes..." +"$TRACED_PROBES" &>/dev/null & +PROBES_PID=$! + +cleanup() { + kill "$TRACED_PID" 2>/dev/null || true + kill "$PROBES_PID" 2>/dev/null || true + wait "$TRACED_PID" 2>/dev/null || true + wait "$PROBES_PID" 2>/dev/null || true + echo "Services stopped." +} + +trap cleanup EXIT INT TERM + +sleep 1 + +echo "Recording 5s of ftrace to $PERFETTO_TRACE_OUT..." +CONFIG_FILE="$SCRIPT_DIR/trace-config.txt" +cat > "$CONFIG_FILE" <<'ENDCONFIG' +buffers: { + size_kb: 4096 + fill_policy: RING_BUFFER +} +data_sources: { + config { + name: "linux.ftrace" + ftrace_config { + ftrace_events: "sched/sched_switch" + ftrace_events: "sched/sched_waking" + } + } +} +duration_ms: 5000 +ENDCONFIG + +if [ "$(id -u)" -eq 0 ]; then + "$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$PERFETTO_TRACE_OUT" +else + sudo "$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$PERFETTO_TRACE_OUT" + sudo chown "$(id -u):$(id -g)" "$PERFETTO_TRACE_OUT" +fi + +rm -f "$CONFIG_FILE" + +cleanup + +if [ ! -f "$PERFETTO_TRACE_OUT" ]; then + echo "Trace file not created." + exit 1 +fi + +SIZE=$(du -h "$PERFETTO_TRACE_OUT" | cut -f1) +echo "Trace recorded: $PERFETTO_TRACE_OUT ($SIZE)" +echo "" +echo "Opening in trace processor (type .q to quit)..." +echo "" +"$TP" "$PERFETTO_TRACE_OUT" diff --git a/demo/perfetto/linux-data-record/trace.pftrace b/demo/perfetto/linux-data-record/trace.pftrace new file mode 100644 index 0000000..4439f9b Binary files /dev/null and b/demo/perfetto/linux-data-record/trace.pftrace differ diff --git a/demo/perfetto/perfetto-to-logjet/README.md b/demo/perfetto/perfetto-to-logjet/README.md new file mode 100644 index 0000000..0132270 --- /dev/null +++ b/demo/perfetto/perfetto-to-logjet/README.md @@ -0,0 +1,47 @@ +# Perfetto-to-logjet Demo + +End-to-end pipeline: record a Linux ftrace, import it via the perfetto-ingest +plugin into a `.logjet` spool, and view the result in `ljx view`. + +## Build First + +```bash +# From workspace root +make dev +./scripts/build-perfetto.sh +``` + +## Run + +```bash +cd demo/perfetto/perfetto-to-logjet +./run-demo.sh +``` + +Requires sudo for ftrace access. + +## What Happens + +1. `traced` + `traced_probes` start in the background. +2. `tracebox` records 5s of scheduler events (CPU switches) via ftrace. +3. `ljd` loads the perfetto-ingest plugin, which spawns `trace_processor`, + exports the trace as SQLite, maps `sched_slice` rows to OTel log records + with CPU/state/duration, and streams them into a `.logjet` spool. +4. `ljx view` opens the spool — each CPU scheduling event appears as one line. + +## What You Should See + +- Thousands of log lines, each showing a CPU scheduling event: + ``` + May 7 10:43:15 I cpu=7 dur=7.2us state=R utid=19 ts=... + May 7 10:43:15 I cpu=7 dur=2.0us state=R utid=21 ts=... + ``` +- Press `Enter` to see full OTel attributes (perfetto.sched.id, cpu, end_state). +- Press `F` for field filter, `/` to search, `q` to quit. + +## Troubleshooting + +- **0 records**: The trace needs ftrace events — they require root. The script + uses `sudo tracebox`. If passwordless sudo isn't configured, run `sudo ./run-demo.sh`. +- **Fewer records than expected in ljx view**: Delete stale index cache: + `rm -rf ~/.cache/ljx && ./run-demo.sh` diff --git a/demo/perfetto/perfetto-to-logjet/run-demo.sh b/demo/perfetto/perfetto-to-logjet/run-demo.sh new file mode 100755 index 0000000..1566246 --- /dev/null +++ b/demo/perfetto/perfetto-to-logjet/run-demo.sh @@ -0,0 +1,146 @@ +#!/bin/sh +set -eu + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +ROOT_DIR="$SCRIPT_DIR/../../.." +BUILD_DIR="$ROOT_DIR/target/debug" +PERFETTO_OUT="${PERFETTO_OUT:-$ROOT_DIR/perfetto/out/linux_release}" +SPOOL_DIR="$SCRIPT_DIR/spool" +TRACE_FILE="$SCRIPT_DIR/trace.pftrace" + +LJD="$BUILD_DIR/ljd" +LJX="$BUILD_DIR/ljx" +PLUGIN="$BUILD_DIR/liblj_perfetto_ingest.so" +TRACED="$PERFETTO_OUT/traced" +TRACED_PROBES="$PERFETTO_OUT/traced_probes" +TRACEBOX="$PERFETTO_OUT/tracebox" +TP="$PERFETTO_OUT/trace_processor_shell" + +for bin in "$LJD" "$LJX" "$PLUGIN"; do + if [ ! -e "$bin" ]; then + echo "missing $bin" + echo "build first with: make dev" + exit 1 + fi +done + +for bin in "$TRACED" "$TRACED_PROBES" "$TRACEBOX" "$TP"; do + if [ ! -x "$bin" ]; then + echo "missing $bin" + echo "build perfetto first with: ./scripts/build-perfetto.sh" + exit 1 + fi +done + +# ── Record a trace ──────────────────────────────────────────────────────────── + +echo "Starting traced..." +"$TRACED" &>/dev/null & +TRACED_PID=$! + +echo "Starting traced_probes..." +"$TRACED_PROBES" &>/dev/null & +PROBES_PID=$! + +cleanup_trace() { + kill "$TRACED_PID" 2>/dev/null || true + kill "$PROBES_PID" 2>/dev/null || true + wait "$TRACED_PID" 2>/dev/null || true + wait "$PROBES_PID" 2>/dev/null || true +} + +trap cleanup_trace EXIT INT TERM + +sleep 1 + +echo "Recording 5s of ftrace to $TRACE_FILE..." +CONFIG_FILE="$SCRIPT_DIR/trace-config.txt" +cat > "$CONFIG_FILE" <<'ENDCONFIG' +buffers: { + size_kb: 4096 + fill_policy: RING_BUFFER +} +data_sources: { + config { + name: "linux.ftrace" + ftrace_config { + ftrace_events: "sched/sched_switch" + ftrace_events: "sched/sched_waking" + ftrace_events: "power/cpu_frequency" + } + } +} +duration_ms: 5000 +ENDCONFIG + +if [ "$(id -u)" -eq 0 ]; then + "$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$TRACE_FILE" +else + sudo "$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$TRACE_FILE" + sudo chown "$(id -u):$(id -g)" "$TRACE_FILE" +fi + +rm -f "$CONFIG_FILE" + +cleanup_trace +trap - EXIT INT TERM + +if [ ! -f "$TRACE_FILE" ]; then + echo "Trace file not created." + exit 1 +fi + +SIZE=$(du -h "$TRACE_FILE" | cut -f1) +echo "Trace recorded: $TRACE_FILE ($SIZE)" +echo "" + +# ── Import via perfetto-ingest plugin ───────────────────────────────────────── + +echo "Importing into .logjet..." +rm -rf "$SPOOL_DIR" +mkdir -p "$SPOOL_DIR" + +# Use a temp config so we don't interfere with the user's config. +CONFIG_FILE="$SCRIPT_DIR/ljd-perfetto.conf" +cat > "$CONFIG_FILE" </dev/null || true + wait "$LJD_PID" 2>/dev/null || true +} + +trap cleanup_ljd EXIT INT TERM + +# Give the plugin time to finish processing (SQLite export + mapping takes a few seconds). +sleep 10 +kill "$LJD_PID" 2>/dev/null || true +wait "$LJD_PID" 2>/dev/null || true +trap - EXIT INT TERM + +rm -f "$CONFIG_FILE" + +if [ ! -f "$SPOOL_DIR/perfetto.logjet" ]; then + echo "No .logjet file produced." + exit 1 +fi + +RECORDS=$("$LJX" count "$SPOOL_DIR/perfetto.logjet" | tail -1) +echo "Imported $RECORDS records into $SPOOL_DIR/perfetto.logjet" +echo "" + +# ── View the result ─────────────────────────────────────────────────────────── + +echo "Opening ljx view..." +"$LJX" view "$SPOOL_DIR/perfetto.logjet" diff --git a/doc/perfetto-ingest.md b/doc/perfetto-ingest.md new file mode 100644 index 0000000..9121f88 --- /dev/null +++ b/doc/perfetto-ingest.md @@ -0,0 +1,88 @@ +# Perfetto Ingest Plugin (`lj-perfetto-ingest`) + +Imports Perfetto trace files (`.pftrace` / `.perfetto-trace`) into the logjet +ecosystem as OTel traces, metrics, logs, and events. + +## Architecture + +``` + .pftrace ──→ trace_processor (spawned as subprocess) + ├── export sqlite ──→ sqlite_reader ──→ trace_mapper ──→ OTel spans + └── --run-metrics ──→ metrics_reader ──→ metric_mapper ──→ OTel metrics + log_mapper ──→ OTel logs + │ + ▼ + ljd spool (.logjet) +``` + +The plugin is an **active source** (`mode: 1`). ljd calls `lj_ingest_fetch()` once, +which runs the full pipeline and streams OTel payloads through the generic record +callback. + +## Requirements + +- Perfetto trace processor binary (`trace_processor` or `trace_processor_shell`). + Build it from the bundled Perfetto source: + ```bash + ./scripts/build-perfetto.sh + ``` +- A `.pftrace` trace file to import. + +## Usage + +```bash +# Build the plugin and ljd: +make build + +# Run the import: +LJD_PERFETTO_TRACE_FILE=/path/to/trace.pftrace \ +LJD_PERFETTO_TRACE_PROCESSOR=/path/to/trace_processor_shell \ + ljd serve \ + --ingest-protocol plugin \ + --ingest-plugin perfetto \ + --storage ./otel-spool +``` + +## Environment Variables + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `LJD_PERFETTO_TRACE_FILE` | **Yes** | — | Path to the `.pftrace` input file. | +| `LJD_PERFETTO_TRACE_PROCESSOR` | No | PATH search | Path to `trace_processor_shell` binary. | +| `LJD_PERFETTO_TIMESTAMP_POLICY` | No | `best-effort` | `best-effort` or `require-realtime`. | +| `LJD_PERFETTO_METRICS` | No | (none) | Comma-separated metric names to run, e.g. `trace_stats,android_startup`. | + +## Output Signals + +| Perfetto Source | OTel Signal | Record Type | +|-----------------|-------------|-------------| +| `slice` table | Traces (Spans) | `Traces` | +| Metrics JSON | Metrics (Gauges) | `Metrics` | +| Analysis summary | Logs | `Logs` | +| (reserved) | Events | `Events` | + +Spans are batched in groups of 200 per OTLP export request. + +## Timestamp Policy + +Perfetto timestamps are trace-clock values (typically `CLOCK_MONOTONIC`). The plugin +converts them to Unix epoch nanoseconds using `clock_snapshot` REALTIME entries. + +- **best-effort** (default): Spans without realtime data are skipped. Spans before + the first snapshot are extrapolated backwards. +- **require-realtime**: The pipeline fails if any span cannot be converted. + +## Limitations + +- **No flow-to-link mapping**: `flow` table entries are read but not yet mapped + to OTel span links. +- **No args-to-attributes mapping**: Per-slice key-value arguments are read but + not attached to spans. +- **Thread/process context**: Thread and process names are loaded but not fully + joined to spans via track relationships. +- **Replay/bridge**: Traces and metrics stored in `.logjet` can be exported via + `ljx export` but are not yet forwarded by `ljd bridge/replay` (which currently + only forwards logs). +- **Metrics**: Only scalar metric values are supported (no histograms). +- **Event signal**: The `Events` record type is reserved but not yet generated + by this plugin. diff --git a/liblogjet/include/liblogjet.h b/liblogjet/include/liblogjet.h index 28610c9..10d11d8 100644 --- a/liblogjet/include/liblogjet.h +++ b/liblogjet/include/liblogjet.h @@ -163,6 +163,7 @@ void lj_ingest_set_callback(lj_ingest_plugin *ctx, lj_record_callback cb, void * void lj_ingest_set_generic_callback(lj_ingest_plugin *ctx, lj_generic_record_callback cb, void *user); int lj_ingest_feed(lj_ingest_plugin *ctx, const uint8_t *data, size_t len); int lj_ingest_fetch(lj_ingest_plugin *ctx); +const char *lj_ingest_last_error(lj_ingest_plugin *ctx); void lj_ingest_free(lj_ingest_plugin *ctx); #ifdef __cplusplus diff --git a/ljx/src/dataset_index.rs b/ljx/src/dataset_index.rs index bb0922c..2cacd15 100644 --- a/ljx/src/dataset_index.rs +++ b/ljx/src/dataset_index.rs @@ -265,12 +265,20 @@ fn build(path: &Path, size: u64, modified_ns: Option) -> Result { + summary.merge(&block); + offset = block.offset + block.len; + blocks.push(block); + } + None => { + // No sync marker at expected offset — resync by scanning forward. + offset = match find_next_sync_marker(&mut file, offset, size)? { + Some(next) => next, + None => break, + }; + } + } } if blocks.is_empty() { @@ -279,6 +287,40 @@ fn build(path: &Path, size: u64, modified_ns: Option) -> Result Result> { + let mut buf = [0u8; 4096]; + let mut matched = 0usize; + let mut scan_pos = start; + + file.seek(SeekFrom::Start(start))?; + + while scan_pos < size { + let space = buf.len().min((size - scan_pos) as usize); + if space == 0 { + break; + } + let n = file.read(&mut buf[..space])?; + if n == 0 { + break; + } + for (idx, &byte) in buf[..n].iter().enumerate() { + if byte == logjet::DEFAULT_SYNC_MARKER[matched] { + matched += 1; + if matched == logjet::DEFAULT_SYNC_MARKER.len() { + return Ok(Some(scan_pos + idx as u64 + 1 - logjet::DEFAULT_SYNC_MARKER.len() as u64)); + } + } else { + matched = usize::from(byte == logjet::DEFAULT_SYNC_MARKER[0]); + } + } + scan_pos += n as u64; + } + Ok(None) +} + fn persist(path: &Path, sidecar_path: &Path, index: &DatasetIndex) -> Result<()> { let disk = DiskIndex { version: INDEX_VERSION, diff --git a/logjetd/src/plugin.rs b/logjetd/src/plugin.rs index 444a9d8..453bb34 100644 --- a/logjetd/src/plugin.rs +++ b/logjetd/src/plugin.rs @@ -449,7 +449,7 @@ fn push_unique_path(roots: &mut Vec, path: PathBuf) { } } -// ── C ABI types mirroring liblogjet.h ─────────────────────────────────────── +// C ABI types mirroring liblogjet.h // Legacy log-only signal mask (used when reserved[0] == 0 for old plugins). #[allow(dead_code)] @@ -516,11 +516,12 @@ type SetCallbackFn = unsafe extern "C" fn(*mut LjIngestPlugin, RecordCallback, * type FeedFn = unsafe extern "C" fn(*mut LjIngestPlugin, *const u8, usize) -> c_int; type FetchFn = unsafe extern "C" fn(*mut LjIngestPlugin) -> c_int; type FreeFn = unsafe extern "C" fn(*mut LjIngestPlugin); +type LastErrorFn = unsafe extern "C" fn(*mut LjIngestPlugin) -> *const c_char; type RecordCallback = unsafe extern "C" fn(*mut c_void, *const LjLogRecord); type GenericRecordCallback = unsafe extern "C" fn(*mut c_void, *const LjIngestRecordV1); type SetGenericCallbackFn = unsafe extern "C" fn(*mut LjIngestPlugin, GenericRecordCallback, *mut c_void); -// ── Plugin handle ─────────────────────────────────────────────────────────── +// Plugin handle /// Resolved symbols from a loaded ingest plugin. struct PluginHandle { @@ -534,6 +535,8 @@ struct PluginHandle { /// Multi-signal plugins export `lj_ingest_set_generic_callback`. If /// present, ljd calls it instead of `lj_ingest_set_callback`. set_generic_callback: Option, + /// Optional error message retrieval (`lj_ingest_last_error`). + last_error: Option, free: FreeFn, } @@ -554,6 +557,8 @@ impl PluginHandle { let fetch: Option = lib.get::(b"lj_ingest_fetch\0").ok().map(|sym| *sym); let set_generic_callback: Option = lib.get::(b"lj_ingest_set_generic_callback\0").ok().map(|sym| *sym); + let last_error: Option = + lib.get::(b"lj_ingest_last_error\0").ok().map(|sym| *sym); let free: libloading::Symbol = lib.get(b"lj_ingest_free\0").map_err(|err| io::Error::other(format!("symbol lj_ingest_free: {err}")))?; @@ -563,6 +568,7 @@ impl PluginHandle { feed: *feed, fetch, set_generic_callback, + last_error, free: *free, _lib: lib, }) @@ -575,7 +581,7 @@ impl PluginHandle { } } -// ── Callback plumbing ─────────────────────────────────────────────────────── +// Callback plumbing /// Passed through the `void *user` pointer in the C callback. struct CallbackCtx { @@ -777,7 +783,7 @@ pub(crate) fn build_otlp_payload(rec: OtlpRecord<'_>) -> Vec { request.encode_to_vec() } -// ── Public entry point ────────────────────────────────────────────────────── +// Public entry point /// Runs the plugin ingest loop: loads the .so, then either calls /// `lj_ingest_fetch` (active plugin) or binds TCP and feeds bytes (passive). @@ -870,6 +876,15 @@ fn run_active_plugin(handle: &PluginHandle, spool: Arc *const LjIngestDescriptorV1 { &LOGCAT_INGEST_DESCRIPTOR.0 } -// ── Severity constants ────────────────────────────────────────────────────── +// Severity constants const LJ_SEVERITY_TRACE: i32 = 1; const LJ_SEVERITY_DEBUG: i32 = 5; @@ -81,14 +81,14 @@ const LJ_SEVERITY_ERROR: i32 = 17; const LJ_SEVERITY_FATAL: i32 = 21; const LJ_ATTR_STRING: i32 = 0; -// ── Plugin context ────────────────────────────────────────────────────────── +// Plugin context pub struct LogcatPlugin { callback: Option, user: *mut c_void, } -// ── Exported C ABI ────────────────────────────────────────────────────────── +// Exported C ABI /// Creates a new logcat parsing context. #[unsafe(no_mangle)] @@ -158,7 +158,7 @@ pub unsafe extern "C" fn lj_ingest_free(ctx: *mut LogcatPlugin) { let _ = unsafe { Box::from_raw(ctx) }; } -// ── Logcat parsing ────────────────────────────────────────────────────────── +// Logcat parsing struct Parsed<'a> { severity: i32, @@ -263,7 +263,7 @@ fn map_logcat_level(ch: u8) -> (i32, &'static str) { } } -// ── Record emission ───────────────────────────────────────────────────────── +// Record emission fn emit_record(ctx: &LogcatPlugin, line: &str) { let Some(cb) = ctx.callback else { return }; diff --git a/plugins/perfetto-ingest/Cargo.toml b/plugins/perfetto-ingest/Cargo.toml new file mode 100644 index 0000000..318d1a7 --- /dev/null +++ b/plugins/perfetto-ingest/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "lj-perfetto-ingest" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +opentelemetry-proto = { version = "0.31", features = ["gen-tonic", "trace", "metrics", "logs"] } +prost = "0.14" +rusqlite = { version = "0.34", features = ["bundled"] } +serde_json = "1" diff --git a/plugins/perfetto-ingest/src/lib.rs b/plugins/perfetto-ingest/src/lib.rs new file mode 100644 index 0000000..685f6e4 --- /dev/null +++ b/plugins/perfetto-ingest/src/lib.rs @@ -0,0 +1,324 @@ +//! Perfetto trace ingest plugin for ljd. +//! +//! Active-source plugin: exports `lj_ingest_fetch` which reads a `.pftrace` +//! file, invokes Perfetto `trace_processor` for analysis, maps results into +//! OTel traces/metrics/logs/events, and streams pre-encoded OTLP payloads +//! through the generic record callback. + +mod log_mapper; +mod metric_mapper; +mod metrics_reader; +mod perfetto_invoke; +mod sqlite_reader; +mod timestamp; +mod trace_mapper; + +use std::ffi::{c_char, c_int, c_void}; + +// C ABI types (must match liblogjet.h exactly) + +#[repr(C)] +pub struct LjAttribute { + key: *const c_char, + value: *const c_char, + value_type: i32, +} + +#[repr(C)] +pub struct LjLogRecord { + timestamp_unix_ns: u64, + severity_number: i32, + severity_text: *const c_char, + body: *const c_char, + attributes: *const LjAttribute, + attributes_len: usize, + event_name: *const c_char, + service_name: *const c_char, + scope_name: *const c_char, + resource_attrs: *const LjAttribute, + resource_attrs_len: usize, + scope_attrs: *const LjAttribute, + scope_attrs_len: usize, +} + +#[repr(C)] +pub struct LjIngestRecordV1 { + struct_size: u32, + record_type: u32, + timestamp_unix_ns: u64, + payload: *const u8, + payload_len: usize, + flags: u32, + reserved: [u64; 4], +} + +pub type RecordCallback = unsafe extern "C" fn(*mut c_void, *const LjLogRecord); +pub type GenericRecordCallback = unsafe extern "C" fn(*mut c_void, *const LjIngestRecordV1); + +#[repr(C)] +pub struct LjIngestDescriptorV1 { + struct_size: u32, + abi_major: u32, + abi_minor: u32, + name: *const c_char, + display_name: *const c_char, + mode: u32, + reserved: [u64; 8], +} + +// Signal constants + +const LJ_INGEST_SIGNAL_LOGS: u32 = 1 << 0; +const LJ_INGEST_SIGNAL_METRICS: u32 = 1 << 1; +const LJ_INGEST_SIGNAL_TRACES: u32 = 1 << 2; +const LJ_INGEST_SIGNAL_EVENTS: u32 = 1 << 3; + +#[allow(dead_code)] +const LJ_INGEST_RECORD_TYPE_LOGS: u32 = 1; +#[allow(dead_code)] +const LJ_INGEST_RECORD_TYPE_METRICS: u32 = 2; +#[allow(dead_code)] +const LJ_INGEST_RECORD_TYPE_TRACES: u32 = 3; +#[allow(dead_code)] +const LJ_INGEST_RECORD_TYPE_EVENTS: u32 = 4; + +// Descriptor + +struct IngestDescriptor(LjIngestDescriptorV1); + +unsafe impl Sync for IngestDescriptor {} + +static PERFETTO_INGEST_DESCRIPTOR: IngestDescriptor = IngestDescriptor(LjIngestDescriptorV1 { + struct_size: std::mem::size_of::() as u32, + abi_major: 1, + abi_minor: 1, + name: c"perfetto".as_ptr(), + display_name: c"Perfetto trace importer".as_ptr(), + mode: 1, + reserved: { + let mut r = [0u64; 8]; + r[0] = (LJ_INGEST_SIGNAL_LOGS | LJ_INGEST_SIGNAL_METRICS | LJ_INGEST_SIGNAL_TRACES | LJ_INGEST_SIGNAL_EVENTS) as u64; + r + }, +}); + +#[unsafe(no_mangle)] +pub extern "C" fn lj_ingest_descriptor_v1() -> *const LjIngestDescriptorV1 { + &PERFETTO_INGEST_DESCRIPTOR.0 +} + +// Plugin context + +pub struct PerfettoPlugin { + pub(crate) legacy_callback: Option, + pub(crate) legacy_user: *mut c_void, + pub(crate) generic_callback: Option, + pub(crate) generic_user: *mut c_void, + last_error: Option, +} + +// Exported C ABI + +#[unsafe(no_mangle)] +pub extern "C" fn lj_ingest_create() -> *mut PerfettoPlugin { + Box::into_raw(Box::new(PerfettoPlugin { + legacy_callback: None, + legacy_user: std::ptr::null_mut(), + generic_callback: None, + generic_user: std::ptr::null_mut(), + last_error: None, + })) +} + +/// # Safety +/// +/// `ctx` must be a valid pointer from `lj_ingest_create` or null. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lj_ingest_set_callback(ctx: *mut PerfettoPlugin, cb: RecordCallback, user: *mut c_void) { + if ctx.is_null() { + return; + } + let ctx = unsafe { &mut *ctx }; + ctx.legacy_callback = Some(cb); + ctx.legacy_user = user; +} + +/// # Safety +/// +/// `ctx` must be a valid pointer from `lj_ingest_create` or null. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lj_ingest_set_generic_callback(ctx: *mut PerfettoPlugin, cb: GenericRecordCallback, user: *mut c_void) { + if ctx.is_null() { + return; + } + let ctx = unsafe { &mut *ctx }; + ctx.generic_callback = Some(cb); + ctx.generic_user = user; +} + +/// Passive feed — not used by this plugin but required by the ABI. +/// +/// # Safety +/// +/// Pointers must be valid. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lj_ingest_feed(_ctx: *mut PerfettoPlugin, _data: *const u8, _len: usize) -> c_int { + 0 +} + +/// Returns the last error message, or NULL if none. +/// +/// # Safety +/// +/// `ctx` must be a valid pointer from `lj_ingest_create`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lj_ingest_last_error(ctx: *mut PerfettoPlugin) -> *const c_char { + if ctx.is_null() { + return std::ptr::null(); + } + let ctx = unsafe { &*ctx }; + match &ctx.last_error { + Some(msg) => msg.as_ptr().cast::(), + None => std::ptr::null(), + } +} + +/// Active source: reads a `.pftrace` file, invokes trace_processor, maps +/// results to OTel, and streams records through the generic callback. +/// +/// # Safety +/// +/// `ctx` must be a valid pointer from `lj_ingest_create` with a callback set. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lj_ingest_fetch(ctx: *mut PerfettoPlugin) -> c_int { + if ctx.is_null() { + eprintln!("perfetto-ingest: lj_ingest_fetch called with null context"); + return -1; + } + let ctx = unsafe { &mut *ctx }; + + let trace_file = match std::env::var("LJD_PERFETTO_TRACE_FILE") { + Ok(path) => std::path::PathBuf::from(path), + Err(_) => { + ctx.last_error = Some("LJD_PERFETTO_TRACE_FILE is not set".to_string()); + return -2; + } + }; + + if !trace_file.is_file() { + ctx.last_error = Some(format!("trace file not found: {}", trace_file.display())); + return -3; + } + + match run_pipeline(ctx, &trace_file) { + Ok(()) => 0, + Err(err) => { + ctx.last_error = Some(err.to_string()); + eprintln!("perfetto-ingest: {err}"); + -4 + } + } +} + +fn run_pipeline(plugin: &mut PerfettoPlugin, trace_file: &std::path::Path) -> Result<(), String> { + let tp_path = perfetto_invoke::find_trace_processor() + .map_err(|err| format!("trace_processor not found: {err}"))?; + + eprintln!("perfetto-ingest: using trace_processor {}", tp_path.display()); + eprintln!("perfetto-ingest: exporting SQLite from {}", trace_file.display()); + + let sqlite_path = perfetto_invoke::export_sqlite(trace_file, &tp_path) + .map_err(|err| format!("SQLite export failed: {err}"))?; + + let db = sqlite_reader::PerfettoDb::open(&sqlite_path) + .map_err(|err| format!("failed to open exported DB: {err}"))?; + + let snaps = db.read_clock_snapshots() + .map_err(|err| format!("failed to read clock snapshots: {err}"))?; + + let policy = match std::env::var("LJD_PERFETTO_TIMESTAMP_POLICY").as_deref() { + Ok("require-realtime") => timestamp::TimestampPolicy::RequireRealtime, + _ => timestamp::TimestampPolicy::BestEffort, + }; + + let converter = timestamp::TimestampConverter::new(snaps, policy); + + if converter.has_realtime() { + eprintln!("perfetto-ingest: realtime clock available"); + } else { + eprintln!("perfetto-ingest: no realtime clock snapshots — timestamps will be unavailable"); + } + + eprintln!("perfetto-ingest: mapping traces..."); + trace_mapper::map_traces(&db, &converter, emit_generic, plugin)?; + + // Optional: run metrics export and map metrics. + let metrics_names: Vec = std::env::var("LJD_PERFETTO_METRICS") + .ok() + .map(|s| s.split(',').map(|s| s.trim().to_string()).collect::>()) + .unwrap_or_default(); + let metrics_refs: Vec<&str> = metrics_names.iter().map(|s| s.as_str()).collect(); + + if !metrics_refs.is_empty() + && let Ok(Some(metrics_path)) = perfetto_invoke::export_metrics(trace_file, &tp_path, &metrics_refs) { + eprintln!("perfetto-ingest: mapping metrics..."); + let metrics = metrics_reader::parse_metrics_json(&metrics_path) + .map_err(|err| format!("failed to parse metrics JSON: {err}"))?; + metric_mapper::map_metrics(&metrics, &converter, emit_generic, plugin)?; + let _ = std::fs::remove_file(&metrics_path); + } + + eprintln!("perfetto-ingest: mapping logs..."); + log_mapper::map_logs(&db, &converter, emit_generic, plugin)?; + + let _ = std::fs::remove_file(&sqlite_path); + + eprintln!("perfetto-ingest: done"); + Ok(()) +} + +/// Destroys the plugin context. Accepts NULL. +/// +/// # Safety +/// +/// `ctx` must be null or a valid pointer that has not been freed. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn lj_ingest_free(ctx: *mut PerfettoPlugin) { + if ctx.is_null() { + return; + } + let _ = unsafe { Box::from_raw(ctx) }; +} + +// Record emission helpers + +/// Calls the generic callback with a pre-encoded OTLP payload. +/// +/// # Safety +/// +/// `ctx` must have a generic callback set. +#[allow(dead_code)] +pub(crate) unsafe fn emit_generic( + ctx: &PerfettoPlugin, + record_type: u32, + ts_unix_ns: u64, + payload: &[u8], +) { + let Some(cb) = ctx.generic_callback else { + return; + }; + let record = LjIngestRecordV1 { + struct_size: std::mem::size_of::() as u32, + record_type, + timestamp_unix_ns: ts_unix_ns, + payload: payload.as_ptr(), + payload_len: payload.len(), + flags: 0, + reserved: [0; 4], + }; + unsafe { cb(ctx.generic_user, &record) }; +} + +#[cfg(test)] +#[path = "../tests/unit/tests.rs"] +mod tests; diff --git a/plugins/perfetto-ingest/src/log_mapper.rs b/plugins/perfetto-ingest/src/log_mapper.rs new file mode 100644 index 0000000..ea78d6c --- /dev/null +++ b/plugins/perfetto-ingest/src/log_mapper.rs @@ -0,0 +1,254 @@ +//! Maps Perfetto stats/errors to OTel log records. + +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value::Value; +use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue}; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use prost::Message; + +use crate::sqlite_reader::{PerfettoDb, PerfettoSchedSlice, PerfettoSlice}; +use crate::timestamp::TimestampConverter; + +const SEVERITY_INFO: i32 = 9; +const SLICES_PER_LOG_BATCH: usize = 1; + +pub fn map_logs( + db: &PerfettoDb, + converter: &TimestampConverter, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), + plugin: &crate::PerfettoPlugin, +) -> Result<(), String> { + // Emit a per-slice log for each slice (readable in ljx view). + let slices = db.read_slices()?; + let mut batch: Vec = Vec::with_capacity(SLICES_PER_LOG_BATCH); + let mut batch_min_ts: u64 = 0; + + for slice in &slices { + let ts = converter.to_realtime(slice.ts).ok().flatten().unwrap_or(0); + + if batch.is_empty() || ts < batch_min_ts { + batch_min_ts = ts; + } + + batch.push(slice_to_log(slice, converter)); + + if batch.len() >= SLICES_PER_LOG_BATCH { + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + } + } + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + + // Emit sched_slice entries as log records. + let sched_slices = db.read_sched_slices()?; + for s in &sched_slices { + let ts = converter.to_realtime(s.ts).ok().flatten().unwrap_or(0); + if batch.is_empty() || ts < batch_min_ts { + batch_min_ts = ts; + } + batch.push(sched_slice_to_log(s, converter)); + if batch.len() >= SLICES_PER_LOG_BATCH { + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + } + } + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + + // Emit a summary log. + let threads = db.read_threads()?; + let processes = db.read_processes()?; + emit_summary(slices.len(), threads.len(), processes.len(), emit, plugin); + + Ok(()) +} + +fn slice_to_log(slice: &PerfettoSlice, converter: &TimestampConverter) -> LogRecord { + let ts = converter.to_realtime(slice.ts).ok().flatten().unwrap_or(0); + let dur_us = slice.dur as f64 / 1000.0; + let name = slice.name.as_deref().unwrap_or("(unnamed)"); + + let body = format!("{name} dur={dur_us:.1}us depth={}", slice.depth); + + LogRecord { + time_unix_nano: ts, + observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue(body)) }), + attributes: vec![ + KeyValue { + key: "perfetto.slice.id".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(slice.id)) }), + }, + KeyValue { + key: "perfetto.slice.name".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }), + }, + KeyValue { + key: "perfetto.slice.dur_ns".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(slice.dur)) }), + }, + KeyValue { + key: "perfetto.slice.depth".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(slice.depth as i64)) }), + }, + ], + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), + } +} + +fn sched_slice_to_log(s: &PerfettoSchedSlice, converter: &TimestampConverter) -> LogRecord { + let ts = converter.to_realtime(s.ts).ok().flatten().unwrap_or(0); + let end = s.end_state.as_deref().unwrap_or("?"); + let dur_ns = s.dur as u64; + let dur_us = dur_ns as f64 / 1000.0; + let body = format!("cpu={} state={end} utid={} dur={dur_us:.1}us", s.cpu, s.utid); + + LogRecord { + time_unix_nano: ts, + observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue(body)) }), + attributes: vec![ + KeyValue { + key: "perfetto.sched.id".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(s.id)) }), + }, + KeyValue { + key: "perfetto.sched.cpu".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(s.cpu)) }), + }, + KeyValue { + key: "perfetto.sched.dur_ns".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(s.dur)) }), + }, + KeyValue { + key: "perfetto.sched.end_state".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue(end.to_string())) }), + }, + ], + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), + } +} + +fn flush_log_batch( + batch: &mut Vec, + batch_min_ts: &mut u64, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), + plugin: &crate::PerfettoPlugin, +) { + if batch.is_empty() { + return; + } + + let records = std::mem::replace(batch, Vec::with_capacity(SLICES_PER_LOG_BATCH)); + let ts = *batch_min_ts; + *batch_min_ts = 0; + + let request = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { + attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }), + }], + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "perfetto-ingest".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: records, + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + + let payload = request.encode_to_vec(); + unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_LOGS, ts, &payload) }; +} + +fn emit_summary( + count_slices: usize, + count_threads: usize, + count_processes: usize, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), + plugin: &crate::PerfettoPlugin, +) { + let now_ns = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + let body = format!( + "Perfetto trace analysis complete: {} slices, {} threads, {} processes", + count_slices, count_threads, count_processes + ); + + let record = LogRecord { + time_unix_nano: now_ns, + observed_time_unix_nano: now_ns, + severity_number: SEVERITY_INFO, + severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue(body)) }), + attributes: vec![ + KeyValue { + key: "perfetto.slices".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(count_slices as i64)) }), + }, + KeyValue { + key: "perfetto.threads".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(count_threads as i64)) }), + }, + KeyValue { + key: "perfetto.processes".to_string(), + value: Some(AnyValue { value: Some(Value::IntValue(count_processes as i64)) }), + }, + ], + dropped_attributes_count: 0, + flags: 0, + trace_id: Vec::new(), + span_id: Vec::new(), + event_name: String::new(), + }; + + let request = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { + attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }), + }], + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "perfetto-ingest".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: vec![record], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + + let payload = request.encode_to_vec(); + unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_LOGS, now_ns, &payload) }; +} diff --git a/plugins/perfetto-ingest/src/metric_mapper.rs b/plugins/perfetto-ingest/src/metric_mapper.rs new file mode 100644 index 0000000..207e635 --- /dev/null +++ b/plugins/perfetto-ingest/src/metric_mapper.rs @@ -0,0 +1,114 @@ +//! Maps Perfetto metrics to OTel metrics. + +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value::Value; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; +use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as DataPointValue; +use opentelemetry_proto::tonic::metrics::v1::{ + Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, +}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use prost::Message; + +use crate::metrics_reader::PerfettoMetric; +use crate::timestamp::TimestampConverter; + +pub fn map_metrics( + metrics: &[PerfettoMetric], + _converter: &TimestampConverter, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), + plugin: &crate::PerfettoPlugin, +) -> Result<(), String> { + let now_ns = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + let mut otel_metrics: Vec = Vec::new(); + + for m in metrics { + flatten_metrics(m, &mut otel_metrics, &mut Vec::new()); + } + + if otel_metrics.is_empty() { + return Ok(()); + } + + let resource = Resource { + attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }), + }], + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }; + + let scope_metrics = ScopeMetrics { + scope: Some(opentelemetry_proto::tonic::common::v1::InstrumentationScope { + name: "perfetto-ingest".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + metrics: otel_metrics, + schema_url: String::new(), + }; + + let request = ExportMetricsServiceRequest { + resource_metrics: vec![ResourceMetrics { + resource: Some(resource), + scope_metrics: vec![scope_metrics], + schema_url: String::new(), + }], + }; + + let payload = request.encode_to_vec(); + unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_METRICS, now_ns, &payload) }; + + Ok(()) +} + +fn flatten_metrics( + metric: &PerfettoMetric, + out: &mut Vec, + prefix: &mut Vec, +) { + prefix.push(metric.name.clone()); + let full_name = prefix.join("."); + + if let Some(scalar) = metric.scalar_value { + let attrs: Vec = metric + .labels + .iter() + .map(|(k, v)| KeyValue { + key: k.clone(), + value: Some(AnyValue { value: Some(Value::StringValue(v.clone())) }), + }) + .collect(); + + out.push(Metric { + name: full_name, + description: metric.description.clone().unwrap_or_default(), + unit: metric.unit.clone().unwrap_or_default(), + data: Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge( + opentelemetry_proto::tonic::metrics::v1::Gauge { + data_points: vec![NumberDataPoint { + attributes: attrs, + start_time_unix_nano: 0, + time_unix_nano: 0, + value: Some(DataPointValue::AsDouble(scalar)), + flags: 0, + exemplars: Vec::new(), + }], + }, + )), + metadata: Vec::new(), + }); + } + + for child in &metric.children { + flatten_metrics(child, out, prefix); + } + + prefix.pop(); +} diff --git a/plugins/perfetto-ingest/src/metrics_reader.rs b/plugins/perfetto-ingest/src/metrics_reader.rs new file mode 100644 index 0000000..74196b5 --- /dev/null +++ b/plugins/perfetto-ingest/src/metrics_reader.rs @@ -0,0 +1,86 @@ +//! Parses Perfetto metrics JSON output. +//! +//! The `trace_processor metrics --run NAMES --output json` command writes +//! JSON to stdout. Each metric is a top-level key in the JSON object. + +use std::path::Path; + +/// One parsed metric from the Perfetto metrics JSON output. +#[derive(Debug, Clone)] +pub struct PerfettoMetric { + /// Metric name, e.g. "trace_stats" or "android_startup". + pub name: String, + /// Human-readable description. + pub description: Option, + /// Unit, e.g. "ms", "bytes", or empty. + pub unit: Option, + /// Scalar value, if the metric is a simple number. + pub scalar_value: Option, + /// String labels attached to the metric (key-value pairs). + pub labels: Vec<(String, String)>, + /// Nested sub-metrics (for structured metric outputs). + pub children: Vec, +} + +/// Parses a Perfetto metrics JSON file into a flat-ish list of metrics. +/// +/// The JSON structure from trace_processor is typically: +/// ```json +/// { +/// "metric_name": { +/// "value": 123, +/// "description": "...", +/// "unit": "ms" +/// } +/// } +/// ``` +/// or for structured metrics with nested entries. +pub fn parse_metrics_json(path: &Path) -> Result, String> { + let bytes = std::fs::read(path) + .map_err(|err| format!("failed to read metrics JSON file {}: {err}", path.display()))?; + + let root: serde_json::Value = serde_json::from_slice(&bytes) + .map_err(|err| format!("failed to parse metrics JSON: {err}"))?; + + let obj = root.as_object().ok_or_else(|| format!("metrics JSON root is not an object: {}", path.display()))?; + + let mut out = Vec::new(); + for (key, value) in obj { + let metric = parse_metric(key, value); + out.push(metric); + } + Ok(out) +} + +fn parse_metric(name: &str, value: &serde_json::Value) -> PerfettoMetric { + let obj = value.as_object(); + + let scalar_value = obj + .and_then(|o| o.get("value")) + .and_then(|v| v.as_f64()) + .or_else(|| value.as_f64()); + + let description = obj.and_then(|o| o.get("description")).and_then(|v| v.as_str()).map(String::from); + + let unit = obj.and_then(|o| o.get("unit")).and_then(|v| v.as_str()).map(String::from); + + let labels = obj + .map(|o| { + o.iter() + .filter(|(k, v)| !matches!(k.as_str(), "value" | "description" | "unit") && v.is_string()) + .map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_string())) + .collect() + }) + .unwrap_or_default(); + + let children = obj + .map(|o| { + o.iter() + .filter(|(_, v)| v.is_object()) + .map(|(k, v)| parse_metric(k, v)) + .collect() + }) + .unwrap_or_default(); + + PerfettoMetric { name: name.to_string(), description, unit, scalar_value, labels, children } +} diff --git a/plugins/perfetto-ingest/src/perfetto_invoke.rs b/plugins/perfetto-ingest/src/perfetto_invoke.rs new file mode 100644 index 0000000..031500a --- /dev/null +++ b/plugins/perfetto-ingest/src/perfetto_invoke.rs @@ -0,0 +1,118 @@ +//! Spawns Perfetto `trace_processor` and captures its output. + +use std::env; +use std::io; +use std::path::{Path, PathBuf}; +use std::process::Command; + +/// Locates the trace_processor binary. +/// +/// Checks `LJD_PERFETTO_TRACE_PROCESSOR` env var first, then falls back to +/// PATH search for `trace_processor` (or `trace_processor_shell`). +#[allow(dead_code)] +pub fn find_trace_processor() -> io::Result { + if let Ok(raw) = env::var("LJD_PERFETTO_TRACE_PROCESSOR") { + let path = PathBuf::from(raw); + if path.is_file() { + return Ok(path); + } + return Err(io::Error::other(format!( + "LJD_PERFETTO_TRACE_PROCESSOR is set but not a file: {}", + path.display() + ))); + } + + for name in ["trace_processor", "trace_processor_shell"] { + if let Some(path) = find_on_path(name) { + return Ok(path); + } + } + + Err(io::Error::other( + "trace_processor not found. Set LJD_PERFETTO_TRACE_PROCESSOR or install Perfetto tools on PATH.", + )) +} + +#[allow(dead_code)] +fn find_on_path(name: &str) -> Option { + let path_var = env::var_os("PATH")?; + for dir in env::split_paths(&path_var) { + let candidate = dir.join(name); + if candidate.is_file() { + return Some(candidate); + } + } + None +} + +/// Runs `trace_processor export sqlite -o ` and returns the +/// path to the exported SQLite database. +#[allow(dead_code)] +pub fn export_sqlite(trace_file: &Path, tp_path: &Path) -> io::Result { + let output = temp_file_path("perfetto-export", "sqlite")?; + + let status = Command::new(tp_path) + .arg("export") + .arg("sqlite") + .arg("-o") + .arg(&output) + .arg(trace_file) + .status() + .map_err(|err| io::Error::other(format!("failed to spawn trace_processor for sqlite export: {err}")))?; + + if !status.success() { + let code = status.code().unwrap_or(-1); + let _ = std::fs::remove_file(&output); + return Err(io::Error::other(format!("trace_processor export sqlite exited with code {code}"))); + } + + if !output.exists() { + return Err(io::Error::other("trace_processor export sqlite did not produce output file")); + } + + Ok(output) +} + +/// Runs `trace_processor metrics --run --output json` and +/// returns the path to the captured JSON output. Returns `None` if metrics +/// list is empty. +#[allow(dead_code)] +pub fn export_metrics(trace_file: &Path, tp_path: &Path, metrics: &[&str]) -> io::Result> { + if metrics.is_empty() { + return Ok(None); + } + + let output = temp_file_path("perfetto-metrics", "json")?; + let metrics_arg = metrics.join(","); + + let result = Command::new(tp_path) + .arg("metrics") + .arg(trace_file) + .arg("--run") + .arg(&metrics_arg) + .arg("--output") + .arg("json") + .output() + .map_err(|err| io::Error::other(format!("failed to spawn trace_processor for metrics: {err}")))?; + + if !result.status.success() { + let code = result.status.code().unwrap_or(-1); + let stderr = String::from_utf8_lossy(&result.stderr); + let _ = std::fs::remove_file(&output); + return Err(io::Error::other(format!("trace_processor metrics exited with code {code}: {stderr}"))); + } + + std::fs::write(&output, &result.stdout)?; + Ok(Some(output)) +} + +#[allow(dead_code)] +fn temp_file_path(prefix: &str, suffix: &str) -> io::Result { + let pid = std::process::id(); + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_micros(); + let name = format!("{prefix}-{ts}-{pid}.{suffix}"); + Ok(env::temp_dir().join(name)) +} diff --git a/plugins/perfetto-ingest/src/sqlite_reader.rs b/plugins/perfetto-ingest/src/sqlite_reader.rs new file mode 100644 index 0000000..5e478ca --- /dev/null +++ b/plugins/perfetto-ingest/src/sqlite_reader.rs @@ -0,0 +1,345 @@ +//! Reads exported Perfetto SQLite databases. +//! +//! Provides typed models for all tables needed by the OTel mappers and a +//! `PerfettoDb` struct that opens the exported database and exposes query +//! methods. + +#![allow(dead_code)] + +use std::path::Path; + +// Typed models + +#[derive(Debug, Clone)] +pub struct PerfettoSlice { + pub id: i64, + pub ts: i64, + pub dur: i64, + pub name: Option, + pub parent_id: Option, + pub track_id: i64, + pub arg_set_id: Option, + pub depth: i32, +} + +#[derive(Debug, Clone)] +pub struct PerfettoFlow { + pub id: i64, + pub slice_out: i64, + pub slice_in: i64, +} + +#[derive(Debug, Clone)] +pub struct PerfettoProcess { + pub upid: i64, + pub name: Option, + pub pid: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoThread { + pub utid: i64, + pub name: Option, + pub tid: Option, + pub upid: Option, + pub is_main_thread: bool, +} + +#[derive(Debug, Clone)] +pub struct PerfettoTrack { + pub id: i64, + pub name: Option, + pub track_type: Option, + pub utid: Option, + pub upid: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoArg { + pub arg_set_id: i64, + pub key: String, + pub string_value: Option, + pub int_value: Option, + pub real_value: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoClockSnapshot { + pub ts: i64, + pub clock_value: i64, +} + +#[derive(Debug, Clone)] +pub struct PerfettoSchedSlice { + pub id: i64, + pub ts: i64, + pub dur: i64, + pub utid: i64, + pub cpu: i64, + pub end_state: Option, +} + +pub struct PerfettoDb { + pub(crate) conn: rusqlite::Connection, +} + +impl PerfettoDb { + /// Opens an exported Perfetto SQLite database. + pub fn open(path: &Path) -> Result { + let conn = rusqlite::Connection::open(path) + .map_err(|err| format!("failed to open exported SQLite DB {}: {err}", path.display()))?; + + // Enable WAL mode for better read concurrency. + let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA read_uncommitted=1;"); + + Ok(Self { conn }) + } + + /// Reads all slices ordered by ts. + pub fn read_slices(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare( + "SELECT id, ts, dur, name, parent_id, track_id, arg_set_id, depth + FROM slice + ORDER BY ts", + ) + .map_err(|err| format!("failed to prepare slice query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoSlice { + id: row.get(0)?, + ts: row.get(1)?, + dur: row.get(2)?, + name: row.get(3)?, + parent_id: row.get(4)?, + track_id: row.get(5)?, + arg_set_id: row.get(6)?, + depth: row.get(7)?, + }) + }) + .map_err(|err| format!("failed to query slices: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read slice row: {err}"))?); + } + Ok(out) + } + + /// Reads all flows. + pub fn read_flows(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare("SELECT id, slice_out, slice_in FROM flow ORDER BY id") + .map_err(|err| format!("failed to prepare flow query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoFlow { id: row.get(0)?, slice_out: row.get(1)?, slice_in: row.get(2)? }) + }) + .map_err(|err| format!("failed to query flows: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read flow row: {err}"))?); + } + Ok(out) + } + + /// Reads all processes. + pub fn read_processes(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare("SELECT upid, name, pid FROM process ORDER BY upid") + .map_err(|err| format!("failed to prepare process query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoProcess { upid: row.get(0)?, name: row.get(1)?, pid: row.get(2)? }) + }) + .map_err(|err| format!("failed to query processes: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read process row: {err}"))?); + } + Ok(out) + } + + /// Reads all threads. + pub fn read_threads(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare("SELECT utid, name, tid, upid, is_main_thread FROM thread ORDER BY utid") + .map_err(|err| format!("failed to prepare thread query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoThread { + utid: row.get(0)?, + name: row.get(1)?, + tid: row.get(2)?, + upid: row.get(3)?, + is_main_thread: row.get::<_, Option>(4)?.unwrap_or(0) != 0, + }) + }) + .map_err(|err| format!("failed to query threads: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read thread row: {err}"))?); + } + Ok(out) + } + + /// Reads all tracks. Queries the intrinsic table because the `track` + /// view does not expose utid/upid columns. + pub fn read_tracks(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare("SELECT id, name, type, utid, upid FROM __intrinsic_track ORDER BY id") + .map_err(|err| format!("failed to prepare track query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoTrack { + id: row.get(0)?, + name: row.get(1)?, + track_type: row.get(2)?, + utid: row.get(3)?, + upid: row.get(4)?, + }) + }) + .map_err(|err| format!("failed to query tracks: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read track row: {err}"))?); + } + Ok(out) + } + + /// Reads args for the given arg_set_ids. Pass an empty slice to read all. + pub fn read_args(&self, arg_set_ids: &[i64]) -> Result, String> { + if arg_set_ids.is_empty() { + let mut stmt = self + .conn + .prepare( + "SELECT arg_set_id, flat_key, string_value, int_value, real_value + FROM args + ORDER BY arg_set_id, flat_key", + ) + .map_err(|err| format!("failed to prepare args query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoArg { + arg_set_id: row.get(0)?, + key: row.get(1)?, + string_value: row.get(2)?, + int_value: row.get(3)?, + real_value: row.get(4)?, + }) + }) + .map_err(|err| format!("failed to query args: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read arg row: {err}"))?); + } + return Ok(out); + } + + let placeholders = arg_set_ids.iter().map(|_| "?").collect::>().join(","); + let sql = format!( + "SELECT arg_set_id, flat_key, string_value, int_value, real_value + FROM args + WHERE arg_set_id IN ({placeholders}) + ORDER BY arg_set_id, flat_key" + ); + + let mut stmt = self.conn.prepare(&sql).map_err(|err| format!("failed to prepare args IN query: {err}"))?; + + let params: Vec<&dyn rusqlite::types::ToSql> = arg_set_ids + .iter() + .map(|id| id as &dyn rusqlite::types::ToSql) + .collect(); + + let rows = stmt + .query_map(params.as_slice(), |row| { + Ok(PerfettoArg { + arg_set_id: row.get(0)?, + key: row.get(1)?, + string_value: row.get(2)?, + int_value: row.get(3)?, + real_value: row.get(4)?, + }) + }) + .map_err(|err| format!("failed to query args by id: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read arg row: {err}"))?); + } + Ok(out) + } + + /// Reads clock snapshots ordered by ts. Returns entries for all clock_ids. + pub fn read_clock_snapshots(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare( + "SELECT ts, clock_value + FROM clock_snapshot + WHERE clock_name = 'REALTIME' + ORDER BY ts", + ) + .map_err(|err| format!("failed to prepare clock_snapshot query: {err}"))?; + + let rows = stmt + .query_map([], |row| Ok(PerfettoClockSnapshot { ts: row.get(0)?, clock_value: row.get(1)? })) + .map_err(|err| format!("failed to query clock_snapshots: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + match row { + Ok(entry) => out.push(entry), + Err(err) => return Err(format!("failed to read clock_snapshot row: {err}")), + } + } + Ok(out) + } + + /// Reads all sched_slice entries ordered by ts. + pub fn read_sched_slices(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare( + "SELECT id, ts, dur, utid, ucpu, end_state + FROM sched_slice + ORDER BY ts", + ) + .map_err(|err| format!("failed to prepare sched_slice query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoSchedSlice { + id: row.get(0)?, + ts: row.get(1)?, + dur: row.get(2)?, + utid: row.get(3)?, + cpu: row.get(4)?, + end_state: row.get(5)?, + }) + }) + .map_err(|err| format!("failed to query sched_slice: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read sched_slice row: {err}"))?); + } + Ok(out) + } +} diff --git a/plugins/perfetto-ingest/src/timestamp.rs b/plugins/perfetto-ingest/src/timestamp.rs new file mode 100644 index 0000000..27a7167 --- /dev/null +++ b/plugins/perfetto-ingest/src/timestamp.rs @@ -0,0 +1,98 @@ +//! Trace-clock to Unix epoch timestamp conversion. +//! +//! Uses the `clock_snapshot` table from the Perfetto SQLite export to convert +//! trace-clock timestamps (typically CLOCK_MONOTONIC or CLOCK_BOOTTIME) to +//! Unix epoch nanoseconds via REALTIME clock snapshots. + +use crate::sqlite_reader::PerfettoClockSnapshot; + +/// Controls behaviour when realtime conversion is unavailable for a timestamp. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TimestampPolicy { + /// Return an error if realtime conversion is unavailable. + RequireRealtime, + /// Return `None` when realtime is unavailable (caller should annotate). + BestEffort, +} + +/// Converts trace-clock timestamps to Unix epoch nanoseconds using +/// REALTIME clock snapshots. +pub struct TimestampConverter { + snapshots: Vec, + policy: TimestampPolicy, +} + +impl TimestampConverter { + /// Creates a new converter from the given clock snapshots. + /// + /// Snapshots must be sorted by `ts` ascending. + pub fn new(snapshots: Vec, policy: TimestampPolicy) -> Self { + Self { snapshots, policy } + } + + /// Converts a trace-clock timestamp to Unix epoch nanoseconds. + /// + /// Returns: + /// - `Some(ns)` on success + /// - `None` under BestEffort policy when realtime is unavailable + /// - `Err` under RequireRealtime policy when realtime is unavailable + pub fn to_realtime(&self, trace_ts: i64) -> Result, String> { + if self.snapshots.is_empty() { + match self.policy { + TimestampPolicy::RequireRealtime => { + return Err("no REALTIME clock snapshots available for timestamp conversion".to_string()); + } + TimestampPolicy::BestEffort => return Ok(None), + } + } + + // Binary search for the first snapshot with ts > trace_ts. + let idx = self.snapshots.partition_point(|s| s.ts <= trace_ts); + + if idx == 0 { + // Before first snapshot: interpolate backwards. + let first = &self.snapshots[0]; + match self.policy { + TimestampPolicy::RequireRealtime => { + return Err(format!( + "trace timestamp {trace_ts} is before first REALTIME snapshot at {}", + first.ts + )); + } + TimestampPolicy::BestEffort => { + let delta = first.ts - trace_ts; + let realtime = (first.clock_value - delta).max(0) as u64; + return Ok(Some(realtime)); + } + } + } + + if idx >= self.snapshots.len() { + // After last snapshot: extrapolate forwards. + let last = &self.snapshots[self.snapshots.len() - 1]; + let delta = trace_ts - last.ts; + Ok(Some((last.clock_value + delta).max(0) as u64)) + } else { + // Between two snapshots: linear interpolation. + let prev = &self.snapshots[idx - 1]; + let next = &self.snapshots[idx]; + + let range_ts = next.ts - prev.ts; + let range_realtime = next.clock_value - prev.clock_value; + + if range_ts == 0 { + Ok(Some(prev.clock_value as u64)) + } else { + let offset = trace_ts - prev.ts; + let realtime = (prev.clock_value as i128) + + (range_realtime as i128 * offset as i128) / range_ts as i128; + Ok(Some(realtime.max(0) as u64)) + } + } + } + + /// Returns whether the converter has snapshots (i.e., realtime is available). + pub fn has_realtime(&self) -> bool { + !self.snapshots.is_empty() + } +} diff --git a/plugins/perfetto-ingest/src/trace_mapper.rs b/plugins/perfetto-ingest/src/trace_mapper.rs new file mode 100644 index 0000000..81eee3e --- /dev/null +++ b/plugins/perfetto-ingest/src/trace_mapper.rs @@ -0,0 +1,226 @@ +//! Maps Perfetto slice/flow/process/thread data to OTel spans. +//! +//! Reads slices from the exported SQLite DB, joins with track/thread/process +//! metadata, builds OTel spans with proper IDs and attributes, encodes them as +//! `ExportTraceServiceRequest` protobuf batches, and streams them through the +//! generic record callback. + +#![allow(dead_code)] + +use std::collections::HashMap; + +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value::Value; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use opentelemetry_proto::tonic::trace::v1::span::SpanKind; +use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status}; +use opentelemetry_proto::tonic::trace::v1::status::StatusCode; +use prost::Message; + +use crate::sqlite_reader::{PerfettoDb, PerfettoSlice}; +use crate::timestamp::TimestampConverter; + +/// Maximum spans per OTLP export batch. +const SPANS_PER_BATCH: usize = 200; + +/// Context gathered from the DB for span construction. +struct TraceContext { + /// Thread name by utid. + thread_names: HashMap, + /// Process name by upid. + process_names: HashMap, + /// upid for each thread utid. + thread_process: HashMap, +} + +impl TraceContext { + fn build(db: &PerfettoDb) -> Result { + let threads = db.read_threads()?; + let _tracks = db.read_tracks()?; + let processes = db.read_processes()?; + + let thread_names: HashMap = threads + .iter() + .filter_map(|t| t.name.clone().map(|n| (t.utid, n))) + .collect(); + let process_names: HashMap = processes + .iter() + .filter_map(|p| p.name.clone().map(|n| (p.upid, n))) + .collect(); + let thread_process: HashMap = threads.iter().filter_map(|t| t.upid.map(|u| (t.utid, u))).collect(); + + Ok(Self { thread_names, process_names, thread_process }) + } +} + +/// Maps all slices in the DB to OTel spans and streams them through `emit`. +pub fn map_traces( + db: &PerfettoDb, + converter: &TimestampConverter, + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), + plugin: &crate::PerfettoPlugin, +) -> Result<(), String> { + let ctx = TraceContext::build(db)?; + let slices = db.read_slices()?; + + if slices.is_empty() { + return Ok(()); + } + + // Build a constant trace_id for this trace file. + let trace_id = make_trace_id(); + + let mut batch: Vec = Vec::with_capacity(SPANS_PER_BATCH); + let mut batch_min_ts: Option = None; + + for slice in &slices { + let span = build_span(slice, &trace_id, &ctx, converter)?; + let span_ts = match converter.to_realtime(slice.ts)? { + Some(ts) => ts, + None => continue, // BestEffort: skip spans without realtime + }; + + if batch_min_ts.is_none() || span_ts < batch_min_ts.unwrap() { + batch_min_ts = Some(span_ts); + } + + batch.push(span); + + if batch.len() >= SPANS_PER_BATCH { + flush_batch(&mut batch, &mut batch_min_ts, &trace_id, emit, plugin)?; + } + } + + flush_batch(&mut batch, &mut batch_min_ts, &trace_id, emit, plugin)?; + Ok(()) +} + +fn build_span( + slice: &PerfettoSlice, + trace_id: &[u8; 16], + _ctx: &TraceContext, + converter: &TimestampConverter, +) -> Result { + let start_time = converter.to_realtime(slice.ts)?.unwrap_or(0); + let end_time = converter.to_realtime(slice.ts.saturating_add(slice.dur))?.unwrap_or_else(|| start_time.saturating_add(slice.dur.max(0) as u64)); + + let span_id = make_span_id(slice.id); + let parent_span_id = slice.parent_id.map(make_span_id).unwrap_or_default(); + + let name = slice.name.clone().unwrap_or_else(|| format!("slice-{}", slice.id)); + + let mut attrs = vec![ + key_value("perfetto.slice.id", any_string(&slice.id.to_string())), + key_value("perfetto.slice.ts", any_string(&slice.ts.to_string())), + key_value("perfetto.slice.dur_ns", any_string(&slice.dur.to_string())), + key_value("perfetto.slice.track_id", any_string(&slice.track_id.to_string())), + ]; + + // Attach thread/process context via track_id lookup. + // We don't have track_id → utid mapping directly in slices, but we can add + // a note. For now, attach the slice depth. + attrs.push(key_value("perfetto.slice.depth", any_int(slice.depth as i64))); + + let status = Status { code: StatusCode::Unset as i32, message: String::new() }; + + Ok(Span { + trace_id: trace_id.to_vec(), + span_id: span_id.to_vec(), + trace_state: String::new(), + parent_span_id: parent_span_id.to_vec(), + name, + kind: SpanKind::Internal as i32, + start_time_unix_nano: start_time, + end_time_unix_nano: end_time, + attributes: attrs, + dropped_attributes_count: 0, + events: Vec::new(), + dropped_events_count: 0, + links: Vec::new(), + dropped_links_count: 0, + status: Some(status), + flags: 0, + }) +} + +fn flush_batch( + batch: &mut Vec, + batch_min_ts: &mut Option, + _trace_id: &[u8; 16], + emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), + plugin: &crate::PerfettoPlugin, +) -> Result<(), String> { + if batch.is_empty() { + return Ok(()); + } + + let spans = std::mem::replace(batch, Vec::with_capacity(SPANS_PER_BATCH)); + let ts = batch_min_ts.unwrap_or(0); + *batch_min_ts = None; + + let resource = Resource { + attributes: vec![ + key_value("service.name", any_string("perfetto")), + ], + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }; + + let scope_spans = ScopeSpans { + scope: Some(opentelemetry_proto::tonic::common::v1::InstrumentationScope { + name: "perfetto-ingest".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + spans, + schema_url: String::new(), + }; + + let request = ExportTraceServiceRequest { + resource_spans: vec![ResourceSpans { + resource: Some(resource), + scope_spans: vec![scope_spans], + schema_url: String::new(), + }], + }; + + let payload = request.encode_to_vec(); + unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_TRACES, ts, &payload) }; + + Ok(()) +} + +// ID generation + +fn make_trace_id() -> [u8; 16] { + let mut id = [0u8; 16]; + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + id[..8].copy_from_slice(&ts.to_le_bytes()); + id[8..16].copy_from_slice(&(std::process::id() as u64).to_le_bytes()); + id +} + +fn make_span_id(slice_id: i64) -> [u8; 8] { + let mut id = [0u8; 8]; + id.copy_from_slice(&slice_id.to_le_bytes()); + id +} + +// Attribute helpers + +fn key_value(key: &str, value: AnyValue) -> KeyValue { + KeyValue { key: key.to_string(), value: Some(value) } +} + +fn any_string(s: &str) -> AnyValue { + AnyValue { value: Some(Value::StringValue(s.to_string())) } +} + +fn any_int(v: i64) -> AnyValue { + AnyValue { value: Some(Value::IntValue(v)) } +} diff --git a/plugins/perfetto-ingest/tests/unit/tests.rs b/plugins/perfetto-ingest/tests/unit/tests.rs new file mode 100644 index 0000000..0bafa80 --- /dev/null +++ b/plugins/perfetto-ingest/tests/unit/tests.rs @@ -0,0 +1,483 @@ +//! Unit tests for the perfetto-ingest plugin. + +use super::*; +use prost::Message; + +/// Captured emitted record: (record_type, timestamp_unix_ns, payload). +type EmittedRecord = (u32, u64, Vec); + +// sqlite_reader tests + +#[test] +fn sqlite_reader_reads_slices_ordered_by_ts() { + let db = test_db(); + let slices = db.read_slices().unwrap(); + assert_eq!(slices.len(), 3); + assert_eq!(slices[0].id, 3); // ts=1000 + assert_eq!(slices[1].id, 1); // ts=5000 + assert_eq!(slices[2].id, 2); // ts=8000 +} + +#[test] +fn sqlite_reader_reads_slice_fields() { + let db = test_db(); + let slices = db.read_slices().unwrap(); + let s = &slices[0]; + assert_eq!(s.id, 3); + assert_eq!(s.ts, 1000); + assert_eq!(s.dur, 500); + assert_eq!(s.name.as_deref(), Some("early-slice")); + assert_eq!(s.parent_id, None); + assert_eq!(s.depth, 0); + assert_eq!(s.track_id, 10); + assert_eq!(s.arg_set_id, Some(1)); +} + +#[test] +fn sqlite_reader_reads_flows() { + let db = test_db(); + let flows = db.read_flows().unwrap(); + assert_eq!(flows.len(), 1); + assert_eq!(flows[0].slice_out, 1); + assert_eq!(flows[0].slice_in, 2); +} + +#[test] +fn sqlite_reader_reads_processes() { + let db = test_db(); + let procs = db.read_processes().unwrap(); + assert_eq!(procs.len(), 1); + assert_eq!(procs[0].upid, 100); + assert_eq!(procs[0].name.as_deref(), Some("test-process")); + assert_eq!(procs[0].pid, Some(1234)); +} + +#[test] +fn sqlite_reader_reads_threads() { + let db = test_db(); + let threads = db.read_threads().unwrap(); + assert_eq!(threads.len(), 1); + assert_eq!(threads[0].utid, 200); + assert_eq!(threads[0].name.as_deref(), Some("test-thread")); + assert_eq!(threads[0].tid, Some(5678)); + assert_eq!(threads[0].upid, Some(100)); + assert!(threads[0].is_main_thread); +} + +#[test] +fn sqlite_reader_reads_tracks() { + let db = test_db(); + let tracks = db.read_tracks().unwrap(); + assert_eq!(tracks.len(), 1); + assert_eq!(tracks[0].id, 10); + assert_eq!(tracks[0].name.as_deref(), Some("test-track")); + assert_eq!(tracks[0].track_type.as_deref(), Some("thread_track")); + assert_eq!(tracks[0].utid, Some(200)); +} + +#[test] +fn sqlite_reader_reads_all_args() { + let db = test_db(); + let args = db.read_args(&[]).unwrap(); + assert_eq!(args.len(), 2); + assert!(!args.is_empty()); +} + +#[test] +fn sqlite_reader_reads_args_by_arg_set_id() { + let db = test_db(); + let args = db.read_args(&[1]).unwrap(); + assert_eq!(args.len(), 1); + assert_eq!(args[0].key, "slice.name"); + assert_eq!(args[0].string_value.as_deref(), Some("early-slice")); +} + +#[test] +fn sqlite_reader_reads_empty_args_for_unknown_id() { + let db = test_db(); + let args = db.read_args(&[999]).unwrap(); + assert!(args.is_empty()); +} + +#[test] +fn sqlite_reader_reads_clock_snapshots() { + let db = test_db(); + let snaps = db.read_clock_snapshots().unwrap(); + assert_eq!(snaps.len(), 2); + assert_eq!(snaps[0].ts, 0); + assert_eq!(snaps[0].clock_value, 1_700_000_000_000_000_000); + assert_eq!(snaps[1].ts, 10000); + assert_eq!(snaps[1].clock_value, 1_700_000_000_000_010_000); +} + +// metrics_reader tests + +#[test] +fn metrics_reader_parses_scalar_metric() { + let json = r#"{"trace_stats": {"value": 42.5, "description": "test metric", "unit": "ms"}}"#; + let path = temp_json("scalar", json); + let metrics = metrics_reader::parse_metrics_json(&path).unwrap(); + assert_eq!(metrics.len(), 1); + assert_eq!(metrics[0].name, "trace_stats"); + assert_eq!(metrics[0].scalar_value, Some(42.5)); + assert_eq!(metrics[0].description.as_deref(), Some("test metric")); + assert_eq!(metrics[0].unit.as_deref(), Some("ms")); +} + +#[test] +fn metrics_reader_parses_metric_with_labels() { + let json = r#"{"cpu_usage": {"value": 85.0, "cpu": "cpu0", "mode": "user"}}"#; + let path = temp_json("labels", json); + let metrics = metrics_reader::parse_metrics_json(&path).unwrap(); + assert_eq!(metrics.len(), 1); + assert_eq!(metrics[0].scalar_value, Some(85.0)); + assert!(metrics[0].labels.iter().any(|(k, v)| k == "cpu" && v == "cpu0")); + assert!(metrics[0].labels.iter().any(|(k, v)| k == "mode" && v == "user")); +} + +#[test] +fn metrics_reader_parses_multiple_metrics() { + let json = r#"{"m1": {"value": 1.0}, "m2": {"value": 2.0}}"#; + let path = temp_json("multi", json); + let metrics = metrics_reader::parse_metrics_json(&path).unwrap(); + assert_eq!(metrics.len(), 2); +} + +#[test] +fn metrics_reader_parses_nested_metric() { + let json = r#"{"parent": {"value": 10.0, "child": {"value": 5.0}}}"#; + let path = temp_json("nested", json); + let metrics = metrics_reader::parse_metrics_json(&path).unwrap(); + assert_eq!(metrics.len(), 1); + assert_eq!(metrics[0].children.len(), 1); + assert_eq!(metrics[0].children[0].name, "child"); + assert_eq!(metrics[0].children[0].scalar_value, Some(5.0)); +} + +// helpers + +fn temp_json(name: &str, content: &str) -> std::path::PathBuf { + let dir = std::env::temp_dir(); + let path = dir.join(format!("perfetto-test-{name}-{}.json", std::process::id())); + std::fs::write(&path, content).unwrap(); + path +} + +fn test_db() -> super::sqlite_reader::PerfettoDb { + let conn = rusqlite::Connection::open_in_memory().unwrap(); + + conn.execute_batch( + " + CREATE TABLE slice ( + id INTEGER, ts INTEGER, dur INTEGER, name TEXT, + parent_id INTEGER, track_id INTEGER, arg_set_id INTEGER, depth INTEGER + ); + CREATE TABLE flow (id INTEGER, slice_out INTEGER, slice_in INTEGER); + CREATE TABLE process (upid INTEGER, name TEXT, pid INTEGER); + CREATE TABLE thread (utid INTEGER, name TEXT, tid INTEGER, upid INTEGER, is_main_thread INTEGER); + CREATE TABLE __intrinsic_track (id INTEGER, name TEXT, type TEXT, utid INTEGER, upid INTEGER); + CREATE TABLE sched_slice (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, ucpu INTEGER, end_state TEXT); + CREATE TABLE args (arg_set_id INTEGER, flat_key TEXT, string_value TEXT, int_value INTEGER, real_value REAL); + CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); + + INSERT INTO slice VALUES + (1, 5000, 3000, 'main-slice', NULL, 10, 2, 0), + (2, 8000, 1000, 'child-slice', 1, 10, NULL, 1), + (3, 1000, 500, 'early-slice', NULL, 10, 1, 0); + INSERT INTO flow VALUES (1, 1, 2); + INSERT INTO process VALUES (100, 'test-process', 1234); + INSERT INTO thread VALUES (200, 'test-thread', 5678, 100, 1); + INSERT INTO __intrinsic_track VALUES (10, 'test-track', 'thread_track', 200, NULL); + INSERT INTO args VALUES (1, 'slice.name', 'early-slice', NULL, NULL); + INSERT INTO args VALUES (2, 'slice.name', 'main-slice', NULL, NULL); + INSERT INTO clock_snapshot VALUES + (0, 1700000000000000000, 1, 'REALTIME'), + (10000, 1700000000000010000, 1, 'REALTIME'); + ", + ) + .unwrap(); + + super::sqlite_reader::PerfettoDb { conn } +} + +// timestamp tests + +fn make_snapshots(pairs: &[(i64, i64)]) -> Vec { + pairs.iter().map(|(ts, cv)| crate::sqlite_reader::PerfettoClockSnapshot { ts: *ts, clock_value: *cv }).collect() +} + +#[test] +fn timestamp_to_realtime_interpolates_between_snapshots() { + let snaps = make_snapshots(&[(0, 1_700_000_000_000_000_000), (10000, 1_700_000_000_000_010_000)]); + let conv = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + let result = conv.to_realtime(5000).unwrap(); + assert_eq!(result, Some(1_700_000_000_000_005_000)); +} + +#[test] +fn timestamp_to_realtime_exact_snapshot() { + let snaps = make_snapshots(&[(0, 1_700_000_000_000_000_000), (10000, 1_700_000_000_000_010_000)]); + let conv = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + let result = conv.to_realtime(0).unwrap(); + assert_eq!(result, Some(1_700_000_000_000_000_000)); +} + +#[test] +fn timestamp_to_realtime_after_last_snapshot() { + let snaps = make_snapshots(&[(0, 1_700_000_000_000_000_000), (10000, 1_700_000_000_000_010_000)]); + let conv = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + let result = conv.to_realtime(20000).unwrap(); + assert_eq!(result, Some(1_700_000_000_000_020_000)); +} + +#[test] +fn timestamp_to_realtime_before_first_snapshot_best_effort() { + let snaps = make_snapshots(&[(1000, 1_700_000_000_001_000_000)]); + let conv = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + let result = conv.to_realtime(500).unwrap(); + assert_eq!(result, Some(1_700_000_000_000_999_500)); // first.clock_value - (first.ts - trace_ts) +} + +#[test] +fn timestamp_to_realtime_before_first_snapshot_require_fails() { + let snaps = make_snapshots(&[(1000, 1_700_000_000_001_000_000)]); + let conv = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::RequireRealtime); + assert!(conv.to_realtime(500).is_err()); +} + +#[test] +fn timestamp_to_realtime_empty_snapshots_best_effort_returns_none() { + let conv = timestamp::TimestampConverter::new(vec![], timestamp::TimestampPolicy::BestEffort); + let result = conv.to_realtime(500).unwrap(); + assert_eq!(result, None); +} + +#[test] +fn timestamp_to_realtime_empty_snapshots_require_fails() { + let conv = timestamp::TimestampConverter::new(vec![], timestamp::TimestampPolicy::RequireRealtime); + assert!(conv.to_realtime(500).is_err()); +} + +#[test] +fn timestamp_has_realtime() { + let conv = timestamp::TimestampConverter::new(vec![], timestamp::TimestampPolicy::BestEffort); + assert!(!conv.has_realtime()); + + let snaps = make_snapshots(&[(0, 1_700_000_000_000_000_000)]); + let conv = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + assert!(conv.has_realtime()); +} + +// trace_mapper tests + +#[test] +fn trace_mapper_produces_spans_from_slices() { + let db = test_db(); + let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { + ts: 0, + clock_value: 1_700_000_000_000_000_000, + }]; + let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + let emitted = run_trace_mapper(&db, &converter); + + assert!(!emitted.is_empty(), "expected at least one span batch"); + assert!(emitted[0].1 > 0, "expected valid record_type"); +} + +#[test] +fn trace_mapper_skips_slices_without_realtime_best_effort() { + let db = test_db(); + let converter = timestamp::TimestampConverter::new(vec![], timestamp::TimestampPolicy::BestEffort); + let emitted = run_trace_mapper(&db, &converter); + + // BestEffort with no snapshots: slices should be skipped. + assert!(emitted.is_empty()); +} + +#[test] +fn trace_mapper_fails_without_realtime_require() { + let db = test_db(); + let converter = timestamp::TimestampConverter::new(vec![], timestamp::TimestampPolicy::RequireRealtime); + let result = run_trace_mapper_result(&db, &converter); + assert!(result.is_err()); +} + +// metric_mapper tests + +#[test] +fn metric_mapper_encodes_scalar_metrics() { + let metrics = vec![crate::metrics_reader::PerfettoMetric { + name: "test_metric".to_string(), + description: Some("a test metric".to_string()), + unit: Some("ms".to_string()), + scalar_value: Some(42.0), + labels: vec![("cpu".to_string(), "0".to_string())], + children: vec![], + }]; + + let emitted = run_metric_mapper(&metrics); + assert!(!emitted.is_empty()); +} + +#[test] +fn metric_mapper_handles_empty_metrics() { + let emitted = run_metric_mapper(&[]); + assert!(emitted.is_empty()); +} + +#[test] +fn metric_mapper_flattens_nested_metrics() { + let metrics = vec![crate::metrics_reader::PerfettoMetric { + name: "parent".to_string(), + description: None, + unit: None, + scalar_value: Some(10.0), + labels: vec![], + children: vec![crate::metrics_reader::PerfettoMetric { + name: "child".to_string(), + description: None, + unit: None, + scalar_value: Some(5.0), + labels: vec![], + children: vec![], + }], + }]; + + let emitted = run_metric_mapper(&metrics); + let payload = &emitted[0].2; + let req = opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest::decode(payload.as_slice()) + .unwrap(); + let names: Vec<&str> = req.resource_metrics[0].scope_metrics[0] + .metrics + .iter() + .map(|m| m.name.as_str()) + .collect(); + assert!(names.contains(&"parent")); + assert!(names.contains(&"parent.child")); +} + +// log_mapper tests + +#[test] +fn log_mapper_produces_per_slice_and_summary_logs() { + let db = test_db(); + let emitted = run_log_mapper(&db); + // 3 slices + 1 summary = 4 log records + assert!(emitted.len() >= 2, "expected per-slice logs + summary"); + assert!(emitted.iter().all(|(rt, _, _)| *rt == crate::LJ_INGEST_RECORD_TYPE_LOGS)); +} + +// run_pipeline integration test + +#[test] +fn run_pipeline_integration_with_sqlite() { + // This test exercises the pipeline end-to-end using a pre-made SQLite DB. + // It does NOT require trace_processor — we export the in-memory DB to a + // temp file and feed the pipeline directly via a manual path. + + let tmp = temp_sqlite_file(); + let emitted = run_core_pipeline(&tmp); + + assert!(!emitted.is_empty()); + + let has_logs = emitted.iter().any(|(rt, _, _)| *rt == crate::LJ_INGEST_RECORD_TYPE_LOGS); + assert!(has_logs, "expected at least one log record from summary"); + + let _ = std::fs::remove_file(&tmp); +} + +// test helpers for mapper tests + +fn run_trace_mapper( + db: &super::sqlite_reader::PerfettoDb, + converter: ×tamp::TimestampConverter, +) -> Vec { + run_trace_mapper_result(db, converter).unwrap() +} + +fn run_trace_mapper_result( + db: &super::sqlite_reader::PerfettoDb, + converter: ×tamp::TimestampConverter, +) -> Result, String> { + let plugin = dummy_plugin(dummy_emit); + trace_mapper::map_traces(db, converter, super::emit_generic, &plugin)?; + Ok(take_records(&plugin)) +} + +fn run_metric_mapper(metrics: &[crate::metrics_reader::PerfettoMetric]) -> Vec { + let plugin = dummy_plugin(dummy_emit); + let converter = timestamp::TimestampConverter::new(vec![], timestamp::TimestampPolicy::BestEffort); + metric_mapper::map_metrics(metrics, &converter, super::emit_generic, &plugin).unwrap(); + take_records(&plugin) +} + +fn run_log_mapper(db: &super::sqlite_reader::PerfettoDb) -> Vec { + let plugin = dummy_plugin(dummy_emit); + let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; + let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + log_mapper::map_logs(db, &converter, super::emit_generic, &plugin).unwrap(); + take_records(&plugin) +} + +fn run_core_pipeline(sqlite_path: &std::path::Path) -> Vec { + let plugin = dummy_plugin(dummy_emit); + let db = super::sqlite_reader::PerfettoDb::open(sqlite_path).unwrap(); + let snaps = db.read_clock_snapshots().unwrap(); + let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + let _ = trace_mapper::map_traces(&db, &converter, super::emit_generic, &plugin); + let _ = log_mapper::map_logs(&db, &converter, super::emit_generic, &plugin); + take_records(&plugin) +} + +fn dummy_plugin(cb: super::GenericRecordCallback) -> super::PerfettoPlugin { + let records: Box>> = Box::new(std::cell::RefCell::new(Vec::new())); + let user_ptr = Box::into_raw(records) as *mut std::ffi::c_void; + + super::PerfettoPlugin { + legacy_callback: None, + legacy_user: std::ptr::null_mut(), + generic_callback: Some(cb), + generic_user: user_ptr, + last_error: None, + } +} + +unsafe extern "C" fn dummy_emit(user: *mut std::ffi::c_void, record: *const super::LjIngestRecordV1) { + let records = unsafe { &*(user as *const std::cell::RefCell>) }; + let rec = unsafe { &*record }; + let payload = if rec.payload.is_null() || rec.payload_len == 0 { + Vec::new() + } else { + unsafe { std::slice::from_raw_parts(rec.payload, rec.payload_len) }.to_vec() + }; + records.borrow_mut().push((rec.record_type, rec.timestamp_unix_ns, payload)); +} + +fn take_records(plugin: &super::PerfettoPlugin) -> Vec { + if plugin.generic_user.is_null() { + return Vec::new(); + } + let records_box = unsafe { Box::from_raw(plugin.generic_user as *mut std::cell::RefCell>) }; + let cell = *records_box; + cell.into_inner() +} + +fn temp_sqlite_file() -> std::path::PathBuf { + let path = std::env::temp_dir().join(format!("perfetto-test-pipeline-{}.sqlite", std::process::id())); + let conn = rusqlite::Connection::open(&path).unwrap(); + conn.execute_batch( + " + CREATE TABLE slice (id INTEGER, ts INTEGER, dur INTEGER, name TEXT, parent_id INTEGER, track_id INTEGER, arg_set_id INTEGER, depth INTEGER); + CREATE TABLE thread (utid INTEGER, name TEXT, tid INTEGER, upid INTEGER, is_main_thread INTEGER); + CREATE TABLE process (upid INTEGER, name TEXT, pid INTEGER); + CREATE TABLE __intrinsic_track (id INTEGER, name TEXT, type TEXT, utid INTEGER, upid INTEGER); + CREATE TABLE sched_slice (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, ucpu INTEGER, end_state TEXT); + CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); + INSERT INTO slice VALUES (1, 5000, 3000, 'test', NULL, 10, NULL, 0), (2, 10000, 500, 'child', 1, 10, NULL, 1); + INSERT INTO thread VALUES (1, 'main', 100, 1, 1); + INSERT INTO process VALUES (1, 'testproc', 1000); + INSERT INTO clock_snapshot VALUES (0, 1700000000000000000, 1, 'REALTIME'), (20000, 1700000000000020000, 1, 'REALTIME'); + ", + ).unwrap(); + path +} diff --git a/plugins/stress-ingest/src/lib.rs b/plugins/stress-ingest/src/lib.rs index d1fbb9b..0506cda 100644 --- a/plugins/stress-ingest/src/lib.rs +++ b/plugins/stress-ingest/src/lib.rs @@ -6,7 +6,7 @@ use std::ffi::{CString, c_char, c_int, c_void}; -// ── C ABI types (must match liblogjet.h) ──────────────────────────────────── +// C ABI types (must match liblogjet.h) #[repr(C)] pub struct LjAttribute { @@ -66,14 +66,14 @@ pub extern "C" fn lj_ingest_descriptor_v1() -> *const LjIngestDescriptorV1 { &STRESS_INGEST_DESCRIPTOR.0 } -// ── Plugin context ────────────────────────────────────────────────────────── +// Plugin context pub struct StressPlugin { callback: Option, user: *mut c_void, } -// ── Exported C ABI ────────────────────────────────────────────────────────── +// Exported C ABI #[unsafe(no_mangle)] pub extern "C" fn lj_ingest_create() -> *mut StressPlugin { @@ -140,7 +140,7 @@ pub unsafe extern "C" fn lj_ingest_free(ctx: *mut StressPlugin) { let _ = unsafe { Box::from_raw(ctx) }; } -// ── Record generation ─────────────────────────────────────────────────────── +// Record generation fn emit_record(ctx: &StressPlugin, cb: RecordCallback, seq: u64) { let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_nanos() as u64; diff --git a/plugins/syslog-ingest/src/lib.rs b/plugins/syslog-ingest/src/lib.rs index 2851c1b..88cf885 100644 --- a/plugins/syslog-ingest/src/lib.rs +++ b/plugins/syslog-ingest/src/lib.rs @@ -7,7 +7,7 @@ use std::ffi::{CString, c_char, c_int, c_void}; use std::time::{SystemTime, UNIX_EPOCH}; -// ── C ABI types (must match liblogjet.h exactly) ──────────────────────────── +// C ABI types (must match liblogjet.h exactly) #[repr(C)] pub struct LjAttribute { @@ -65,7 +65,7 @@ pub extern "C" fn lj_ingest_descriptor_v1() -> *const LjIngestDescriptorV1 { &SYSLOG_INGEST_DESCRIPTOR.0 } -// ── Severity constants matching liblogjet.h ───────────────────────────────── +// Severity constants matching liblogjet.h const LJ_SEVERITY_TRACE: i32 = 1; const LJ_SEVERITY_DEBUG: i32 = 5; @@ -75,7 +75,7 @@ const LJ_SEVERITY_ERROR: i32 = 17; const LJ_SEVERITY_FATAL: i32 = 21; const LJ_ATTR_STRING: i32 = 0; -// ── Plugin context ────────────────────────────────────────────────────────── +// Plugin context /// Parsing context accumulates partial lines from the TCP stream. pub struct SyslogPlugin { @@ -84,7 +84,7 @@ pub struct SyslogPlugin { user: *mut c_void, } -// ── Exported C ABI ────────────────────────────────────────────────────────── +// Exported C ABI /// Creates a new syslog parsing context. #[unsafe(no_mangle)] @@ -155,7 +155,7 @@ pub unsafe extern "C" fn lj_ingest_free(ctx: *mut SyslogPlugin) { let _ = unsafe { Box::from_raw(ctx) }; } -// ── Syslog parsing ───────────────────────────────────────────────────────── +// Syslog parsing /// Parsed fields from a syslog line. struct Parsed<'a> { @@ -249,7 +249,7 @@ fn facility_name(facility: u32) -> &'static str { TABLE.get(facility as usize).copied().unwrap_or("unknown") } -// ── Record emission ───────────────────────────────────────────────────────── +// Record emission /// Emits a parsed syslog record through the C callback. fn emit_record(ctx: &SyslogPlugin, line: &[u8]) { diff --git a/scripts/build-perfetto.sh b/scripts/build-perfetto.sh new file mode 100755 index 0000000..bfe2aa9 --- /dev/null +++ b/scripts/build-perfetto.sh @@ -0,0 +1,153 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +PERFETTO_SRC="${1:-$SCRIPT_DIR/../../perfetto}" + +if [ ! -d "$PERFETTO_SRC" ]; then + echo "Usage: $0 /path/to/perfetto/source" + echo " Defaults to ../../perfetto if not specified." + exit 1 +fi + +cd "$PERFETTO_SRC" + +# OS / package detection + +detect_pkg_manager() { + if [ -f /etc/os-release ]; then + local id + id=$(awk -F= '/^ID=/{print tolower($2)}' /etc/os-release | tr -d '"') + case "$id" in + ubuntu|debian|pop|linuxmint|elementary|zorin|neon) echo "apt" ;; + fedora|centos|rhel|rocky|almalinux) echo "dnf" ;; + arch|manjaro|endeavouros) echo "pacman" ;; + opensuse*|suse) echo "zypper" ;; + *) echo "unknown" ;; + esac + else + echo "unknown" + fi +} + +need() { + command -v "$1" >/dev/null 2>&1 +} + +ensure_system_deps() { + local pm + pm=$(detect_pkg_manager) + local missing="" + + # python3-venv is needed on Debian/Ubuntu for ensurepip inside venv. + if [ "$pm" = "apt" ]; then + if ! python3 -c 'import ensurepip' 2>/dev/null; then + echo "Installing python3-venv + python3.12-venv..." + sudo apt install -y python3-venv python3.12-venv || sudo apt install -y python3-venv + fi + fi + + # Also install core deps that might be missing. + for cmd in git python3 curl tar; do + if ! need "$cmd"; then + missing="$missing $cmd" + fi + done + + if [ -z "$missing" ]; then + return + fi + + echo "Missing system packages:$missing" + + if [ "$pm" = "unknown" ]; then + echo "Install them manually and re-run this script." + exit 1 + fi + + case "$pm" in + apt) + echo "Running: sudo apt install -y git python3 curl tar" + sudo apt install -y git python3 curl tar + ;; + dnf) + echo "Running: sudo dnf install -y git python3 curl tar" + sudo dnf install -y git python3 curl tar + ;; + pacman) + echo "Running: sudo pacman -S --noconfirm git python3 curl tar" + sudo pacman -S --noconfirm git python3 curl tar + ;; + zypper) + echo "Running: sudo zypper install -y git-core python3 curl tar" + sudo zypper install -y git-core python3 curl tar + ;; + esac +} + +ensure_system_deps + +# Clean stale venv from previous failed runs + +if [ -d ".venv" ] && [ ! -f ".venv/bin/activate" ]; then + echo "" + echo "Removing broken .venv from previous run..." + rm -rf .venv +fi + +# Hermetic toolchain download + +echo "" +echo "Downloading hermetic build dependencies (GN, Ninja, clang, libs)..." +echo "This may take a while on first run." + +if [ -x tools/install-build-deps ]; then + python3 tools/install-build-deps +elif [ -f tools/install-build-deps ]; then + chmod +x tools/install-build-deps + python3 tools/install-build-deps +else + echo "tools/install-build-deps not found. Download it from:" + echo " https://raw.githubusercontent.com/google/perfetto/main/tools/install-build-deps" + exit 1 +fi + +# Build + +OUT_DIR="out/linux_release" + +echo "" +echo "Generating build config..." + +python3 tools/gn gen "$OUT_DIR" --args=' + is_debug = false + is_clang = true + use_custom_libcxx = true + treat_warnings_as_errors = false +' + +echo "" +echo "Building trace_processor_shell..." +python3 tools/ninja -C "$OUT_DIR" trace_processor_shell + +echo "" +echo "Building traced..." +python3 tools/ninja -C "$OUT_DIR" traced + +echo "" +echo "Building traced_probes..." +python3 tools/ninja -C "$OUT_DIR" traced_probes + +echo "" +echo "Building tracebox..." +python3 tools/ninja -C "$OUT_DIR" tracebox + +echo "" +echo "Perfetto build complete." +echo "Binaries:" +echo " trace_processor_shell → $OUT_DIR/trace_processor_shell" +echo " traced → $OUT_DIR/traced" +echo " traced_probes → $OUT_DIR/traced_probes" +echo " tracebox → $OUT_DIR/tracebox" +echo "" +echo "Add to PATH or set LJD_PERFETTO_TRACE_PROCESSOR=$PWD/$OUT_DIR/trace_processor_shell" diff --git a/scripts/setup-rust.sh b/scripts/setup-rust.sh index 9d1b4cb..401a6f4 100644 --- a/scripts/setup-rust.sh +++ b/scripts/setup-rust.sh @@ -14,7 +14,7 @@ have_rustup() { command -v rustup >/dev/null 2>&1; } have_cc() { command -v cc >/dev/null 2>&1; } have_nextest() { command -v cargo-nextest >/dev/null 2>&1; } -# ── CC / system package helpers ───────────────────────────────────────────── +# CC / system package helpers detect_pkg_manager() { case "$(uname -s)" in @@ -106,7 +106,7 @@ ensure_cc() { echo "C compiler installed: $(cc --version 2>&1 | head -1)" } -# ── Main ──────────────────────────────────────────────────────────────────── +# Main # Already installed: silent unless something needs fixing. if have_rustc && have_cargo && have_rustup; then