Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion demo/perfetto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ Add to PATH or set `LJD_PERFETTO_TRACE_PROCESSOR` to point at
## Demos

- [linux-data-record](linux-data-record/) — capture and inspect a system trace
- [perfetto-to-logjet](perfetto-to-logjet/) — full end-to-end: record ftrace, import via plugin, view in ljx
- [perfetto-to-logjet](perfetto-to-logjet/) — full end-to-end: record ftrace, import via plugin (SQLite export), view in ljx
- [perfetto-to-logjet-rpc](perfetto-to-logjet-rpc/) — same but using RPC stdio mode (no temp files)
48 changes: 48 additions & 0 deletions demo/perfetto/perfetto-to-logjet-rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Perfetto-to-logjet (RPC)

Same as `perfetto-to-logjet/` but uses trace processor's `server stdio` RPC
mode instead of SQLite export. No temp files — queries go directly over stdin/stdout.

## Build First

```bash
make dev
./demo/perfetto/build-perfetto.sh
```

## Run

```bash
cd demo/perfetto/perfetto-to-logjet-rpc
./run-demo.sh
```

## What Happens

1. `tracebox` records 5s of scheduler events (CPU switches, process lifecycle,
CPU frequency, interrupts) via ftrace.
2. `ljd` loads the perfetto-ingest plugin with `LJD_PERFETTO_ACQUISITION=rpc`
set via `ingest.plugin-env` config.
3. The plugin spawns a fresh `trace_processor server stdio` for each query,
sends SQL, receives protobuf responses, maps rows to OTel log records,
and streams them into a `.logjet` spool — no temp SQLite files.
4. `ljx view` opens the spool.

## What You Should See

Same output as the SQLite demo — thousands of log lines across multiple types
(sched slices, thread states, ftrace events, spurious wakeups).

## SQLite vs RPC

| | SQLite (default) | RPC |
|---|---|---|
| Config | `ingest.plugin-path: ...` | + `ingest.plugin-env: LJD_PERFETTO_ACQUISITION=rpc` |
| Temp files | Yes (SQLite export) | No |
| Speed | Faster (single trace load) | Slower (one load per query type) |
| Maturity | Stable | New |

## Troubleshooting

- **0 records**: Run with `sudo ./run-demo.sh` for ftrace access.
- **ljx view shows fewer records**: Delete stale index cache: `rm -rf ~/.cache/ljx`.
157 changes: 157 additions & 0 deletions demo/perfetto/perfetto-to-logjet-rpc/run-demo.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#!/bin/sh
set -eu

SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
ROOT_DIR="$SCRIPT_DIR/../../.."
BUILD_DIR="$ROOT_DIR/target/debug"
PERFETTO_OUT="${PERFETTO_OUT:-$ROOT_DIR/perfetto/out/linux_release}"
SPOOL_DIR="$SCRIPT_DIR/spool"
TRACE_FILE="$SCRIPT_DIR/trace.pftrace"

LJD="$BUILD_DIR/ljd"
LJX="$BUILD_DIR/ljx"
PLUGIN="$BUILD_DIR/liblj_perfetto_ingest.so"
TRACED="$PERFETTO_OUT/traced"
TRACED_PROBES="$PERFETTO_OUT/traced_probes"
TRACEBOX="$PERFETTO_OUT/tracebox"
TP="$PERFETTO_OUT/trace_processor_shell"

for bin in "$LJD" "$LJX" "$PLUGIN"; do
if [ ! -e "$bin" ]; then
echo "missing $bin"
echo "build first with: make dev"
exit 1
fi
done

for bin in "$TRACED" "$TRACED_PROBES" "$TRACEBOX" "$TP"; do
if [ ! -x "$bin" ]; then
echo "missing $bin"
echo "build perfetto first with: ./demo/perfetto/build-perfetto.sh"
exit 1
fi
done

# ── Record a trace ────────────────────────────────────────────────────────────

echo "Starting traced..."
"$TRACED" &>/dev/null &
TRACED_PID=$!

echo "Starting traced_probes..."
"$TRACED_PROBES" &>/dev/null &
PROBES_PID=$!

cleanup_trace() {
kill "$TRACED_PID" 2>/dev/null || true
kill "$PROBES_PID" 2>/dev/null || true
wait "$TRACED_PID" 2>/dev/null || true
wait "$PROBES_PID" 2>/dev/null || true
}

trap cleanup_trace EXIT INT TERM

sleep 1

echo "Recording 5s of ftrace to $TRACE_FILE..."
CONFIG_FILE="$SCRIPT_DIR/trace-config.txt"
cat > "$CONFIG_FILE" <<'ENDCONFIG'
buffers: {
size_kb: 8192
fill_policy: RING_BUFFER
}
data_sources: {
config {
name: "linux.ftrace"
ftrace_config {
ftrace_events: "sched/sched_switch"
ftrace_events: "sched/sched_waking"
ftrace_events: "sched/sched_process_exec"
ftrace_events: "sched/sched_process_fork"
ftrace_events: "sched/sched_process_exit"
ftrace_events: "power/cpu_frequency"
ftrace_events: "power/cpu_idle"
ftrace_events: "irq/irq_handler_entry"
ftrace_events: "irq/irq_handler_exit"
}
}
}
duration_ms: 5000
ENDCONFIG

if [ "$(id -u)" -eq 0 ]; then
"$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$TRACE_FILE"
else
sudo "$TRACEBOX" --txt -c "$CONFIG_FILE" -o "$TRACE_FILE"
sudo chown "$(id -u):$(id -g)" "$TRACE_FILE"
fi

rm -f "$CONFIG_FILE"

cleanup_trace
trap - EXIT INT TERM

if [ ! -f "$TRACE_FILE" ]; then
echo "Trace file not created."
exit 1
fi

SIZE=$(du -h "$TRACE_FILE" | cut -f1)
echo "Trace recorded: $TRACE_FILE ($SIZE)"
echo ""

# ── Import via perfetto-ingest plugin ─────────────────────────────────────────

echo "Importing into .logjet..."
rm -rf "$SPOOL_DIR"
mkdir -p "$SPOOL_DIR"

# Use a temp config so we don't interfere with the user's config.
CONFIG_FILE="$SCRIPT_DIR/ljd-perfetto.conf"
cat > "$CONFIG_FILE" <<EOF
output: file
file.path: "$SPOOL_DIR"
file.size: 10mb
file.name: perfetto.logjet
ingest.protocol: plugin
ingest.plugin-path: "$PLUGIN"
ingest.plugin-env:
- LJD_PERFETTO_ACQUISITION=rpc
EOF

LJD_PERFETTO_TRACE_FILE="$TRACE_FILE" \
LJD_PERFETTO_TRACE_PROCESSOR="$TP" \
"$LJD" serve --config "$CONFIG_FILE" &
LJD_PID=$!

cleanup_ljd() {
kill "$LJD_PID" 2>/dev/null || true
wait "$LJD_PID" 2>/dev/null || true
rm -f "$CONFIG_FILE"
}

trap cleanup_ljd EXIT INT TERM

# Poll until records appear (plugin finishes), up to 60s.
echo "Waiting for import..."
elapsed=0
while [ "$elapsed" -lt 60 ]; do
if [ -f "$SPOOL_DIR/perfetto.logjet" ]; then
COUNT=$("$LJX" count "$SPOOL_DIR/perfetto.logjet" 2>/dev/null || echo "0")
if [ "$COUNT" -gt 0 ] 2>/dev/null; then
echo "Imported $COUNT records into $SPOOL_DIR/perfetto.logjet"
break
fi
fi
sleep 1
elapsed=$((elapsed + 1))
done

kill "$LJD_PID" 2>/dev/null || true
wait "$LJD_PID" 2>/dev/null || true
trap - EXIT INT TERM

# ── View the result ───────────────────────────────────────────────────────────

echo "Opening ljx view..."
"$LJX" view "$SPOOL_DIR/perfetto.logjet"
50 changes: 36 additions & 14 deletions doc/perfetto-ingest.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ ecosystem as OTel logs, traces, and metrics.

## Architecture

Two acquisition modes — selectable via `ingest.plugin-env` config:

```
.pftrace ──→ trace_processor (spawned as subprocess)
├── export sqlite ──→ sqlite_reader ──→ trace_mapper ──→ OTel spans
└── --run-metrics ──→ metrics_reader ──→ metric_mapper ──→ OTel metrics
log_mapper ──→ OTel logs
buffer & sort by ts
ljd spool (.logjet)
├── SQLite (default): export sqlite → sqlite_reader → mappers
└── RPC: server stdio → rpc_reader → mappers
buffer & sort by ts
ljd spool (.logjet)
```

The plugin is an **active source** (`mode: 1`). ljd calls `lj_ingest_fetch()` once,
Expand All @@ -34,11 +35,9 @@ to guarantee monotonic timestamps in the logjet block format.

## Usage

```bash
# Build the plugin and ljd:
make dev
### SQLite path (default)

# Create a config file (ljd uses YAML config, not CLI flags):
```bash
cat > /tmp/perfetto.conf <<EOF
output: file
file.path: ./spool
Expand All @@ -48,12 +47,34 @@ ingest.protocol: plugin
ingest.plugin-path: ./target/debug/liblj_perfetto_ingest.so
EOF

# Run the import:
LJD_PERFETTO_TRACE_FILE=/path/to/trace.pftrace \
LJD_PERFETTO_TRACE_PROCESSOR=/path/to/trace_processor_shell \
ljd serve --config /tmp/perfetto.conf
```

### RPC path (no temp SQLite files)

```bash
cat > /tmp/perfetto-rpc.conf <<EOF
output: file
file.path: ./spool
file.size: 10mb
file.name: perfetto.logjet
ingest.protocol: plugin
ingest.plugin-path: ./target/debug/liblj_perfetto_ingest.so
ingest.plugin-env:
- LJD_PERFETTO_ACQUISITION=rpc
EOF

LJD_PERFETTO_TRACE_FILE=/path/to/trace.pftrace \
LJD_PERFETTO_TRACE_PROCESSOR=/path/to/trace_processor_shell \
ljd serve --config /tmp/perfetto-rpc.conf
```

`ingest.plugin-env` is a generic ljd config key that passes `KEY=VALUE` pairs
to plugins as environment variables before loading. This avoids plugin-specific
config keys.

See `demo/perfetto/perfetto-to-logjet/run-demo.sh` for a complete end-to-end
example that records, imports, and opens the result in `ljx view`.

Expand All @@ -64,6 +85,7 @@ example that records, imports, and opens the result in `ljx view`.
| `LJD_PERFETTO_TRACE_FILE` | **Yes** | — | Path to the `.pftrace` input file. |
| `LJD_PERFETTO_TRACE_PROCESSOR` | No | PATH search | Path to `trace_processor_shell` binary. |
| `LJD_PERFETTO_TIMESTAMP_POLICY` | No | `best-effort` | `best-effort` or `require-realtime`. |
| `LJD_PERFETTO_ACQUISITION` | No | `sqlite` | `rpc` for stdio RPC mode (no temp SQLite files). |
| `LJD_PERFETTO_METRICS` | No | (none) | Comma-separated metric names to run, e.g. `trace_stats`. |

## Covered Perfetto Types
Expand Down
10 changes: 2 additions & 8 deletions ljx/src/commands/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,15 @@ fn paged_entries(dataset: &Dataset, offset: usize, limit: Option<usize>) -> Resu
}

fn scan_entries_parallel(
entries: Vec<DatasetEntry>,
predicate: &RecordPredicate,
service_filter: Option<&HashSet<String>>,
severity_filter: Option<&HashSet<String>>,
entries: Vec<DatasetEntry>, predicate: &RecordPredicate, service_filter: Option<&HashSet<String>>, severity_filter: Option<&HashSet<String>>,
workers: usize,
) -> Result<Vec<FileScan>> {
if entries.is_empty() {
return Ok(Vec::new());
}
let worker_count = workers.max(1).min(entries.len());
if worker_count == 1 {
return entries
.iter()
.map(|entry| scan_entry(entry, predicate, service_filter, severity_filter))
.collect();
return entries.iter().map(|entry| scan_entry(entry, predicate, service_filter, severity_filter)).collect();
}

let predicate = predicate.clone();
Expand Down
10 changes: 7 additions & 3 deletions ljx/src/commands/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ fn insert_otlp_log_fields_with_preview(
) {
target.insert(
"body".to_string(),
JsonValue::String(truncate_preview(&log_record.body.as_ref().map(|v| format_any_value(Some(v))).filter(|s| !s.is_empty()).unwrap_or_default(), preview_bytes)),
JsonValue::String(truncate_preview(
&log_record.body.as_ref().map(|v| format_any_value(Some(v))).filter(|s| !s.is_empty()).unwrap_or_default(),
preview_bytes,
)),
);
target.insert("timestamp".to_string(), JsonValue::String(format_timestamp(log_record.time_unix_nano.max(fallback_ts_unix_ns))));
if log_record.observed_time_unix_nano > 0 {
Expand Down Expand Up @@ -229,7 +232,9 @@ fn any_value_to_json(value: &AnyValue, preview_bytes: Option<usize>) -> Option<J
Some(Value::IntValue(number)) => Some(JsonValue::Number((*number).into())),
Some(Value::DoubleValue(number)) => serde_json::Number::from_f64(*number).map(JsonValue::Number),
Some(Value::BytesValue(bytes)) => Some(JsonValue::String(truncate_preview(&format!("<{} bytes>", bytes.len()), preview_bytes))),
Some(Value::ArrayValue(array)) => Some(JsonValue::Array(array.values.iter().filter_map(|value| any_value_to_json(value, preview_bytes)).collect())),
Some(Value::ArrayValue(array)) => {
Some(JsonValue::Array(array.values.iter().filter_map(|value| any_value_to_json(value, preview_bytes)).collect()))
}
Some(Value::KvlistValue(map)) => Some(JsonValue::Object(
map.values
.iter()
Expand Down Expand Up @@ -282,4 +287,3 @@ fn format_timestamp(ts_unix_ns: u64) -> String {
#[cfg(test)]
#[path = "../../tests/unit/commands/top_level_query_ut.rs"]
mod top_level_query_ut;

Loading
Loading