From c4e8b6cff70994e425d10d1e7f1f75472cbf408e Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 12:26:37 +0000 Subject: [PATCH 01/11] Add core scheduling types, pftrace universal --- plugins/perfetto-ingest/src/log_mapper.rs | 133 ++++++++++++++++++- plugins/perfetto-ingest/src/sqlite_reader.rs | 112 ++++++++++++++++ 2 files changed, 244 insertions(+), 1 deletion(-) diff --git a/plugins/perfetto-ingest/src/log_mapper.rs b/plugins/perfetto-ingest/src/log_mapper.rs index ea78d6c..5ce8cd9 100644 --- a/plugins/perfetto-ingest/src/log_mapper.rs +++ b/plugins/perfetto-ingest/src/log_mapper.rs @@ -7,7 +7,7 @@ use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; use opentelemetry_proto::tonic::resource::v1::Resource; use prost::Message; -use crate::sqlite_reader::{PerfettoDb, PerfettoSchedSlice, PerfettoSlice}; +use crate::sqlite_reader::{PerfettoDb, PerfettoFtraceEvent, PerfettoSchedSlice, PerfettoSlice, PerfettoSpuriousWakeup, PerfettoThreadState}; use crate::timestamp::TimestampConverter; const SEVERITY_INFO: i32 = 9; @@ -53,6 +53,48 @@ pub fn map_logs( } flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + // Emit thread_state entries as log records. + let thread_states = db.read_thread_states()?; + for ts_state in &thread_states { + let ts = converter.to_realtime(ts_state.ts).ok().flatten().unwrap_or(0); + if batch.is_empty() || ts < batch_min_ts { + batch_min_ts = ts; + } + batch.push(thread_state_to_log(ts_state, converter)); + if batch.len() >= SLICES_PER_LOG_BATCH { + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + } + } + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + + // Emit ftrace_event entries as log records. + let ftrace_events = db.read_ftrace_events()?; + for ev in &ftrace_events { + let ts = converter.to_realtime(ev.ts).ok().flatten().unwrap_or(0); + if batch.is_empty() || ts < batch_min_ts { + batch_min_ts = ts; + } + batch.push(ftrace_event_to_log(ev, converter)); + if batch.len() >= SLICES_PER_LOG_BATCH { + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + } + } + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + + // Emit spurious_wakeup entries as log records. + let wakeups = db.read_spurious_wakeups()?; + for w in &wakeups { + let ts = converter.to_realtime(w.ts).ok().flatten().unwrap_or(0); + if batch.is_empty() || ts < batch_min_ts { + batch_min_ts = ts; + } + batch.push(spurious_wakeup_to_log(w, converter)); + if batch.len() >= SLICES_PER_LOG_BATCH { + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + } + } + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + // Emit a summary log. let threads = db.read_threads()?; let processes = db.read_processes()?; @@ -139,6 +181,95 @@ fn sched_slice_to_log(s: &PerfettoSchedSlice, converter: &TimestampConverter) -> } } +fn thread_state_to_log(ts: &PerfettoThreadState, converter: &TimestampConverter) -> LogRecord { + let t = converter.to_realtime(ts.ts).ok().flatten().unwrap_or(0); + let state = ts.state.as_deref().unwrap_or("?"); + let dur_us = ts.dur as f64 / 1000.0; + let mut body = format!("state={state} dur={dur_us:.1}us utid={}", ts.utid); + if let Some(cpu) = ts.cpu { + body.push_str(&format!(" cpu={cpu}")); + } + if ts.io_wait == Some(true) { + body.push_str(" io_wait"); + } + if let Some(ref blocked) = ts.blocked_function { + body.push_str(&format!(" blocked={blocked}")); + } + if let Some(waker) = ts.waker_utid { + body.push_str(&format!(" waker={waker}")); + } + + let mut attrs = vec![ + KeyValue { key: "perfetto.ts.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.id)) }) }, + KeyValue { key: "perfetto.ts.state".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(state.to_string())) }) }, + KeyValue { key: "perfetto.ts.dur_ns".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.dur)) }) }, + KeyValue { key: "perfetto.ts.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.utid)) }) }, + ]; + if let Some(cpu) = ts.cpu { + attrs.push(KeyValue { key: "perfetto.ts.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }); + } + if let Some(io) = ts.io_wait { + attrs.push(KeyValue { key: "perfetto.ts.io_wait".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(i64::from(io))) }) }); + } + if let Some(ref blocked) = ts.blocked_function { + attrs.push(KeyValue { key: "perfetto.ts.blocked_function".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(blocked.clone())) }) }); + } + + LogRecord { + time_unix_nano: t, observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue(body)) }), + attributes: attrs, + dropped_attributes_count: 0, flags: 0, + trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + } +} + +fn ftrace_event_to_log(ev: &PerfettoFtraceEvent, converter: &TimestampConverter) -> LogRecord { + let t = converter.to_realtime(ev.ts).ok().flatten().unwrap_or(0); + let name = ev.name.as_deref().unwrap_or("?"); + let cpu = ev.cpu.unwrap_or(-1); + let body = format!("{name} cpu={cpu}"); + let mut attrs = vec![ + KeyValue { key: "perfetto.ftrace.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ev.id)) }) }, + KeyValue { key: "perfetto.ftrace.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }, + KeyValue { key: "perfetto.ftrace.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }, + ]; + if let Some(utid) = ev.utid { + attrs.push(KeyValue { key: "perfetto.ftrace.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); + } + LogRecord { + time_unix_nano: t, observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue(body)) }), + attributes: attrs, + dropped_attributes_count: 0, flags: 0, + trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + } +} + +fn spurious_wakeup_to_log(w: &PerfettoSpuriousWakeup, converter: &TimestampConverter) -> LogRecord { + let t = converter.to_realtime(w.ts).ok().flatten().unwrap_or(0); + let body = format!("spurious_wakeup utid={}", w.utid.unwrap_or(-1)); + let mut attrs = vec![ + KeyValue { key: "perfetto.sw.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(w.id)) }) }, + ]; + if let Some(utid) = w.utid { + attrs.push(KeyValue { key: "perfetto.sw.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); + } + if let Some(waker) = w.waker_utid { + attrs.push(KeyValue { key: "perfetto.sw.waker_utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(waker)) }) }); + } + LogRecord { + time_unix_nano: t, observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue(body)) }), + attributes: attrs, + dropped_attributes_count: 0, flags: 0, + trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + } +} + fn flush_log_batch( batch: &mut Vec, batch_min_ts: &mut u64, diff --git a/plugins/perfetto-ingest/src/sqlite_reader.rs b/plugins/perfetto-ingest/src/sqlite_reader.rs index 5e478ca..25315fd 100644 --- a/plugins/perfetto-ingest/src/sqlite_reader.rs +++ b/plugins/perfetto-ingest/src/sqlite_reader.rs @@ -79,6 +79,36 @@ pub struct PerfettoSchedSlice { pub end_state: Option, } +#[derive(Debug, Clone)] +pub struct PerfettoThreadState { + pub id: i64, + pub ts: i64, + pub dur: i64, + pub utid: i64, + pub state: Option, + pub io_wait: Option, + pub blocked_function: Option, + pub waker_utid: Option, + pub cpu: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoFtraceEvent { + pub id: i64, + pub ts: i64, + pub name: Option, + pub cpu: Option, + pub utid: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoSpuriousWakeup { + pub id: i64, + pub ts: i64, + pub utid: Option, + pub waker_utid: Option, +} + pub struct PerfettoDb { pub(crate) conn: rusqlite::Connection, } @@ -342,4 +372,86 @@ impl PerfettoDb { } Ok(out) } + + /// Reads thread state transitions ordered by ts. + pub fn read_thread_states(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare( + "SELECT id, ts, dur, utid, state, io_wait, blocked_function, waker_utid, cpu + FROM thread_state + ORDER BY ts", + ) + .map_err(|err| format!("failed to prepare thread_state query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoThreadState { + id: row.get(0)?, + ts: row.get(1)?, + dur: row.get(2)?, + utid: row.get(3)?, + state: row.get(4)?, + io_wait: row.get::<_, Option>(5)?.map(|v| v != 0), + blocked_function: row.get(6)?, + waker_utid: row.get(7)?, + cpu: row.get(8)?, + }) + }) + .map_err(|err| format!("failed to query thread_state: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read thread_state row: {err}"))?); + } + Ok(out) + } + + /// Reads ftrace events ordered by ts. + pub fn read_ftrace_events(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare( + "SELECT id, ts, name, cpu, utid + FROM ftrace_event + ORDER BY ts", + ) + .map_err(|err| format!("failed to prepare ftrace_event query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoFtraceEvent { id: row.get(0)?, ts: row.get(1)?, name: row.get(2)?, cpu: row.get(3)?, utid: row.get(4)? }) + }) + .map_err(|err| format!("failed to query ftrace_event: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read ftrace_event row: {err}"))?); + } + Ok(out) + } + + /// Reads spurious sched wakeup events ordered by ts. + pub fn read_spurious_wakeups(&self) -> Result, String> { + let mut stmt = self + .conn + .prepare( + "SELECT id, ts, utid, waker_utid + FROM spurious_sched_wakeup + ORDER BY ts", + ) + .map_err(|err| format!("failed to prepare spurious_sched_wakeup query: {err}"))?; + + let rows = stmt + .query_map([], |row| { + Ok(PerfettoSpuriousWakeup { id: row.get(0)?, ts: row.get(1)?, utid: row.get(2)?, waker_utid: row.get(3)? }) + }) + .map_err(|err| format!("failed to query spurious_sched_wakeup: {err}"))?; + + let mut out = Vec::new(); + for row in rows { + out.push(row.map_err(|err| format!("failed to read spurious_sched_wakeup row: {err}"))?); + } + Ok(out) + } } From 36a7778a82cb71fe7f57e3ff0362c9061f3851f1 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 12:26:47 +0000 Subject: [PATCH 02/11] Add unit tests --- plugins/perfetto-ingest/tests/unit/tests.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/plugins/perfetto-ingest/tests/unit/tests.rs b/plugins/perfetto-ingest/tests/unit/tests.rs index 0bafa80..4a26d0a 100644 --- a/plugins/perfetto-ingest/tests/unit/tests.rs +++ b/plugins/perfetto-ingest/tests/unit/tests.rs @@ -177,6 +177,9 @@ fn test_db() -> super::sqlite_reader::PerfettoDb { CREATE TABLE thread (utid INTEGER, name TEXT, tid INTEGER, upid INTEGER, is_main_thread INTEGER); CREATE TABLE __intrinsic_track (id INTEGER, name TEXT, type TEXT, utid INTEGER, upid INTEGER); CREATE TABLE sched_slice (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, ucpu INTEGER, end_state TEXT); + CREATE TABLE thread_state (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, state TEXT, io_wait INTEGER, blocked_function TEXT, waker_utid INTEGER, cpu INTEGER); + CREATE TABLE ftrace_event (id INTEGER, ts INTEGER, name TEXT, cpu INTEGER, utid INTEGER); + CREATE TABLE spurious_sched_wakeup (id INTEGER, ts INTEGER, utid INTEGER, waker_utid INTEGER); CREATE TABLE args (arg_set_id INTEGER, flat_key TEXT, string_value TEXT, int_value INTEGER, real_value REAL); CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); @@ -472,6 +475,9 @@ fn temp_sqlite_file() -> std::path::PathBuf { CREATE TABLE process (upid INTEGER, name TEXT, pid INTEGER); CREATE TABLE __intrinsic_track (id INTEGER, name TEXT, type TEXT, utid INTEGER, upid INTEGER); CREATE TABLE sched_slice (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, ucpu INTEGER, end_state TEXT); + CREATE TABLE thread_state (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, state TEXT, io_wait INTEGER, blocked_function TEXT, waker_utid INTEGER, cpu INTEGER); + CREATE TABLE ftrace_event (id INTEGER, ts INTEGER, name TEXT, cpu INTEGER, utid INTEGER); + CREATE TABLE spurious_sched_wakeup (id INTEGER, ts INTEGER, utid INTEGER, waker_utid INTEGER); CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); INSERT INTO slice VALUES (1, 5000, 3000, 'test', NULL, 10, NULL, 0), (2, 10000, 500, 'child', 1, 10, NULL, 1); INSERT INTO thread VALUES (1, 'main', 100, 1, 1); From b5068e2cb094bd2a55f22cc1bf957c4ae4eba863 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 13:00:07 +0000 Subject: [PATCH 03/11] Cover perfetto types --- plugins/perfetto-ingest/src/log_mapper.rs | 29 ++- plugins/perfetto-ingest/src/sqlite_reader.rs | 236 +++++++++++++++++++ plugins/perfetto-ingest/tests/unit/tests.rs | 133 +++++++++++ 3 files changed, 397 insertions(+), 1 deletion(-) diff --git a/plugins/perfetto-ingest/src/log_mapper.rs b/plugins/perfetto-ingest/src/log_mapper.rs index 5ce8cd9..45f582d 100644 --- a/plugins/perfetto-ingest/src/log_mapper.rs +++ b/plugins/perfetto-ingest/src/log_mapper.rs @@ -7,7 +7,7 @@ use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; use opentelemetry_proto::tonic::resource::v1::Resource; use prost::Message; -use crate::sqlite_reader::{PerfettoDb, PerfettoFtraceEvent, PerfettoSchedSlice, PerfettoSlice, PerfettoSpuriousWakeup, PerfettoThreadState}; +use crate::sqlite_reader::{PerfettoDb, PerfettoFtraceEvent, PerfettoInstant, PerfettoSchedSlice, PerfettoSlice, PerfettoSpuriousWakeup, PerfettoThreadState}; use crate::timestamp::TimestampConverter; const SEVERITY_INFO: i32 = 9; @@ -95,6 +95,16 @@ pub fn map_logs( } flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + // Emit instant events as log records. + let instants = db.read_instants()?; + for inst in &instants { + let ts = converter.to_realtime(inst.ts).ok().flatten().unwrap_or(0); + if batch.is_empty() || ts < batch_min_ts { batch_min_ts = ts; } + batch.push(instant_to_log(inst, converter)); + if batch.len() >= SLICES_PER_LOG_BATCH { flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); } + } + flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); + // Emit a summary log. let threads = db.read_threads()?; let processes = db.read_processes()?; @@ -270,6 +280,23 @@ fn spurious_wakeup_to_log(w: &PerfettoSpuriousWakeup, converter: &TimestampConve } } +fn instant_to_log(inst: &PerfettoInstant, converter: &TimestampConverter) -> LogRecord { + let t = converter.to_realtime(inst.ts).ok().flatten().unwrap_or(0); + let name = inst.name.as_deref().unwrap_or("?"); + let body = name.to_string(); + LogRecord { + time_unix_nano: t, observed_time_unix_nano: t, + severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), + body: Some(AnyValue { value: Some(Value::StringValue(body)) }), + attributes: vec![ + KeyValue { key: "perfetto.instant.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }, + KeyValue { key: "perfetto.instant.track_id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(inst.track_id)) }) }, + ], + dropped_attributes_count: 0, flags: 0, + trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + } +} + fn flush_log_batch( batch: &mut Vec, batch_min_ts: &mut u64, diff --git a/plugins/perfetto-ingest/src/sqlite_reader.rs b/plugins/perfetto-ingest/src/sqlite_reader.rs index 25315fd..10ced8f 100644 --- a/plugins/perfetto-ingest/src/sqlite_reader.rs +++ b/plugins/perfetto-ingest/src/sqlite_reader.rs @@ -109,6 +109,108 @@ pub struct PerfettoSpuriousWakeup { pub waker_utid: Option, } +#[derive(Debug, Clone)] +pub struct PerfettoInstant { + pub ts: i64, + pub track_id: i64, + pub name: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoCounter { + pub id: i64, + pub ts: i64, + pub track_id: i64, + pub value: f64, +} + +#[derive(Debug, Clone)] +pub struct PerfettoCpu { + pub id: i64, + pub cpu: Option, + pub cluster_id: i64, + pub processor: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoMachine { + pub id: i64, + pub arch: Option, + pub num_cpus: Option, + pub sysname: Option, + pub release: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoMetadata { + pub name: Option, + pub int_value: Option, + pub str_value: Option, +} + +// ─ P4-P9 models (0 rows in ftrace, populated by other trace kinds) ────────── + +#[derive(Debug, Clone)] +pub struct PerfettoMemorySnapshot { + pub id: i64, + pub timestamp: i64, + pub track_id: i64, + pub detail_level: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoCpuProfileSample { + pub id: i64, + pub ts: i64, + pub callsite_id: i64, + pub utid: i64, +} + +#[derive(Debug, Clone)] +pub struct PerfettoStackFrame { + pub id: i64, + pub name: Option, + pub mapping_id: i64, +} + +#[derive(Debug, Clone)] +pub struct PerfettoHeapAllocation { + pub id: i64, + pub ts: i64, + pub upid: i64, + pub size: i64, + pub count: i64, +} + +#[derive(Debug, Clone)] +pub struct PerfettoProtolog { + pub id: i64, + pub ts: i64, + pub level: Option, + pub tag: Option, + pub message: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoAndroidLog { + pub id: i64, + pub ts: i64, + pub utid: i64, + pub prio: i64, + pub tag: Option, + pub msg: Option, +} + +#[derive(Debug, Clone)] +pub struct PerfettoFileDescriptor { + pub id: i64, + pub ufd: i64, + pub fd: i64, + pub ts: i64, + pub upid: i64, + pub path: Option, +} + pub struct PerfettoDb { pub(crate) conn: rusqlite::Connection, } @@ -454,4 +556,138 @@ impl PerfettoDb { } Ok(out) } + + /// Reads instant events ordered by ts. + pub fn read_instants(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT ts, track_id, name FROM instant ORDER BY ts") + .map_err(|err| format!("failed to prepare instant query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoInstant { ts: row.get(0)?, track_id: row.get(1)?, name: row.get(2)? })) + .map_err(|err| format!("failed to query instant: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read instant row: {err}"))?); } + Ok(out) + } + + /// Reads counter values ordered by ts. + pub fn read_counters(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, ts, track_id, value FROM counter ORDER BY ts") + .map_err(|err| format!("failed to prepare counter query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoCounter { id: row.get(0)?, ts: row.get(1)?, track_id: row.get(2)?, value: row.get(3)? })) + .map_err(|err| format!("failed to query counter: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read counter row: {err}"))?); } + Ok(out) + } + + /// Reads CPU topology. + pub fn read_cpus(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, cpu, cluster_id, processor FROM cpu ORDER BY id") + .map_err(|err| format!("failed to prepare cpu query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoCpu { id: row.get(0)?, cpu: row.get(1)?, cluster_id: row.get(2)?, processor: row.get(3)? })) + .map_err(|err| format!("failed to query cpu: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read cpu row: {err}"))?); } + Ok(out) + } + + /// Reads machine info. + pub fn read_machines(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, arch, num_cpus, sysname, release FROM machine ORDER BY id") + .map_err(|err| format!("failed to prepare machine query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoMachine { id: row.get(0)?, arch: row.get(1)?, num_cpus: row.get(2)?, sysname: row.get(3)?, release: row.get(4)? })) + .map_err(|err| format!("failed to query machine: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read machine row: {err}"))?); } + Ok(out) + } + + /// Reads trace metadata entries. + pub fn read_metadata(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT name, int_value, str_value FROM metadata ORDER BY name") + .map_err(|err| format!("failed to prepare metadata query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoMetadata { name: row.get(0)?, int_value: row.get(1)?, str_value: row.get(2)? })) + .map_err(|err| format!("failed to query metadata: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read metadata row: {err}"))?); } + Ok(out) + } + + // ── P4-P9 readers (0 rows in ftrace, populated in other trace kinds) ──────── + + /// Reads memory snapshots. + pub fn read_memory_snapshots(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, timestamp, track_id, detail_level FROM memory_snapshot ORDER BY id") + .map_err(|err| format!("failed to prepare memory_snapshot query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoMemorySnapshot { id: row.get(0)?, timestamp: row.get(1)?, track_id: row.get(2)?, detail_level: row.get(3)? })) + .map_err(|err| format!("failed to query memory_snapshot: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read memory_snapshot: {err}"))?); } + Ok(out) + } + + /// Reads CPU profile stack samples. + pub fn read_cpu_profile_samples(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, ts, callsite_id, utid FROM cpu_profile_stack_sample ORDER BY ts") + .map_err(|err| format!("failed to prepare cpu_profile_stack_sample query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoCpuProfileSample { id: row.get(0)?, ts: row.get(1)?, callsite_id: row.get(2)?, utid: row.get(3)? })) + .map_err(|err| format!("failed to query cpu_profile_stack_sample: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read cpu_profile_stack_sample: {err}"))?); } + Ok(out) + } + + /// Reads stack profile frames. + pub fn read_stack_frames(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, name, mapping_id FROM stack_profile_frame ORDER BY id") + .map_err(|err| format!("failed to prepare stack_profile_frame query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoStackFrame { id: row.get(0)?, name: row.get(1)?, mapping_id: row.get(2)? })) + .map_err(|err| format!("failed to query stack_profile_frame: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read stack_profile_frame: {err}"))?); } + Ok(out) + } + + /// Reads heap profile allocations. + pub fn read_heap_allocations(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, ts, upid, size, count FROM heap_profile_allocation ORDER BY ts") + .map_err(|err| format!("failed to prepare heap_profile_allocation query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoHeapAllocation { id: row.get(0)?, ts: row.get(1)?, upid: row.get(2)?, size: row.get(3)?, count: row.get(4)? })) + .map_err(|err| format!("failed to query heap_profile_allocation: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read heap_profile_allocation: {err}"))?); } + Ok(out) + } + + /// Reads protolog entries. + pub fn read_protologs(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, ts, level, tag, message FROM protolog ORDER BY ts") + .map_err(|err| format!("failed to prepare protolog query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoProtolog { id: row.get(0)?, ts: row.get(1)?, level: row.get(2)?, tag: row.get(3)?, message: row.get(4)? })) + .map_err(|err| format!("failed to query protolog: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read protolog: {err}"))?); } + Ok(out) + } + + /// Reads Android log entries. + pub fn read_android_logs(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, ts, utid, prio, tag, msg FROM android_logs ORDER BY ts") + .map_err(|err| format!("failed to prepare android_logs query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoAndroidLog { id: row.get(0)?, ts: row.get(1)?, utid: row.get(2)?, prio: row.get(3)?, tag: row.get(4)?, msg: row.get(5)? })) + .map_err(|err| format!("failed to query android_logs: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read android_logs: {err}"))?); } + Ok(out) + } + + /// Reads file descriptor events. + pub fn read_filedescriptors(&self) -> Result, String> { + let mut stmt = self.conn.prepare("SELECT id, ufd, fd, ts, upid, path FROM filedescriptor ORDER BY ts") + .map_err(|err| format!("failed to prepare filedescriptor query: {err}"))?; + let rows = stmt.query_map([], |row| Ok(PerfettoFileDescriptor { id: row.get(0)?, ufd: row.get(1)?, fd: row.get(2)?, ts: row.get(3)?, upid: row.get(4)?, path: row.get(5)? })) + .map_err(|err| format!("failed to query filedescriptor: {err}"))?; + let mut out = Vec::new(); + for row in rows { out.push(row.map_err(|err| format!("failed to read filedescriptor: {err}"))?); } + Ok(out) + } } diff --git a/plugins/perfetto-ingest/tests/unit/tests.rs b/plugins/perfetto-ingest/tests/unit/tests.rs index 4a26d0a..e9c6f4b 100644 --- a/plugins/perfetto-ingest/tests/unit/tests.rs +++ b/plugins/perfetto-ingest/tests/unit/tests.rs @@ -180,6 +180,7 @@ fn test_db() -> super::sqlite_reader::PerfettoDb { CREATE TABLE thread_state (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, state TEXT, io_wait INTEGER, blocked_function TEXT, waker_utid INTEGER, cpu INTEGER); CREATE TABLE ftrace_event (id INTEGER, ts INTEGER, name TEXT, cpu INTEGER, utid INTEGER); CREATE TABLE spurious_sched_wakeup (id INTEGER, ts INTEGER, utid INTEGER, waker_utid INTEGER); + CREATE TABLE instant (ts INTEGER, track_id INTEGER, name TEXT); CREATE TABLE args (arg_set_id INTEGER, flat_key TEXT, string_value TEXT, int_value INTEGER, real_value REAL); CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); @@ -193,6 +194,12 @@ fn test_db() -> super::sqlite_reader::PerfettoDb { INSERT INTO __intrinsic_track VALUES (10, 'test-track', 'thread_track', 200, NULL); INSERT INTO args VALUES (1, 'slice.name', 'early-slice', NULL, NULL); INSERT INTO args VALUES (2, 'slice.name', 'main-slice', NULL, NULL); + INSERT INTO thread_state VALUES (1, 5000, 10000, 1, 'R', NULL, NULL, NULL, 0); + INSERT INTO thread_state VALUES (2, 15000, 5000, 2, 'S', 1, 'pipe_wait', 1, 0); + INSERT INTO ftrace_event VALUES (1, 5000, 'sched_switch', 0, 1); + INSERT INTO ftrace_event VALUES (2, 15000, 'sched_waking', 2, 3); + INSERT INTO spurious_sched_wakeup VALUES (1, 5000, 1, 2); + INSERT INTO instant VALUES (10000, 10, 'test-instant'); INSERT INTO clock_snapshot VALUES (0, 1700000000000000000, 1, 'REALTIME'), (10000, 1700000000000010000, 1, 'REALTIME'); @@ -422,6 +429,131 @@ fn run_log_mapper(db: &super::sqlite_reader::PerfettoDb) -> Vec { take_records(&plugin) } +// sqlite_reader tests for P1/P2 types + +#[test] +fn sqlite_reader_reads_thread_states() { + let db = test_db(); + let states = db.read_thread_states().unwrap(); + assert_eq!(states.len(), 2); + assert_eq!(states[0].state.as_deref(), Some("R")); + assert_eq!(states[1].io_wait, Some(true)); + assert_eq!(states[1].blocked_function.as_deref(), Some("pipe_wait")); +} + +#[test] +fn sqlite_reader_reads_ftrace_events() { + let db = test_db(); + let events = db.read_ftrace_events().unwrap(); + assert_eq!(events.len(), 2); + assert_eq!(events[0].name.as_deref(), Some("sched_switch")); + assert_eq!(events[1].name.as_deref(), Some("sched_waking")); +} + +#[test] +fn sqlite_reader_reads_spurious_wakeups() { + let db = test_db(); + let wakeups = db.read_spurious_wakeups().unwrap(); + assert_eq!(wakeups.len(), 1); + assert_eq!(wakeups[0].utid, Some(1)); + assert_eq!(wakeups[0].waker_utid, Some(2)); +} + +#[test] +fn sqlite_reader_reads_instants() { + let db = test_db(); + let instants = db.read_instants().unwrap(); + assert_eq!(instants.len(), 1); + assert_eq!(instants[0].name.as_deref(), Some("test-instant")); +} + +#[test] +fn log_mapper_produces_thread_state_records() { + let db = test_db(); + let emitted = run_log_mapper(&db); + // Should include 2 thread_state records + slices + sched + ftrace + spurious + instant + summary + assert!(emitted.iter().any(|(rt, _, _)| *rt == crate::LJ_INGEST_RECORD_TYPE_LOGS)); + // Verify thread_state attributes in payload + let has_ts_attrs = emitted.iter().any(|(_, _, payload)| { + if let Ok(req) = prost::Message::decode(payload.as_slice()) { + let req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest = req; + req.resource_logs.iter().any(|rl| { + rl.scope_logs.iter().any(|sl| { + sl.log_records.iter().any(|lr| { + lr.attributes.iter().any(|kv| kv.key == "perfetto.ts.state" && kv.value.as_ref().is_some_and(|v| v.value.is_some())) + }) + }) + }) + } else { + false + } + }); + assert!(has_ts_attrs, "expected thread_state attributes in emitted records"); +} + +#[test] +fn log_mapper_produces_ftrace_event_records() { + let db = test_db(); + let emitted = run_log_mapper(&db); + let has_ftrace = emitted.iter().any(|(_, _, payload)| { + if let Ok(req) = prost::Message::decode(payload.as_slice()) { + let req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest = req; + req.resource_logs.iter().any(|rl| { + rl.scope_logs.iter().any(|sl| { + sl.log_records.iter().any(|lr| { + lr.attributes.iter().any(|kv| kv.key == "perfetto.ftrace.name" && kv.value.as_ref().is_some_and(|v| v.value.is_some())) + }) + }) + }) + } else { + false + } + }); + assert!(has_ftrace, "expected ftrace_event attributes in emitted records"); +} + +#[test] +fn log_mapper_produces_spurious_wakeup_records() { + let db = test_db(); + let emitted = run_log_mapper(&db); + let has_sw = emitted.iter().any(|(_, _, payload)| { + if let Ok(req) = prost::Message::decode(payload.as_slice()) { + let req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest = req; + req.resource_logs.iter().any(|rl| { + rl.scope_logs.iter().any(|sl| { + sl.log_records.iter().any(|lr| { + lr.attributes.iter().any(|kv| kv.key == "perfetto.sw.id") + }) + }) + }) + } else { + false + } + }); + assert!(has_sw, "expected spurious_wakeup attributes in emitted records"); +} + +#[test] +fn log_mapper_produces_instant_records() { + let db = test_db(); + let emitted = run_log_mapper(&db); + let has_instant = emitted.iter().any(|(_, _, payload)| { + if let Ok(req) = prost::Message::decode(payload.as_slice()) { + let req: opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest = req; + req.resource_logs.iter().any(|rl| { + rl.scope_logs.iter().any(|sl| { + sl.log_records.iter().any(|lr| { + lr.attributes.iter().any(|kv| kv.key == "perfetto.instant.name" && kv.value.as_ref().is_some_and(|v| v.value.is_some())) + }) + }) + }) + } else { + false + } + }); + assert!(has_instant, "expected instant event attributes in emitted records"); +} + fn run_core_pipeline(sqlite_path: &std::path::Path) -> Vec { let plugin = dummy_plugin(dummy_emit); let db = super::sqlite_reader::PerfettoDb::open(sqlite_path).unwrap(); @@ -478,6 +610,7 @@ fn temp_sqlite_file() -> std::path::PathBuf { CREATE TABLE thread_state (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, state TEXT, io_wait INTEGER, blocked_function TEXT, waker_utid INTEGER, cpu INTEGER); CREATE TABLE ftrace_event (id INTEGER, ts INTEGER, name TEXT, cpu INTEGER, utid INTEGER); CREATE TABLE spurious_sched_wakeup (id INTEGER, ts INTEGER, utid INTEGER, waker_utid INTEGER); + CREATE TABLE instant (ts INTEGER, track_id INTEGER, name TEXT); CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); INSERT INTO slice VALUES (1, 5000, 3000, 'test', NULL, 10, NULL, 0), (2, 10000, 500, 'child', 1, 10, NULL, 1); INSERT INTO thread VALUES (1, 'main', 100, 1, 1); From ed850291d127b572866b44ab3cc6de2e0cd2ebec Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 13:03:22 +0000 Subject: [PATCH 04/11] Add unit tests for types --- plugins/perfetto-ingest/tests/unit/tests.rs | 114 ++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/plugins/perfetto-ingest/tests/unit/tests.rs b/plugins/perfetto-ingest/tests/unit/tests.rs index e9c6f4b..f3dfa5c 100644 --- a/plugins/perfetto-ingest/tests/unit/tests.rs +++ b/plugins/perfetto-ingest/tests/unit/tests.rs @@ -181,6 +181,17 @@ fn test_db() -> super::sqlite_reader::PerfettoDb { CREATE TABLE ftrace_event (id INTEGER, ts INTEGER, name TEXT, cpu INTEGER, utid INTEGER); CREATE TABLE spurious_sched_wakeup (id INTEGER, ts INTEGER, utid INTEGER, waker_utid INTEGER); CREATE TABLE instant (ts INTEGER, track_id INTEGER, name TEXT); + CREATE TABLE cpu (id INTEGER, cpu INTEGER, cluster_id INTEGER, processor TEXT); + CREATE TABLE machine (id INTEGER, arch TEXT, num_cpus INTEGER, sysname TEXT, release TEXT); + CREATE TABLE metadata (name TEXT, int_value INTEGER, str_value TEXT); + CREATE TABLE counter (id INTEGER, ts INTEGER, track_id INTEGER, value REAL); + CREATE TABLE memory_snapshot (id INTEGER, timestamp INTEGER, track_id INTEGER, detail_level TEXT); + CREATE TABLE cpu_profile_stack_sample (id INTEGER, ts INTEGER, callsite_id INTEGER, utid INTEGER); + CREATE TABLE stack_profile_frame (id INTEGER, name TEXT, mapping_id INTEGER); + CREATE TABLE heap_profile_allocation (id INTEGER, ts INTEGER, upid INTEGER, size INTEGER, count INTEGER); + CREATE TABLE protolog (id INTEGER, ts INTEGER, level TEXT, tag TEXT, message TEXT); + CREATE TABLE android_logs (id INTEGER, ts INTEGER, utid INTEGER, prio INTEGER, tag TEXT, msg TEXT); + CREATE TABLE filedescriptor (id INTEGER, ufd INTEGER, fd INTEGER, ts INTEGER, upid INTEGER, path TEXT); CREATE TABLE args (arg_set_id INTEGER, flat_key TEXT, string_value TEXT, int_value INTEGER, real_value REAL); CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); @@ -200,6 +211,17 @@ fn test_db() -> super::sqlite_reader::PerfettoDb { INSERT INTO ftrace_event VALUES (2, 15000, 'sched_waking', 2, 3); INSERT INTO spurious_sched_wakeup VALUES (1, 5000, 1, 2); INSERT INTO instant VALUES (10000, 10, 'test-instant'); + INSERT INTO cpu VALUES (1, 0, 0, 'x86_64'); + INSERT INTO machine VALUES (1, 'x86_64', 8, 'Linux', '5.15.0'); + INSERT INTO metadata VALUES ('trace_size_bytes', 1048576, NULL); + INSERT INTO counter VALUES (1, 10000, 1, 2400000.0); + INSERT INTO memory_snapshot VALUES (1, 20000, 1, 'detailed'); + INSERT INTO cpu_profile_stack_sample VALUES (1, 10000, 42, 1); + INSERT INTO stack_profile_frame VALUES (1, 'main', 1); + INSERT INTO heap_profile_allocation VALUES (1, 20000, 1, 4096, 1); + INSERT INTO protolog VALUES (1, 10000, 'INFO', 'test', 'test log'); + INSERT INTO android_logs VALUES (1, 10000, 1, 3, 'TestTag', 'test message'); + INSERT INTO filedescriptor VALUES (1, 1, 42, 10000, 1, '/dev/null'); INSERT INTO clock_snapshot VALUES (0, 1700000000000000000, 1, 'REALTIME'), (10000, 1700000000000010000, 1, 'REALTIME'); @@ -554,6 +576,98 @@ fn log_mapper_produces_instant_records() { assert!(has_instant, "expected instant event attributes in emitted records"); } +// sqlite_reader tests for P3-P9 types + +#[test] +fn sqlite_reader_reads_cpus() { + let db = test_db(); + let cpus = db.read_cpus().unwrap(); + assert_eq!(cpus.len(), 1); + assert_eq!(cpus[0].cpu, Some(0)); + assert_eq!(cpus[0].processor.as_deref(), Some("x86_64")); +} + +#[test] +fn sqlite_reader_reads_machines() { + let db = test_db(); + let machines = db.read_machines().unwrap(); + assert_eq!(machines.len(), 1); + assert_eq!(machines[0].arch.as_deref(), Some("x86_64")); + assert_eq!(machines[0].sysname.as_deref(), Some("Linux")); +} + +#[test] +fn sqlite_reader_reads_metadata() { + let db = test_db(); + let meta = db.read_metadata().unwrap(); + assert_eq!(meta.len(), 1); + assert_eq!(meta[0].name.as_deref(), Some("trace_size_bytes")); +} + +#[test] +fn sqlite_reader_reads_counters() { + let db = test_db(); + let counters = db.read_counters().unwrap(); + assert_eq!(counters.len(), 1); + assert!((counters[0].value - 2400000.0).abs() < 1.0); +} + +#[test] +fn sqlite_reader_reads_memory_snapshots() { + let db = test_db(); + let snaps = db.read_memory_snapshots().unwrap(); + assert_eq!(snaps.len(), 1); + assert_eq!(snaps[0].detail_level.as_deref(), Some("detailed")); +} + +#[test] +fn sqlite_reader_reads_cpu_profile_samples() { + let db = test_db(); + let samples = db.read_cpu_profile_samples().unwrap(); + assert_eq!(samples.len(), 1); + assert_eq!(samples[0].callsite_id, 42); +} + +#[test] +fn sqlite_reader_reads_stack_frames() { + let db = test_db(); + let frames = db.read_stack_frames().unwrap(); + assert_eq!(frames.len(), 1); + assert_eq!(frames[0].name.as_deref(), Some("main")); +} + +#[test] +fn sqlite_reader_reads_heap_allocations() { + let db = test_db(); + let allocs = db.read_heap_allocations().unwrap(); + assert_eq!(allocs.len(), 1); + assert_eq!(allocs[0].size, 4096); +} + +#[test] +fn sqlite_reader_reads_protologs() { + let db = test_db(); + let logs = db.read_protologs().unwrap(); + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].message.as_deref(), Some("test log")); +} + +#[test] +fn sqlite_reader_reads_android_logs() { + let db = test_db(); + let logs = db.read_android_logs().unwrap(); + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].tag.as_deref(), Some("TestTag")); +} + +#[test] +fn sqlite_reader_reads_filedescriptors() { + let db = test_db(); + let fds = db.read_filedescriptors().unwrap(); + assert_eq!(fds.len(), 1); + assert_eq!(fds[0].path.as_deref(), Some("/dev/null")); +} + fn run_core_pipeline(sqlite_path: &std::path::Path) -> Vec { let plugin = dummy_plugin(dummy_emit); let db = super::sqlite_reader::PerfettoDb::open(sqlite_path).unwrap(); From 51bc3fe80343ecd6ced822c26bb4f94c62990e4f Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 13:14:15 +0000 Subject: [PATCH 05/11] Update demos to capture more diverse data --- demo/perfetto/linux-data-record/run-demo.sh | 5 ++++- demo/perfetto/perfetto-to-logjet/README.md | 23 ++++++++++++-------- demo/perfetto/perfetto-to-logjet/run-demo.sh | 8 ++++++- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/demo/perfetto/linux-data-record/run-demo.sh b/demo/perfetto/linux-data-record/run-demo.sh index b545933..26e79a7 100755 --- a/demo/perfetto/linux-data-record/run-demo.sh +++ b/demo/perfetto/linux-data-record/run-demo.sh @@ -45,7 +45,7 @@ echo "Recording 5s of ftrace to $PERFETTO_TRACE_OUT..." CONFIG_FILE="$SCRIPT_DIR/trace-config.txt" cat > "$CONFIG_FILE" <<'ENDCONFIG' buffers: { - size_kb: 4096 + size_kb: 8192 fill_policy: RING_BUFFER } data_sources: { @@ -54,6 +54,9 @@ data_sources: { ftrace_config { ftrace_events: "sched/sched_switch" ftrace_events: "sched/sched_waking" + ftrace_events: "sched/sched_process_exec" + ftrace_events: "sched/sched_process_fork" + ftrace_events: "power/cpu_frequency" } } } diff --git a/demo/perfetto/perfetto-to-logjet/README.md b/demo/perfetto/perfetto-to-logjet/README.md index 0132270..1ebab43 100644 --- a/demo/perfetto/perfetto-to-logjet/README.md +++ b/demo/perfetto/perfetto-to-logjet/README.md @@ -23,21 +23,26 @@ Requires sudo for ftrace access. ## What Happens 1. `traced` + `traced_probes` start in the background. -2. `tracebox` records 5s of scheduler events (CPU switches) via ftrace. +2. `tracebox` records 5s of scheduler events (CPU switches, process lifecycle, + CPU frequency, interrupts) via ftrace. 3. `ljd` loads the perfetto-ingest plugin, which spawns `trace_processor`, - exports the trace as SQLite, maps `sched_slice` rows to OTel log records - with CPU/state/duration, and streams them into a `.logjet` spool. -4. `ljx view` opens the spool — each CPU scheduling event appears as one line. + exports the trace as SQLite, maps every Perfetto table to OTel log records + (sched slices, thread states, ftrace events, spurious wakeups, instant + events, counters), and streams them into a `.logjet` spool. +4. `ljx view` opens the spool. ## What You Should See -- Thousands of log lines, each showing a CPU scheduling event: +- Thousands of log lines across multiple types: ``` - May 7 10:43:15 I cpu=7 dur=7.2us state=R utid=19 ts=... - May 7 10:43:15 I cpu=7 dur=2.0us state=R utid=21 ts=... + cpu=7 state=R utid=19 dur=7.2us ← sched_slice + state=S dur=12.3us utid=3 cpu=1 ← thread_state + sched_switch cpu=5 ← ftrace_event + spurious_wakeup utid=1 ← spurious_wakeup ``` -- Press `Enter` to see full OTel attributes (perfetto.sched.id, cpu, end_state). -- Press `F` for field filter, `/` to search, `q` to quit. +- Press `Enter` to see full OTel attributes for each record. +- Press `F` for field filter (e.g. filter by `perfetto.sched.cpu` to see only one CPU). +- `/` to search, `q` to quit. ## Troubleshooting diff --git a/demo/perfetto/perfetto-to-logjet/run-demo.sh b/demo/perfetto/perfetto-to-logjet/run-demo.sh index 1566246..0e010b2 100755 --- a/demo/perfetto/perfetto-to-logjet/run-demo.sh +++ b/demo/perfetto/perfetto-to-logjet/run-demo.sh @@ -57,7 +57,7 @@ echo "Recording 5s of ftrace to $TRACE_FILE..." CONFIG_FILE="$SCRIPT_DIR/trace-config.txt" cat > "$CONFIG_FILE" <<'ENDCONFIG' buffers: { - size_kb: 4096 + size_kb: 8192 fill_policy: RING_BUFFER } data_sources: { @@ -66,7 +66,13 @@ data_sources: { ftrace_config { ftrace_events: "sched/sched_switch" ftrace_events: "sched/sched_waking" + ftrace_events: "sched/sched_process_exec" + ftrace_events: "sched/sched_process_fork" + ftrace_events: "sched/sched_process_exit" ftrace_events: "power/cpu_frequency" + ftrace_events: "power/cpu_idle" + ftrace_events: "irq/irq_handler_entry" + ftrace_events: "irq/irq_handler_exit" } } } From 96e3e9935d25552fcb98a6f8306685313c209e5a Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 14:01:12 +0000 Subject: [PATCH 06/11] Fix monotonic timestamp --- logjetd/src/plugin.rs | 4 +- plugins/perfetto-ingest/src/lib.rs | 38 +- plugins/perfetto-ingest/src/log_mapper.rs | 400 ++++++---------------- 3 files changed, 146 insertions(+), 296 deletions(-) diff --git a/logjetd/src/plugin.rs b/logjetd/src/plugin.rs index 453bb34..afb37db 100644 --- a/logjetd/src/plugin.rs +++ b/logjetd/src/plugin.rs @@ -672,7 +672,9 @@ unsafe extern "C" fn on_generic_record(user: *mut c_void, record: *const LjInges SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as u64 }; - let wire = WireRecord { record_type, seq: ctx.next_seq.fetch_add(1, Ordering::Relaxed), ts_unix_ns: ts, payload }; + let seq = ctx.next_seq.fetch_add(1, Ordering::Relaxed); + + let wire = WireRecord { record_type, seq, ts_unix_ns: ts, payload }; if let Err(err) = super::daemon::append_to_spool(&ctx.spool, wire) { eprintln!("ljd plugin callback spool error: {err}"); diff --git a/plugins/perfetto-ingest/src/lib.rs b/plugins/perfetto-ingest/src/lib.rs index 685f6e4..58cc1ab 100644 --- a/plugins/perfetto-ingest/src/lib.rs +++ b/plugins/perfetto-ingest/src/lib.rs @@ -117,6 +117,17 @@ pub struct PerfettoPlugin { last_error: Option, } +// Buffer for sorting emissions by timestamp before sending to the callback. +// Populated by buffer_emit, drained after all mappers complete. +use std::cell::RefCell; +thread_local! { + static EMIT_BUF: RefCell)>> = const { RefCell::new(Vec::new()) }; +} + +unsafe fn buffer_emit(_ctx: &PerfettoPlugin, record_type: u32, ts: u64, payload: &[u8]) { + EMIT_BUF.with(|buf| buf.borrow_mut().push((record_type, ts, payload.to_vec()))); +} + // Exported C ABI #[unsafe(no_mangle)] @@ -249,27 +260,40 @@ fn run_pipeline(plugin: &mut PerfettoPlugin, trace_file: &std::path::Path) -> Re eprintln!("perfetto-ingest: no realtime clock snapshots — timestamps will be unavailable"); } - eprintln!("perfetto-ingest: mapping traces..."); - trace_mapper::map_traces(&db, &converter, emit_generic, plugin)?; + // Buffer all emissions through a thread-local buffer, sort by timestamp, + // then flush through the real callback. This guarantees monotonicity. + EMIT_BUF.with(|buf| buf.borrow_mut().clear()); + + // Skip trace mapping for now — ljx view doesn't decode ExportTraceServiceRequest + // yet, so trace records render as binary garbage. + // eprintln!("perfetto-ingest: mapping traces..."); + // trace_mapper::map_traces(&db, &converter, buffer_emit, plugin)?; - // Optional: run metrics export and map metrics. + eprintln!("perfetto-ingest: mapping logs..."); + log_mapper::map_logs(&db, &converter, buffer_emit, plugin)?; + + // Optional metrics let metrics_names: Vec = std::env::var("LJD_PERFETTO_METRICS") .ok() .map(|s| s.split(',').map(|s| s.trim().to_string()).collect::>()) .unwrap_or_default(); let metrics_refs: Vec<&str> = metrics_names.iter().map(|s| s.as_str()).collect(); - if !metrics_refs.is_empty() && let Ok(Some(metrics_path)) = perfetto_invoke::export_metrics(trace_file, &tp_path, &metrics_refs) { eprintln!("perfetto-ingest: mapping metrics..."); let metrics = metrics_reader::parse_metrics_json(&metrics_path) .map_err(|err| format!("failed to parse metrics JSON: {err}"))?; - metric_mapper::map_metrics(&metrics, &converter, emit_generic, plugin)?; + metric_mapper::map_metrics(&metrics, &converter, buffer_emit, plugin)?; let _ = std::fs::remove_file(&metrics_path); } - eprintln!("perfetto-ingest: mapping logs..."); - log_mapper::map_logs(&db, &converter, emit_generic, plugin)?; + let mut all: Vec<(u32, u64, Vec)> = Vec::new(); + EMIT_BUF.with(|buf| all = std::mem::take(&mut *buf.borrow_mut())); + all.sort_by_key(|(_, ts, _)| *ts); + + for (rt, ts, payload) in &all { + unsafe { emit_generic(plugin, *rt, *ts, payload) }; + } let _ = std::fs::remove_file(&sqlite_path); diff --git a/plugins/perfetto-ingest/src/log_mapper.rs b/plugins/perfetto-ingest/src/log_mapper.rs index 45f582d..3e77257 100644 --- a/plugins/perfetto-ingest/src/log_mapper.rs +++ b/plugins/perfetto-ingest/src/log_mapper.rs @@ -11,7 +11,10 @@ use crate::sqlite_reader::{PerfettoDb, PerfettoFtraceEvent, PerfettoInstant, Per use crate::timestamp::TimestampConverter; const SEVERITY_INFO: i32 = 9; -const SLICES_PER_LOG_BATCH: usize = 1; + +fn dur_str(ns: i64) -> String { + if ns < 0 { "running".to_string() } else { format!("{:.1}us", ns as f64 / 1000.0) } +} pub fn map_logs( db: &PerfettoDb, @@ -19,195 +22,114 @@ pub fn map_logs( emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), plugin: &crate::PerfettoPlugin, ) -> Result<(), String> { - // Emit a per-slice log for each slice (readable in ljx view). - let slices = db.read_slices()?; - let mut batch: Vec = Vec::with_capacity(SLICES_PER_LOG_BATCH); - let mut batch_min_ts: u64 = 0; - - for slice in &slices { - let ts = converter.to_realtime(slice.ts).ok().flatten().unwrap_or(0); - - if batch.is_empty() || ts < batch_min_ts { - batch_min_ts = ts; - } - - batch.push(slice_to_log(slice, converter)); + let mut all: Vec = Vec::new(); - if batch.len() >= SLICES_PER_LOG_BATCH { - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - } + for slice in &db.read_slices()? { + if let Some(rec) = maybe_slice_to_log(slice, converter) { all.push(rec); } } - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - - // Emit sched_slice entries as log records. - let sched_slices = db.read_sched_slices()?; - for s in &sched_slices { - let ts = converter.to_realtime(s.ts).ok().flatten().unwrap_or(0); - if batch.is_empty() || ts < batch_min_ts { - batch_min_ts = ts; - } - batch.push(sched_slice_to_log(s, converter)); - if batch.len() >= SLICES_PER_LOG_BATCH { - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - } + for s in &db.read_sched_slices()? { + if let Some(rec) = maybe_sched_slice_to_log(s, converter) { all.push(rec); } } - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - - // Emit thread_state entries as log records. - let thread_states = db.read_thread_states()?; - for ts_state in &thread_states { - let ts = converter.to_realtime(ts_state.ts).ok().flatten().unwrap_or(0); - if batch.is_empty() || ts < batch_min_ts { - batch_min_ts = ts; - } - batch.push(thread_state_to_log(ts_state, converter)); - if batch.len() >= SLICES_PER_LOG_BATCH { - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - } + for ts in &db.read_thread_states()? { + if let Some(rec) = maybe_thread_state_to_log(ts, converter) { all.push(rec); } } - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - - // Emit ftrace_event entries as log records. - let ftrace_events = db.read_ftrace_events()?; - for ev in &ftrace_events { - let ts = converter.to_realtime(ev.ts).ok().flatten().unwrap_or(0); - if batch.is_empty() || ts < batch_min_ts { - batch_min_ts = ts; - } - batch.push(ftrace_event_to_log(ev, converter)); - if batch.len() >= SLICES_PER_LOG_BATCH { - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - } + for ev in &db.read_ftrace_events()? { + if let Some(rec) = maybe_ftrace_event_to_log(ev, converter) { all.push(rec); } } - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - - // Emit spurious_wakeup entries as log records. - let wakeups = db.read_spurious_wakeups()?; - for w in &wakeups { - let ts = converter.to_realtime(w.ts).ok().flatten().unwrap_or(0); - if batch.is_empty() || ts < batch_min_ts { - batch_min_ts = ts; - } - batch.push(spurious_wakeup_to_log(w, converter)); - if batch.len() >= SLICES_PER_LOG_BATCH { - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - } + for w in &db.read_spurious_wakeups()? { + if let Some(rec) = maybe_spurious_wakeup_to_log(w, converter) { all.push(rec); } + } + for inst in &db.read_instants()? { + if let Some(rec) = maybe_instant_to_log(inst, converter) { all.push(rec); } } - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - // Emit instant events as log records. - let instants = db.read_instants()?; - for inst in &instants { - let ts = converter.to_realtime(inst.ts).ok().flatten().unwrap_or(0); - if batch.is_empty() || ts < batch_min_ts { batch_min_ts = ts; } - batch.push(instant_to_log(inst, converter)); - if batch.len() >= SLICES_PER_LOG_BATCH { flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); } + all.sort_by_key(|r| r.time_unix_nano); + + for rec in &all { + let request = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { + attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }), + }], + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "perfetto-ingest".to_string(), + version: String::new(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + log_records: vec![rec.clone()], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let payload = request.encode_to_vec(); + unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_LOGS, rec.time_unix_nano, &payload) }; } - flush_log_batch(&mut batch, &mut batch_min_ts, emit, plugin); - // Emit a summary log. let threads = db.read_threads()?; let processes = db.read_processes()?; - emit_summary(slices.len(), threads.len(), processes.len(), emit, plugin); + let max_ts = all.last().map(|r| r.time_unix_nano.saturating_add(1)).unwrap_or(0); + emit_summary(db.read_slices()?.len(), threads.len(), processes.len(), emit, plugin, max_ts); Ok(()) } -fn slice_to_log(slice: &PerfettoSlice, converter: &TimestampConverter) -> LogRecord { +fn maybe_slice_to_log(slice: &PerfettoSlice, converter: &TimestampConverter) -> Option { let ts = converter.to_realtime(slice.ts).ok().flatten().unwrap_or(0); - let dur_us = slice.dur as f64 / 1000.0; + let dur = dur_str(slice.dur); let name = slice.name.as_deref().unwrap_or("(unnamed)"); - - let body = format!("{name} dur={dur_us:.1}us depth={}", slice.depth); - - LogRecord { - time_unix_nano: ts, - observed_time_unix_nano: ts, - severity_number: SEVERITY_INFO, - severity_text: "INFO".to_string(), + let body = format!("{name} dur={dur} depth={}", slice.depth); + Some(LogRecord { + time_unix_nano: ts, observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: vec![ - KeyValue { - key: "perfetto.slice.id".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(slice.id)) }), - }, - KeyValue { - key: "perfetto.slice.name".to_string(), - value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }), - }, - KeyValue { - key: "perfetto.slice.dur_ns".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(slice.dur)) }), - }, - KeyValue { - key: "perfetto.slice.depth".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(slice.depth as i64)) }), - }, + KeyValue { key: "perfetto.slice.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(slice.id)) }) }, + KeyValue { key: "perfetto.slice.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }, + KeyValue { key: "perfetto.slice.dur_ns".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(slice.dur)) }) }, + KeyValue { key: "perfetto.slice.depth".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(slice.depth as i64)) }) }, ], - dropped_attributes_count: 0, - flags: 0, - trace_id: Vec::new(), - span_id: Vec::new(), - event_name: String::new(), - } + dropped_attributes_count: 0, flags: 0, + trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + }) } -fn sched_slice_to_log(s: &PerfettoSchedSlice, converter: &TimestampConverter) -> LogRecord { +fn maybe_sched_slice_to_log(s: &PerfettoSchedSlice, converter: &TimestampConverter) -> Option { let ts = converter.to_realtime(s.ts).ok().flatten().unwrap_or(0); let end = s.end_state.as_deref().unwrap_or("?"); - let dur_ns = s.dur as u64; - let dur_us = dur_ns as f64 / 1000.0; - let body = format!("cpu={} state={end} utid={} dur={dur_us:.1}us", s.cpu, s.utid); - - LogRecord { - time_unix_nano: ts, - observed_time_unix_nano: ts, - severity_number: SEVERITY_INFO, - severity_text: "INFO".to_string(), + let dur = dur_str(s.dur); + let body = format!("cpu={} state={end} utid={} dur={dur}", s.cpu, s.utid); + Some(LogRecord { + time_unix_nano: ts, observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: vec![ - KeyValue { - key: "perfetto.sched.id".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(s.id)) }), - }, - KeyValue { - key: "perfetto.sched.cpu".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(s.cpu)) }), - }, - KeyValue { - key: "perfetto.sched.dur_ns".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(s.dur)) }), - }, - KeyValue { - key: "perfetto.sched.end_state".to_string(), - value: Some(AnyValue { value: Some(Value::StringValue(end.to_string())) }), - }, + KeyValue { key: "perfetto.sched.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(s.id)) }) }, + KeyValue { key: "perfetto.sched.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(s.cpu)) }) }, + KeyValue { key: "perfetto.sched.dur_ns".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(s.dur)) }) }, + KeyValue { key: "perfetto.sched.end_state".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(end.to_string())) }) }, ], - dropped_attributes_count: 0, - flags: 0, - trace_id: Vec::new(), - span_id: Vec::new(), - event_name: String::new(), - } + dropped_attributes_count: 0, flags: 0, + trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), + }) } -fn thread_state_to_log(ts: &PerfettoThreadState, converter: &TimestampConverter) -> LogRecord { +fn maybe_thread_state_to_log(ts: &PerfettoThreadState, converter: &TimestampConverter) -> Option { let t = converter.to_realtime(ts.ts).ok().flatten().unwrap_or(0); let state = ts.state.as_deref().unwrap_or("?"); - let dur_us = ts.dur as f64 / 1000.0; - let mut body = format!("state={state} dur={dur_us:.1}us utid={}", ts.utid); - if let Some(cpu) = ts.cpu { - body.push_str(&format!(" cpu={cpu}")); - } - if ts.io_wait == Some(true) { - body.push_str(" io_wait"); - } - if let Some(ref blocked) = ts.blocked_function { - body.push_str(&format!(" blocked={blocked}")); - } - if let Some(waker) = ts.waker_utid { - body.push_str(&format!(" waker={waker}")); - } + let dur = dur_str(ts.dur); + let mut body = format!("state={state} dur={dur} utid={}", ts.utid); + if let Some(cpu) = ts.cpu { body.push_str(&format!(" cpu={cpu}")); } + if ts.io_wait == Some(true) { body.push_str(" io_wait"); } + if let Some(ref blocked) = ts.blocked_function { body.push_str(&format!(" blocked={blocked}")); } + if let Some(waker) = ts.waker_utid { body.push_str(&format!(" waker={waker}")); } let mut attrs = vec![ KeyValue { key: "perfetto.ts.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.id)) }) }, @@ -215,27 +137,21 @@ fn thread_state_to_log(ts: &PerfettoThreadState, converter: &TimestampConverter) KeyValue { key: "perfetto.ts.dur_ns".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.dur)) }) }, KeyValue { key: "perfetto.ts.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(ts.utid)) }) }, ]; - if let Some(cpu) = ts.cpu { - attrs.push(KeyValue { key: "perfetto.ts.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }); - } - if let Some(io) = ts.io_wait { - attrs.push(KeyValue { key: "perfetto.ts.io_wait".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(i64::from(io))) }) }); - } - if let Some(ref blocked) = ts.blocked_function { - attrs.push(KeyValue { key: "perfetto.ts.blocked_function".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(blocked.clone())) }) }); - } + if let Some(cpu) = ts.cpu { attrs.push(KeyValue { key: "perfetto.ts.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }); } + if let Some(io) = ts.io_wait { attrs.push(KeyValue { key: "perfetto.ts.io_wait".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(i64::from(io))) }) }); } + if let Some(ref blocked) = ts.blocked_function { attrs.push(KeyValue { key: "perfetto.ts.blocked_function".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(blocked.clone())) }) }); } - LogRecord { + Some(LogRecord { time_unix_nano: t, observed_time_unix_nano: t, severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: attrs, dropped_attributes_count: 0, flags: 0, trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), - } + }) } -fn ftrace_event_to_log(ev: &PerfettoFtraceEvent, converter: &TimestampConverter) -> LogRecord { +fn maybe_ftrace_event_to_log(ev: &PerfettoFtraceEvent, converter: &TimestampConverter) -> Option { let t = converter.to_realtime(ev.ts).ok().flatten().unwrap_or(0); let name = ev.name.as_deref().unwrap_or("?"); let cpu = ev.cpu.unwrap_or(-1); @@ -245,46 +161,38 @@ fn ftrace_event_to_log(ev: &PerfettoFtraceEvent, converter: &TimestampConverter) KeyValue { key: "perfetto.ftrace.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }, KeyValue { key: "perfetto.ftrace.cpu".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(cpu)) }) }, ]; - if let Some(utid) = ev.utid { - attrs.push(KeyValue { key: "perfetto.ftrace.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); - } - LogRecord { + if let Some(utid) = ev.utid { attrs.push(KeyValue { key: "perfetto.ftrace.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); } + Some(LogRecord { time_unix_nano: t, observed_time_unix_nano: t, severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: attrs, dropped_attributes_count: 0, flags: 0, trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), - } + }) } -fn spurious_wakeup_to_log(w: &PerfettoSpuriousWakeup, converter: &TimestampConverter) -> LogRecord { +fn maybe_spurious_wakeup_to_log(w: &PerfettoSpuriousWakeup, converter: &TimestampConverter) -> Option { let t = converter.to_realtime(w.ts).ok().flatten().unwrap_or(0); let body = format!("spurious_wakeup utid={}", w.utid.unwrap_or(-1)); - let mut attrs = vec![ - KeyValue { key: "perfetto.sw.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(w.id)) }) }, - ]; - if let Some(utid) = w.utid { - attrs.push(KeyValue { key: "perfetto.sw.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); - } - if let Some(waker) = w.waker_utid { - attrs.push(KeyValue { key: "perfetto.sw.waker_utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(waker)) }) }); - } - LogRecord { + let mut attrs = vec![KeyValue { key: "perfetto.sw.id".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(w.id)) }) }]; + if let Some(utid) = w.utid { attrs.push(KeyValue { key: "perfetto.sw.utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(utid)) }) }); } + if let Some(waker) = w.waker_utid { attrs.push(KeyValue { key: "perfetto.sw.waker_utid".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(waker)) }) }); } + Some(LogRecord { time_unix_nano: t, observed_time_unix_nano: t, severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: attrs, dropped_attributes_count: 0, flags: 0, trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), - } + }) } -fn instant_to_log(inst: &PerfettoInstant, converter: &TimestampConverter) -> LogRecord { +fn maybe_instant_to_log(inst: &PerfettoInstant, converter: &TimestampConverter) -> Option { let t = converter.to_realtime(inst.ts).ok().flatten().unwrap_or(0); let name = inst.name.as_deref().unwrap_or("?"); let body = name.to_string(); - LogRecord { + Some(LogRecord { time_unix_nano: t, observed_time_unix_nano: t, severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), @@ -294,119 +202,35 @@ fn instant_to_log(inst: &PerfettoInstant, converter: &TimestampConverter) -> Log ], dropped_attributes_count: 0, flags: 0, trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), - } -} - -fn flush_log_batch( - batch: &mut Vec, - batch_min_ts: &mut u64, - emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), - plugin: &crate::PerfettoPlugin, -) { - if batch.is_empty() { - return; - } - - let records = std::mem::replace(batch, Vec::with_capacity(SLICES_PER_LOG_BATCH)); - let ts = *batch_min_ts; - *batch_min_ts = 0; - - let request = ExportLogsServiceRequest { - resource_logs: vec![ResourceLogs { - resource: Some(Resource { - attributes: vec![KeyValue { - key: "service.name".to_string(), - value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }), - }], - dropped_attributes_count: 0, - entity_refs: Vec::new(), - }), - scope_logs: vec![ScopeLogs { - scope: Some(InstrumentationScope { - name: "perfetto-ingest".to_string(), - version: String::new(), - attributes: Vec::new(), - dropped_attributes_count: 0, - }), - log_records: records, - schema_url: String::new(), - }], - schema_url: String::new(), - }], - }; - - let payload = request.encode_to_vec(); - unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_LOGS, ts, &payload) }; + }) } fn emit_summary( - count_slices: usize, - count_threads: usize, - count_processes: usize, + count_slices: usize, count_threads: usize, count_processes: usize, emit: unsafe fn(ctx: &crate::PerfettoPlugin, record_type: u32, ts_unix_ns: u64, payload: &[u8]), plugin: &crate::PerfettoPlugin, + ts: u64, ) { - let now_ns = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as u64; - - let body = format!( - "Perfetto trace analysis complete: {} slices, {} threads, {} processes", - count_slices, count_threads, count_processes - ); - + let body = format!("Perfetto trace analysis complete: {} slices, {} threads, {} processes", count_slices, count_threads, count_processes); let record = LogRecord { - time_unix_nano: now_ns, - observed_time_unix_nano: now_ns, - severity_number: SEVERITY_INFO, - severity_text: "INFO".to_string(), + time_unix_nano: ts, observed_time_unix_nano: ts, + severity_number: SEVERITY_INFO, severity_text: "INFO".to_string(), body: Some(AnyValue { value: Some(Value::StringValue(body)) }), attributes: vec![ - KeyValue { - key: "perfetto.slices".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(count_slices as i64)) }), - }, - KeyValue { - key: "perfetto.threads".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(count_threads as i64)) }), - }, - KeyValue { - key: "perfetto.processes".to_string(), - value: Some(AnyValue { value: Some(Value::IntValue(count_processes as i64)) }), - }, + KeyValue { key: "perfetto.slices".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(count_slices as i64)) }) }, + KeyValue { key: "perfetto.threads".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(count_threads as i64)) }) }, + KeyValue { key: "perfetto.processes".to_string(), value: Some(AnyValue { value: Some(Value::IntValue(count_processes as i64)) }) }, ], - dropped_attributes_count: 0, - flags: 0, - trace_id: Vec::new(), - span_id: Vec::new(), - event_name: String::new(), + dropped_attributes_count: 0, flags: 0, + trace_id: Vec::new(), span_id: Vec::new(), event_name: String::new(), }; - let request = ExportLogsServiceRequest { resource_logs: vec![ResourceLogs { - resource: Some(Resource { - attributes: vec![KeyValue { - key: "service.name".to_string(), - value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }), - }], - dropped_attributes_count: 0, - entity_refs: Vec::new(), - }), - scope_logs: vec![ScopeLogs { - scope: Some(InstrumentationScope { - name: "perfetto-ingest".to_string(), - version: String::new(), - attributes: Vec::new(), - dropped_attributes_count: 0, - }), - log_records: vec![record], - schema_url: String::new(), - }], + resource: Some(Resource { attributes: vec![KeyValue { key: "service.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue("perfetto".to_string())) }) }], dropped_attributes_count: 0, entity_refs: Vec::new() }), + scope_logs: vec![ScopeLogs { scope: Some(InstrumentationScope { name: "perfetto-ingest".to_string(), version: String::new(), attributes: Vec::new(), dropped_attributes_count: 0 }), log_records: vec![record], schema_url: String::new() }], schema_url: String::new(), }], }; - let payload = request.encode_to_vec(); - unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_LOGS, now_ns, &payload) }; + unsafe { emit(plugin, crate::LJ_INGEST_RECORD_TYPE_LOGS, ts, &payload) }; } From 5b117dcad159ad2830801c0078f94b34e7e82ae6 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 14:01:27 +0000 Subject: [PATCH 07/11] Harden unit tests --- plugins/perfetto-ingest/tests/unit/tests.rs | 108 ++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/plugins/perfetto-ingest/tests/unit/tests.rs b/plugins/perfetto-ingest/tests/unit/tests.rs index f3dfa5c..f8bdfe1 100644 --- a/plugins/perfetto-ingest/tests/unit/tests.rs +++ b/plugins/perfetto-ingest/tests/unit/tests.rs @@ -163,6 +163,114 @@ fn temp_json(name: &str, content: &str) -> std::path::PathBuf { path } +// realistic integration tests — timestamps match real Perfetto scale + +/// Realistic DB with trace-scale timestamps (~10^14) and epoch clock_values (~10^18). +/// Records from different types overlap in time, testing that the sort-before-emit +/// pipeline produces strictly monotonic output regardless of mapper order. +fn realistic_db() -> super::sqlite_reader::PerfettoDb { + let conn = rusqlite::Connection::open_in_memory().unwrap(); + conn.execute_batch( + " + CREATE TABLE slice (id INTEGER, ts INTEGER, dur INTEGER, name TEXT, parent_id INTEGER, track_id INTEGER, arg_set_id INTEGER, depth INTEGER); + CREATE TABLE sched_slice (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, ucpu INTEGER, end_state TEXT); + CREATE TABLE thread_state (id INTEGER, ts INTEGER, dur INTEGER, utid INTEGER, state TEXT, io_wait INTEGER, blocked_function TEXT, waker_utid INTEGER, cpu INTEGER); + CREATE TABLE ftrace_event (id INTEGER, ts INTEGER, name TEXT, cpu INTEGER, utid INTEGER); + CREATE TABLE spurious_sched_wakeup (id INTEGER, ts INTEGER, utid INTEGER, waker_utid INTEGER); + CREATE TABLE instant (ts INTEGER, track_id INTEGER, name TEXT); + CREATE TABLE thread (utid INTEGER, name TEXT, tid INTEGER, upid INTEGER, is_main_thread INTEGER); + CREATE TABLE process (upid INTEGER, name TEXT, pid INTEGER); + CREATE TABLE __intrinsic_track (id INTEGER, name TEXT, type TEXT, utid INTEGER, upid INTEGER); + CREATE TABLE clock_snapshot (ts INTEGER, clock_value INTEGER, clock_id INTEGER, clock_name TEXT); + -- Overlapping time ranges in Perfetto trace scale (~10^14 ns = microseconds). + INSERT INTO slice VALUES (1, 300_000_000, 50, 'mid-span', NULL, 10, NULL, 0); + INSERT INTO sched_slice VALUES (1, 500_000_000, 20, 5, 2, 'R'); + INSERT INTO thread_state VALUES (1, 100_000_000, 50, 3, 'S', NULL, NULL, NULL, 1); + INSERT INTO thread_state VALUES (2, 400_000_000, 80, 5, 'R', NULL, NULL, NULL, 2); + INSERT INTO ftrace_event VALUES (1, 200_000_000, 'sched_switch', 1, 3); + INSERT INTO spurious_sched_wakeup VALUES (1, 350_000_000, 5, 3); + INSERT INTO instant VALUES (150_000_000, 10, 'early'); + INSERT INTO thread VALUES (3, 'main', 100, 1, 1), (5, 'worker', 200, 1, 0); + INSERT INTO process VALUES (1, 'test', 1000); + INSERT INTO clock_snapshot VALUES (0, 1700000000000000000, 1, 'REALTIME'); + ", + ).unwrap(); + super::sqlite_reader::PerfettoDb { conn } +} + +#[test] +fn log_mapper_produces_monotonic_timestamps_with_realistic_data() { + let db = realistic_db(); + let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; + let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + + // Collect emits into a buffer, sort, then verify monotonic. + EMIT_BUF.with(|buf| buf.borrow_mut().clear()); + log_mapper::map_logs(&db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)).unwrap(); + + let mut all: Vec<(u32, u64, Vec)> = Vec::new(); + EMIT_BUF.with(|buf| all = std::mem::take(&mut *buf.borrow_mut())); + all.sort_by_key(|(_, ts, _)| *ts); + + assert!(!all.is_empty(), "expected records from realistic DB"); + + // Verify monotonic. + for w in all.windows(2) { + assert!(w[0].1 <= w[1].1, "non-monotonic: {} then {} (delta={})", w[0].1, w[1].1, w[1].1 as i128 - w[0].1 as i128); + } +} + +#[test] +fn full_pipeline_sorts_across_mappers_monotonically() { + let db = realistic_db(); + let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; + let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + + EMIT_BUF.with(|buf| buf.borrow_mut().clear()); + + // Run mappers in varying order — traces first, then logs (opposite of monotonic order). + // The buffer should sort everything before emitting. + let _ = trace_mapper::map_traces(&db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)); + let _ = log_mapper::map_logs(&db, &converter, super::buffer_emit, &dummy_plugin(dummy_emit)); + + let mut all: Vec<(u32, u64, Vec)> = Vec::new(); + EMIT_BUF.with(|buf| all = std::mem::take(&mut *buf.borrow_mut())); + all.sort_by_key(|(_, ts, _)| *ts); + + assert!(!all.is_empty()); + for w in all.windows(2) { + assert!(w[0].1 <= w[1].1, "non-monotonic across mappers: {} then {}", w[0].1, w[1].1); + } +} + +#[test] +fn realistic_timestamps_convert_without_truncation() { + let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; + let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + + // Perfetto-scaled timestamp: typical trace time (~122 seconds in ns). + let trace_ts: i64 = 122_804_694_200; + let realtime = converter.to_realtime(trace_ts).unwrap().unwrap(); + assert_eq!(realtime, 1_700_000_000_000_000_000 + 122_804_694_200); + assert!(realtime > 1_700_000_000_000_000_000); +} + +#[test] +fn realistic_timestamps_maintain_monotonicity() { + let snaps = vec![crate::sqlite_reader::PerfettoClockSnapshot { ts: 0, clock_value: 1_700_000_000_000_000_000 }]; + let converter = timestamp::TimestampConverter::new(snaps, timestamp::TimestampPolicy::BestEffort); + + // Timestamps in increasing order as they would appear in a trace. + // These are in Perfetto trace scale (~10^14 = microseconds from trace start). + let times: [i64; 5] = [100_000_000, 200_000_000, 300_000_000, 500_000_000, 900_000_000]; + let mut prev: u64 = 0; + for ts in × { + let rt = converter.to_realtime(*ts).unwrap().unwrap(); + assert!(rt >= prev, "realtime {rt} should be >= prev {prev} for ts={ts}"); + prev = rt; + } +} + fn test_db() -> super::sqlite_reader::PerfettoDb { let conn = rusqlite::Connection::open_in_memory().unwrap(); From 5fe882f24e3eedb4ac35d97f30a48e6ef27d21e7 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 14:05:30 +0000 Subject: [PATCH 08/11] Bugfix: ljx view crash on "F" (filter) --- ljx/src/commands/view/render.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ljx/src/commands/view/render.rs b/ljx/src/commands/view/render.rs index 80ab4cd..5f08823 100644 --- a/ljx/src/commands/view/render.rs +++ b/ljx/src/commands/view/render.rs @@ -680,7 +680,8 @@ impl ViewApp { }; let body_height = filtered_sev.len().max(filtered_svc.len()).max(1) as u16; - let popup_h = (body_height + 4).clamp(20, screen.height * 60 / 100); + let max_h = (screen.height * 60 / 100).max(20); + let popup_h = (body_height + 4).min(max_h); let popup_w = (screen.width * 60 / 100).max(40); let area = Rect::new(screen.width.saturating_sub(popup_w) / 2, screen.height * 20 / 100, popup_w, popup_h); frame.render_widget(Clear, area); From 3cfa0fb3727a9d9fca73cad697488497c5741093 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 14:06:12 +0000 Subject: [PATCH 09/11] Drop garbage example --- demo/perfetto/linux-data-record/trace.pftrace | Bin 1121 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 demo/perfetto/linux-data-record/trace.pftrace diff --git a/demo/perfetto/linux-data-record/trace.pftrace b/demo/perfetto/linux-data-record/trace.pftrace deleted file mode 100644 index 4439f9ba9b40ed0f78d202eeac98d889ca4f10f1..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1121 zcmd-|Fp3gjLu42Qo*X#< zmthvT-2&0W#lb4DdC`^LdqDkM9Gn8@C#~u|)Zie&5WvWVu;JWxVrECClAC7+baliE%%ny?{Jci0J3f$Y=@c<^nEO7QSL=VUh`SK)h*WW9aY<@!d}dx+z5s(nhq?lT6+-|+ z#)emnKNy*AFizinX4kpvua|^BU|hLm^|oKj7WRa5NrNmp#VR5I7vg}c;}UeZvHTIp zdym+egc!i_x}WhC)7HiY1|{c!)S|T1l9GIdGE-AM16}jP#8h*mR09hI4LH->GRZ92 z+|ta_(%2})+$hD+#KPRnFge*WG0`Z|(oEBWfh&QF)d%D+Asb~wLqpxrq|&^SQiWjO z0EK`cS6APFka(9!KSy6rXNBOz5(USCA_YT31tViCLo+J_BZbfqX9XhzBQqto3JbG% zGZQI6Gd&BSV?aiwr{<*=WhNUmC^RrO9Qd+?-+=?{13`y|b>P$m;;mv~5?}=S^ZC5J zplkpY;s6O9+qnYdb*PXBNa*o8aFjxYBtSx2uU|w`rNPKmw1bt=;ohDtAme7SL~wC( zv6tp$=jE5@QI!dF1Je(diIaFQFoB%$?$a|QXDBdo)q)&3i>ZuDD50UDK>(DB3V>LE zv4ataI64>=Iv5*(tOgb!267t&fEXm^0K^GE+`u@AQI3O2fkB9yfq{_$3Z#IXbQni~ cNrFj1M1#qI(Sp%|!2_7q5*RZW3m7UG0Vb<=a{vGU From feacfda7958e67d92bc3a0448c0114ad58d33ff6 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 14:13:04 +0000 Subject: [PATCH 10/11] Update perfetto demos docs and polish scripts --- demo/perfetto/README.md | 4 +-- {scripts => demo/perfetto}/build-perfetto.sh | 0 demo/perfetto/linux-data-record/README.md | 2 +- demo/perfetto/perfetto-to-logjet/README.md | 6 ++-- demo/perfetto/perfetto-to-logjet/run-demo.sh | 31 +++++++++++--------- doc/perfetto-ingest.md | 2 +- 6 files changed, 24 insertions(+), 21 deletions(-) rename {scripts => demo/perfetto}/build-perfetto.sh (100%) diff --git a/demo/perfetto/README.md b/demo/perfetto/README.md index 0f60ede..876182e 100644 --- a/demo/perfetto/README.md +++ b/demo/perfetto/README.md @@ -4,12 +4,12 @@ ```bash # From workspace root — downloads deps and builds all needed tools. -./scripts/build-perfetto.sh +./demo/perfetto/build-perfetto.sh ``` Or pass a custom source path: ```bash -./scripts/build-perfetto.sh /path/to/perfetto +./demo/perfetto/build-perfetto.sh /path/to/perfetto ``` The script installs missing system packages (git, python3, curl, tar), downloads diff --git a/scripts/build-perfetto.sh b/demo/perfetto/build-perfetto.sh similarity index 100% rename from scripts/build-perfetto.sh rename to demo/perfetto/build-perfetto.sh diff --git a/demo/perfetto/linux-data-record/README.md b/demo/perfetto/linux-data-record/README.md index 5667729..8f0c484 100644 --- a/demo/perfetto/linux-data-record/README.md +++ b/demo/perfetto/linux-data-record/README.md @@ -8,7 +8,7 @@ the trace processor for interactive inspection. Build the Perfetto tools (from workspace root): ```bash -./scripts/build-perfetto.sh +./demo/perfetto/build-perfetto.sh ``` The script finds tools via `PERFETTO_OUT` (default: `perfetto/out/linux_release`). diff --git a/demo/perfetto/perfetto-to-logjet/README.md b/demo/perfetto/perfetto-to-logjet/README.md index 1ebab43..5e81f4d 100644 --- a/demo/perfetto/perfetto-to-logjet/README.md +++ b/demo/perfetto/perfetto-to-logjet/README.md @@ -8,7 +8,7 @@ plugin into a `.logjet` spool, and view the result in `ljx view`. ```bash # From workspace root make dev -./scripts/build-perfetto.sh +./demo/perfetto/build-perfetto.sh ``` ## Run @@ -48,5 +48,5 @@ Requires sudo for ftrace access. - **0 records**: The trace needs ftrace events — they require root. The script uses `sudo tracebox`. If passwordless sudo isn't configured, run `sudo ./run-demo.sh`. -- **Fewer records than expected in ljx view**: Delete stale index cache: - `rm -rf ~/.cache/ljx && ./run-demo.sh` +- **ljx view shows fewer records than expected**: The ljx index builder bug was fixed + in this PR. If you still see fewer records, delete `~/.cache/ljx/` and re-run. diff --git a/demo/perfetto/perfetto-to-logjet/run-demo.sh b/demo/perfetto/perfetto-to-logjet/run-demo.sh index 0e010b2..d6cc76f 100755 --- a/demo/perfetto/perfetto-to-logjet/run-demo.sh +++ b/demo/perfetto/perfetto-to-logjet/run-demo.sh @@ -27,7 +27,7 @@ done for bin in "$TRACED" "$TRACED_PROBES" "$TRACEBOX" "$TP"; do if [ ! -x "$bin" ]; then echo "missing $bin" - echo "build perfetto first with: ./scripts/build-perfetto.sh" + echo "build perfetto first with: ./demo/perfetto/build-perfetto.sh" exit 1 fi done @@ -125,27 +125,30 @@ LJD_PID=$! cleanup_ljd() { kill "$LJD_PID" 2>/dev/null || true wait "$LJD_PID" 2>/dev/null || true + rm -f "$CONFIG_FILE" } trap cleanup_ljd EXIT INT TERM -# Give the plugin time to finish processing (SQLite export + mapping takes a few seconds). -sleep 10 +# Poll until records appear (plugin finishes), up to 60s. +echo "Waiting for import..." +elapsed=0 +while [ "$elapsed" -lt 60 ]; do + if [ -f "$SPOOL_DIR/perfetto.logjet" ]; then + COUNT=$("$LJX" count "$SPOOL_DIR/perfetto.logjet" 2>/dev/null || echo "0") + if [ "$COUNT" -gt 0 ] 2>/dev/null; then + echo "Imported $COUNT records into $SPOOL_DIR/perfetto.logjet" + break + fi + fi + sleep 1 + elapsed=$((elapsed + 1)) +done + kill "$LJD_PID" 2>/dev/null || true wait "$LJD_PID" 2>/dev/null || true trap - EXIT INT TERM -rm -f "$CONFIG_FILE" - -if [ ! -f "$SPOOL_DIR/perfetto.logjet" ]; then - echo "No .logjet file produced." - exit 1 -fi - -RECORDS=$("$LJX" count "$SPOOL_DIR/perfetto.logjet" | tail -1) -echo "Imported $RECORDS records into $SPOOL_DIR/perfetto.logjet" -echo "" - # ── View the result ─────────────────────────────────────────────────────────── echo "Opening ljx view..." diff --git a/doc/perfetto-ingest.md b/doc/perfetto-ingest.md index 9121f88..251dc9c 100644 --- a/doc/perfetto-ingest.md +++ b/doc/perfetto-ingest.md @@ -24,7 +24,7 @@ callback. - Perfetto trace processor binary (`trace_processor` or `trace_processor_shell`). Build it from the bundled Perfetto source: ```bash - ./scripts/build-perfetto.sh + ./demo/perfetto/build-perfetto.sh ``` - A `.pftrace` trace file to import. From ac5fccae9ba71e2697e4d9686f885585129a5753 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Thu, 7 May 2026 14:17:51 +0000 Subject: [PATCH 11/11] Update docs --- doc/perfetto-ingest.md | 89 ++++++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 33 deletions(-) diff --git a/doc/perfetto-ingest.md b/doc/perfetto-ingest.md index 251dc9c..ebe2bf0 100644 --- a/doc/perfetto-ingest.md +++ b/doc/perfetto-ingest.md @@ -1,23 +1,27 @@ # Perfetto Ingest Plugin (`lj-perfetto-ingest`) Imports Perfetto trace files (`.pftrace` / `.perfetto-trace`) into the logjet -ecosystem as OTel traces, metrics, logs, and events. +ecosystem as OTel logs, traces, and metrics. ## Architecture ``` .pftrace ──→ trace_processor (spawned as subprocess) - ├── export sqlite ──→ sqlite_reader ──→ trace_mapper ──→ OTel spans + ├── export sqlite ──→ sqlite_reader ──→ trace_mapper ──→ OTel spans └── --run-metrics ──→ metrics_reader ──→ metric_mapper ──→ OTel metrics - log_mapper ──→ OTel logs + log_mapper ──→ OTel logs + │ + ▼ + buffer & sort by ts │ ▼ ljd spool (.logjet) ``` The plugin is an **active source** (`mode: 1`). ljd calls `lj_ingest_fetch()` once, -which runs the full pipeline and streams OTel payloads through the generic record -callback. +which runs the full pipeline. All records from traces, logs, and metrics are +collected, sorted by timestamp, then streamed through the generic record callback +to guarantee monotonic timestamps in the logjet block format. ## Requirements @@ -32,17 +36,27 @@ callback. ```bash # Build the plugin and ljd: -make build +make dev + +# Create a config file (ljd uses YAML config, not CLI flags): +cat > /tmp/perfetto.conf <