diff --git a/crates/beava-bench/Cargo.toml b/crates/beava-bench/Cargo.toml index fbc72632..4b5c4684 100644 --- a/crates/beava-bench/Cargo.toml +++ b/crates/beava-bench/Cargo.toml @@ -56,6 +56,10 @@ path = "src/lib.rs" name = "beava-bench" path = "src/bin/beava-bench.rs" +[[bin]] +name = "memprofile" +path = "src/bin/memprofile.rs" + [[bench]] name = "blast_shape_bench" harness = false diff --git a/crates/beava-bench/src/bin/memprofile.rs b/crates/beava-bench/src/bin/memprofile.rs new file mode 100644 index 00000000..cf7b243f --- /dev/null +++ b/crates/beava-bench/src/bin/memprofile.rs @@ -0,0 +1,1132 @@ +//! Per-AggOp memory profile report for realistic workload configs. + +use anyhow::{anyhow, Context, Result}; +use beava_core::agg_op::{AggExtParams, AggKind, AggOp, AggOpDescriptor, SketchParams}; +use beava_core::mem_usage::{MemBreakdown, MemProfile, MemUsage}; +use beava_core::row::{json_value_to_beava_value, Row, Value}; +use clap::Parser; +use serde_json::Value as JsonValue; +use std::collections::{BTreeMap, BTreeSet}; +use std::fs; +use std::path::PathBuf; + +#[derive(Debug, Parser)] +#[command(name = "memprofile", about = "Profile per-AggOp memory usage")] +struct Args { + #[arg(long, default_value = "fraud")] + workload: String, + #[arg(long, default_value_t = 2_000)] + events: u64, + #[arg(long, default_value = "memory-profile-fraud-team.md")] + output: PathBuf, + #[arg(long, default_value_t = beava_server::http_admin::BYTES_PER_ENTITY_P99_V0_PLACEHOLDER)] + metrics_bytes_per_entity_p99: u64, + #[arg(long, default_value_t = 0.15)] + tolerance: f64, +} + +#[derive(Debug, Clone)] +struct FeatureSpec { + source_events: Vec, + derivation: String, + feature: String, + op_name: String, + key_path: Vec, + desc: AggOpDescriptor, +} + +#[derive(Debug, Clone)] +struct ProfileRow { + source_events: Vec, + derivation: String, + entity_key: String, + entity_events: u64, + feature: String, + op_name: String, + key_path: Vec, + events_applied: u64, + shape: ProfileShape, + window_ms: Option, + profile: MemProfile, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum ProfileShape { + Lifetime, + Windowed, +} + +impl ProfileShape { + fn as_str(self) -> &'static str { + match self { + Self::Lifetime => "lifetime", + Self::Windowed => "windowed", + } + } +} + +struct ReportInput<'a> { + workload: &'a str, + events_requested: u64, + events_generated: u64, + events_by_source: &'a BTreeMap, + derivation_count: usize, + feature_count: usize, + active_entity_count: usize, + table_profiles: &'a [TableProfile], + rows: &'a [ProfileRow], + bytes_per_entity_p99: usize, + metrics_placeholder: u64, + tolerance: f64, +} + +#[derive(Debug, Clone)] +struct TableSpec { + source_events: Vec, + derivation: String, + key_path: Vec, + features: Vec, +} + +struct TableState { + spec: TableSpec, + entities: BTreeMap, + events_applied: u64, +} + +struct EntityState { + events_applied: u64, + features: Vec, +} + +struct ProfileSlot { + spec: FeatureSpec, + op: AggOp, + events_applied: u64, +} + +impl ProfileSlot { + fn new(spec: FeatureSpec) -> Self { + Self { + op: AggOp::new(&spec.desc), + spec, + events_applied: 0, + } + } +} + +impl EntityState { + fn new(features: &[FeatureSpec]) -> Self { + Self { + events_applied: 0, + features: features.iter().cloned().map(ProfileSlot::new).collect(), + } + } +} + +#[derive(Debug, Clone)] +struct EntityProfile { + entity_key: String, + events_applied: u64, + profile: MemProfile, + features: Vec, +} + +#[derive(Debug, Clone)] +struct FeatureSummary { + feature: String, + op_name: String, + shape: ProfileShape, + stack_bytes: usize, + heap_p50: usize, + heap_p99: usize, + heap_max: usize, + total_p50: usize, + total_p99: usize, + total_max: usize, +} + +#[derive(Debug, Clone)] +struct TableProfile { + source_events: Vec, + derivation: String, + key_path: Vec, + configured_features: usize, + active_entities: usize, + events_applied: u64, + stack_p50: usize, + stack_p99: usize, + stack_max: usize, + heap_p50: usize, + heap_p99: usize, + heap_max: usize, + total_p50: usize, + total_p99: usize, + total_max: usize, + feature_summaries: Vec, + entities: Vec, +} + +fn main() -> Result<()> { + let args = Args::parse(); + let report = build_report(&args)?; + fs::write(&args.output, report).with_context(|| format!("write {}", args.output.display()))?; + eprintln!("memprofile: wrote {}", args.output.display()); + Ok(()) +} + +fn build_report(args: &Args) -> Result { + let workload = beava_bench::workloads::load_by_name(&args.workload) + .with_context(|| format!("load workload {:?}", args.workload))?; + let features = feature_specs_from_register(&workload.register_payload)?; + let feature_count = features.len(); + let mut tables = table_states_from_features(features)?; + + let mut events_generated = 0; + let mut events_by_source = BTreeMap::new(); + for (idx, event) in (workload.event_generator)(args.events).enumerate() { + events_generated += 1; + *events_by_source + .entry(event.event_name.clone()) + .or_insert(0) += 1; + let now_ms = event_time_ms(&event.fields).unwrap_or(1_000_000 + idx as i64 * 1_000); + let row = row_from_fields(event.fields); + for table in tables + .iter_mut() + .filter(|table| matches_source(&table.spec.source_events, &event.event_name)) + { + let Some(entity_key) = entity_key_from_row(&row, &table.spec.key_path) else { + continue; + }; + let entity = table + .entities + .entry(entity_key) + .or_insert_with(|| EntityState::new(&table.spec.features)); + entity.events_applied += 1; + table.events_applied += 1; + for slot in &mut entity.features { + let field = slot.spec.desc.field.as_deref(); + slot.op.update(&row, now_ms, field, true); + slot.events_applied += 1; + } + } + } + + let (mut table_profiles, mut rows) = collect_table_profiles(tables); + + rows.sort_by(compare_profile_rows); + + table_profiles.sort_by(compare_table_profiles); + let active_entity_count = table_profiles.iter().map(|t| t.active_entities).sum(); + let all_entity_totals = table_profiles + .iter() + .flat_map(|table| { + table + .entities + .iter() + .map(|entity| entity.profile.total_bytes()) + }) + .collect::>(); + let bytes_per_entity_p99 = percentile_usize(all_entity_totals, 0.99); + Ok(render_markdown(ReportInput { + workload: &args.workload, + events_requested: args.events, + events_generated, + events_by_source: &events_by_source, + derivation_count: workload.derivations.len(), + feature_count, + active_entity_count, + table_profiles: &table_profiles, + rows: &rows, + bytes_per_entity_p99, + metrics_placeholder: args.metrics_bytes_per_entity_p99, + tolerance: args.tolerance, + })) +} + +fn render_markdown(input: ReportInput<'_>) -> String { + let mut out = String::new(); + out.push_str("# AggOp Memory Profile: fraud-team\n\n"); + out.push_str("## Workload Summary\n\n"); + out.push_str(&format!("- Workload: `{}`\n", input.workload)); + out.push_str(&format!( + "- Events requested from generator: `{}`\n", + input.events_requested + )); + out.push_str(&format!( + "- Events replayed from generator: `{}`\n", + input.events_generated + )); + out.push_str("- Events by source:\n"); + for (source, count) in input.events_by_source { + out.push_str(&format!(" - `{source}`: `{count}`\n")); + } + out.push_str(&format!( + "- Derivations discovered: `{}`\n", + input.derivation_count + )); + out.push_str(&format!( + "- Aggregate features discovered: `{}`\n", + input.feature_count + )); + out.push_str(&format!( + "- Active entity rows profiled: `{}`\n", + input.active_entity_count + )); + out.push_str(&format!( + "- Bytes per active entity row p99: `{}` bytes\n\n", + input.bytes_per_entity_p99 + )); + + out.push_str("## Per-Entity Table Footprint\n\n"); + out.push_str("| Rank | Table | Source | group_by key | Active entities | Features/entity | Events applied | Stack p50 | Stack p99 | Stack max | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | Top contributor |\n"); + out.push_str("|------|-------|--------|--------------|-----------------|-----------------|----------------|-----------|-----------|-----------|----------|----------|----------|-----------|-----------|-----------|-----------------|\n"); + for (idx, table) in input.table_profiles.iter().enumerate() { + out.push_str(&format!( + "| {} | `{}` | `{}` | `{}` | {} | {} | {} | {} | {} | {} | {} | {} | {} | {} | {} | {} | `{}` |\n", + idx + 1, + table.derivation, + format_sources(&table.source_events), + format_key_path(&table.key_path), + table.active_entities, + table.configured_features, + table.events_applied, + table.stack_p50, + table.stack_p99, + table.stack_max, + table.heap_p50, + table.heap_p99, + table.heap_max, + table.total_p50, + table.total_p99, + table.total_max, + table + .feature_summaries + .first() + .map(|feature| feature.feature.as_str()) + .unwrap_or("-") + )); + } + + out.push_str("\n## Per-Table Entity Details\n\n"); + for table in input.table_profiles { + out.push_str(&format!( + "### `{}` (`{}` by `{}`)\n\n", + table.derivation, + format_sources(&table.source_events), + format_key_path(&table.key_path) + )); + if table.active_entities == 0 { + out.push_str(&format!( + "No active entity rows. Configured features: `{}`. The workload generator emitted no events for this table's source.\n\n", + table.configured_features + )); + continue; + } + + out.push_str("#### Feature Columns Across Entities\n\n"); + out.push_str("| Feature | Op | Shape | Stack bytes | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max |\n"); + out.push_str("|---------|----|-------|-------------|----------|----------|----------|-----------|-----------|-----------|\n"); + for feature in &table.feature_summaries { + out.push_str(&format!( + "| `{}` | `{}` | `{}` | {} | {} | {} | {} | {} | {} | {} |\n", + feature.feature, + feature.op_name, + feature.shape.as_str(), + feature.stack_bytes, + feature.heap_p50, + feature.heap_p99, + feature.heap_max, + feature.total_p50, + feature.total_p99, + feature.total_max + )); + } + + out.push_str("\n#### Largest Entity Rows\n\n"); + out.push_str("| Entity key | Events | Stack bytes | Heap bytes | Total bytes | Top feature contributors |\n"); + out.push_str("|------------|--------|-------------|------------|-------------|--------------------------|\n"); + for entity in table.entities.iter().take(5) { + out.push_str(&format!( + "| `{}` | {} | {} | {} | {} | {} |\n", + entity.entity_key, + entity.events_applied, + entity.profile.stack_bytes, + entity.profile.heap_bytes, + entity.profile.total_bytes(), + format_top_features(&entity.features, 3) + )); + } + + if let Some(entity) = table.entities.first() { + out.push_str(&format!( + "\n#### Feature Breakdown For Largest Entity `{}`\n\n", + entity.entity_key + )); + out.push_str("| Feature | Op | Shape | Events applied | Stack bytes | enum_slot_bytes | payload_bytes | slack_bytes | Heap bytes | Total bytes |\n"); + out.push_str("|---------|----|-------|----------------|-------------|-----------------|---------------|-------------|------------|-------------|\n"); + for feature in &entity.features { + out.push_str(&format!( + "| `{}` | `{}` | `{}` | {} | {} | {} | {} | {} | {} | {} |\n", + feature.feature, + feature.op_name, + feature.shape.as_str(), + feature.events_applied, + feature.profile.stack_bytes, + feature.profile.enum_slot_bytes, + feature.profile.payload_bytes, + feature.profile.slack_bytes, + feature.profile.heap_bytes, + feature.profile.total_bytes() + )); + } + } + out.push('\n'); + } + + out.push_str("## Top 5 Offenders\n\n"); + out.push_str("One heaviest entity-feature example per unique op.\n\n"); + for (idx, row) in top_unique_op_rows(input.rows, 5).iter().enumerate() { + out.push_str(&format!( + "### {}. `{}` / `{}` / `{}` / `{}`\n\n", + idx + 1, + format_sources(&row.source_events), + row.derivation, + row.feature, + row.op_name + )); + out.push_str(&format!( + "- Path: `{}` -> `{}` -> `{}` -> `{}` -> `{}`\n", + format_sources(&row.source_events), + row.derivation, + row.feature, + row.op_name, + row.shape.as_str() + )); + out.push_str(&format!("- Entity key: `{}`\n", row.entity_key)); + out.push_str(&format!("- Entity events: `{}`\n", row.entity_events)); + out.push_str(&format!( + "- Key path: `{}`\n", + format_key_path(&row.key_path) + )); + out.push_str(&format!("- Events applied: `{}`\n", row.events_applied)); + out.push_str(&format!( + "- Bytes: stack={} (enum_slot_bytes={} payload_bytes={} slack_bytes={}) heap={} total={}\n", + row.profile.stack_bytes, + row.profile.enum_slot_bytes, + row.profile.payload_bytes, + row.profile.slack_bytes, + row.profile.heap_bytes, + row.profile.total_bytes() + )); + out.push_str(&format!( + "- Shape: `{}`{}\n", + row.shape.as_str(), + format_window_suffix(row.window_ms) + )); + if row.shape == ProfileShape::Windowed { + out.push_str("- Breakdown rollup:\n"); + for entry in windowed_rollup(&row.profile.breakdown) { + out.push_str(&format!( + " - `{}`: {} bytes ({}, {})\n", + entry.label, entry.bytes, entry.kind, entry.note + )); + } + out.push_str("- Raw breakdown:\n"); + } else { + out.push_str("- Breakdown:\n"); + } + for entry in top_breakdown(&row.profile.breakdown, 8) { + out.push_str(&format!( + " - `{}`: {} bytes ({}, {})\n", + entry.label, entry.bytes, entry.kind, entry.note + )); + } + out.push('\n'); + } + + out.push_str("## Metrics Coherence\n\n"); + let target = input.metrics_placeholder as f64; + let observed = input.bytes_per_entity_p99 as f64; + let delta = (observed - target).abs(); + let allowed = target * input.tolerance; + out.push_str(&format!( + "- `/metrics` `beava_bytes_per_entity_p99`: `{}` bytes\n", + input.metrics_placeholder + )); + out.push_str(&format!( + "- Profile bytes-per-active-entity-row p99: `{}` bytes\n", + input.bytes_per_entity_p99 + )); + out.push_str(&format!("- Tolerance: `{:.1}%`\n", input.tolerance * 100.0)); + if delta <= allowed { + out.push_str( + "- Assertion: PASS - profile estimate is coherent with metrics placeholder.\n", + ); + } else { + out.push_str(&format!( + "- Assertion: bytes_per_entity_p99 diverged by {:.0} bytes; file sibling work to replace the static placeholder with live sampling.\n", + delta + )); + } + + out.push_str("\n## Notes\n\n"); + out.push_str("- `stack_bytes` is the inline `AggOp` enum slot for each feature.\n"); + out.push_str("- `enum_slot_bytes` is the fixed-size `AggOp` enum slot charged to a row; parent rows sum this across child paths.\n"); + out.push_str("- `payload_bytes` is the active variant payload inside the enum slot. For boxed variants this is the inline `Box` pointer, while the boxed pointee remains in `heap_bytes`.\n"); + out.push_str("- `slack_bytes` is unused capacity in the fixed-size `AggOp` enum slot: `enum_slot_bytes - payload_bytes`.\n"); + out.push_str("- Heap entries are deterministic structural counts; map/table allocator overhead is labeled as an estimate.\n"); + out.push_str("- Primary grain is `derivation table -> entity row -> feature column`; top offenders list one concrete entity-feature row per unique op.\n"); + out +} + +fn table_states_from_features(features: Vec) -> Result> { + let mut grouped: BTreeMap = BTreeMap::new(); + for feature in features { + let entry = grouped + .entry(feature.derivation.clone()) + .or_insert_with(|| TableSpec { + source_events: feature.source_events.clone(), + derivation: feature.derivation.clone(), + key_path: feature.key_path.clone(), + features: Vec::new(), + }); + if entry.source_events != feature.source_events || entry.key_path != feature.key_path { + return Err(anyhow!( + "derivation {:?} has inconsistent source/key path", + feature.derivation + )); + } + entry.features.push(feature); + } + Ok(grouped + .into_values() + .map(|mut spec| { + spec.features.sort_by(|a, b| a.feature.cmp(&b.feature)); + TableState { + spec, + entities: BTreeMap::new(), + events_applied: 0, + } + }) + .collect()) +} + +fn collect_table_profiles(tables: Vec) -> (Vec, Vec) { + let mut all_rows = Vec::new(); + let mut table_profiles = Vec::new(); + for table in tables { + let mut entities = Vec::new(); + for (entity_key, entity) in table.entities { + let mut entity_profile = MemProfile::new(entity_key.clone(), 0); + let mut feature_rows = Vec::with_capacity(entity.features.len()); + for slot in entity.features { + let spec = slot.spec; + let mut profile = slot.op.mem_profile(); + profile.label = format!( + "{}::{}[{}]::{} ({})", + format_sources(&spec.source_events), + spec.derivation, + entity_key, + spec.feature, + spec.op_name + ); + add_profile_totals(&mut entity_profile, &profile); + let shape = profile_shape(&spec.desc); + let row = ProfileRow { + source_events: spec.source_events, + derivation: spec.derivation, + entity_key: entity_key.clone(), + entity_events: entity.events_applied, + feature: spec.feature, + op_name: spec.op_name, + key_path: spec.key_path, + events_applied: slot.events_applied, + window_ms: spec.desc.window_ms, + shape, + profile, + }; + feature_rows.push(row.clone()); + all_rows.push(row); + } + feature_rows.sort_by(compare_profile_rows); + entities.push(EntityProfile { + entity_key, + events_applied: entity.events_applied, + profile: entity_profile, + features: feature_rows, + }); + } + entities.sort_by(compare_entity_profiles); + let feature_summaries = feature_summaries_for_table(&entities); + let stack_values = entities + .iter() + .map(|entity| entity.profile.stack_bytes) + .collect::>(); + let heap_values = entities + .iter() + .map(|entity| entity.profile.heap_bytes) + .collect::>(); + let total_values = entities + .iter() + .map(|entity| entity.profile.total_bytes()) + .collect::>(); + table_profiles.push(TableProfile { + source_events: table.spec.source_events, + derivation: table.spec.derivation, + key_path: table.spec.key_path, + configured_features: table.spec.features.len(), + active_entities: entities.len(), + events_applied: table.events_applied, + stack_p50: percentile_usize(stack_values.clone(), 0.50), + stack_p99: percentile_usize(stack_values.clone(), 0.99), + stack_max: stack_values.into_iter().max().unwrap_or(0), + heap_p50: percentile_usize(heap_values.clone(), 0.50), + heap_p99: percentile_usize(heap_values.clone(), 0.99), + heap_max: heap_values.into_iter().max().unwrap_or(0), + total_p50: percentile_usize(total_values.clone(), 0.50), + total_p99: percentile_usize(total_values.clone(), 0.99), + total_max: total_values.into_iter().max().unwrap_or(0), + feature_summaries, + entities, + }); + } + (table_profiles, all_rows) +} + +fn feature_summaries_for_table(entities: &[EntityProfile]) -> Vec { + let mut grouped: BTreeMap<(String, String, ProfileShape), Vec<&ProfileRow>> = BTreeMap::new(); + for entity in entities { + for feature in &entity.features { + grouped + .entry(( + feature.feature.clone(), + feature.op_name.clone(), + feature.shape, + )) + .or_default() + .push(feature); + } + } + let mut summaries = grouped + .into_iter() + .map(|((feature, op_name, shape), rows)| { + let stack_values = rows + .iter() + .map(|row| row.profile.stack_bytes) + .collect::>(); + let heap_values = rows + .iter() + .map(|row| row.profile.heap_bytes) + .collect::>(); + let total_values = rows + .iter() + .map(|row| row.profile.total_bytes()) + .collect::>(); + FeatureSummary { + feature, + op_name, + shape, + stack_bytes: percentile_usize(stack_values, 0.99), + heap_p50: percentile_usize(heap_values.clone(), 0.50), + heap_p99: percentile_usize(heap_values.clone(), 0.99), + heap_max: heap_values.into_iter().max().unwrap_or(0), + total_p50: percentile_usize(total_values.clone(), 0.50), + total_p99: percentile_usize(total_values.clone(), 0.99), + total_max: total_values.into_iter().max().unwrap_or(0), + } + }) + .collect::>(); + summaries.sort_by(|a, b| { + b.total_p99 + .cmp(&a.total_p99) + .then_with(|| b.heap_p99.cmp(&a.heap_p99)) + .then_with(|| a.feature.cmp(&b.feature)) + }); + summaries +} + +fn add_profile_totals(total: &mut MemProfile, profile: &MemProfile) { + total.stack_bytes += profile.stack_bytes; + total.enum_slot_bytes += profile.enum_slot_bytes; + total.payload_bytes += profile.payload_bytes; + total.slack_bytes += profile.slack_bytes; + total.heap_bytes += profile.heap_bytes; + total.breakdown.extend(profile.breakdown.clone()); +} + +fn percentile_usize(mut values: Vec, q: f64) -> usize { + if values.is_empty() { + return 0; + } + values.sort_unstable(); + let rank = ((values.len() as f64) * q).ceil() as usize; + let idx = rank.saturating_sub(1).min(values.len() - 1); + values[idx] +} + +fn compare_table_profiles(a: &TableProfile, b: &TableProfile) -> std::cmp::Ordering { + b.total_p99 + .cmp(&a.total_p99) + .then_with(|| b.heap_p99.cmp(&a.heap_p99)) + .then_with(|| b.active_entities.cmp(&a.active_entities)) + .then_with(|| a.derivation.cmp(&b.derivation)) +} + +fn compare_entity_profiles(a: &EntityProfile, b: &EntityProfile) -> std::cmp::Ordering { + b.profile + .total_bytes() + .cmp(&a.profile.total_bytes()) + .then_with(|| b.events_applied.cmp(&a.events_applied)) + .then_with(|| a.entity_key.cmp(&b.entity_key)) +} + +fn matches_source(sources: &[String], event_name: &str) -> bool { + sources.is_empty() || sources.iter().any(|source| source == event_name) +} + +fn entity_key_from_row(row: &Row, key_path: &[String]) -> Option { + if key_path.is_empty() { + return Some("".to_string()); + } + key_path + .iter() + .map(|key| row.get(key).map(value_key_part)) + .collect::>>() + .map(|parts| parts.join("|")) +} + +fn value_key_part(value: &Value) -> String { + match value { + Value::Null => "null".to_string(), + Value::Str(s) => s.to_string(), + Value::I64(v) => v.to_string(), + Value::F64(v) => format!("{v}"), + Value::Bool(v) => v.to_string(), + Value::Bytes(bytes) => format!("<{} bytes>", bytes.len()), + Value::Datetime(v) => v.to_string(), + Value::Json(v) => v.to_string(), + Value::List(values) => format!("", values.len()), + Value::Map(values) => format!("", values.len()), + } +} + +fn format_sources(sources: &[String]) -> String { + if sources.is_empty() { + "*".to_string() + } else { + sources.join("+") + } +} + +fn format_key_path(keys: &[String]) -> String { + if keys.is_empty() { + "-".to_string() + } else { + keys.join("+") + } +} + +fn format_top_features(features: &[ProfileRow], limit: usize) -> String { + features + .iter() + .take(limit) + .map(|feature| { + format!( + "`{}`={} bytes", + feature.feature, + feature.profile.total_bytes() + ) + }) + .collect::>() + .join(", ") +} + +fn top_breakdown(entries: &[MemBreakdown], limit: usize) -> Vec { + let mut entries = entries.to_vec(); + entries.sort_by(|a, b| b.bytes.cmp(&a.bytes).then_with(|| a.label.cmp(&b.label))); + entries.truncate(limit); + entries +} + +fn compare_profile_rows(a: &ProfileRow, b: &ProfileRow) -> std::cmp::Ordering { + b.profile + .total_bytes() + .cmp(&a.profile.total_bytes()) + .then_with(|| b.events_applied.cmp(&a.events_applied)) + .then_with(|| format_sources(&a.source_events).cmp(&format_sources(&b.source_events))) + .then_with(|| a.derivation.cmp(&b.derivation)) + .then_with(|| a.entity_key.cmp(&b.entity_key)) + .then_with(|| a.feature.cmp(&b.feature)) + .then_with(|| a.op_name.cmp(&b.op_name)) +} + +fn top_unique_op_rows(rows: &[ProfileRow], limit: usize) -> Vec<&ProfileRow> { + let mut seen = BTreeSet::new(); + let mut out = Vec::new(); + for row in rows { + if seen.insert(row.op_name.as_str()) { + out.push(row); + if out.len() == limit { + break; + } + } + } + out +} + +fn windowed_rollup(entries: &[MemBreakdown]) -> Vec { + let mut grouped: BTreeMap = BTreeMap::new(); + for entry in entries { + let Some((label, kind, note)) = windowed_rollup_bucket(entry) else { + continue; + }; + let slot = grouped.entry(label.clone()).or_insert(MemBreakdown { + label, + bytes: 0, + kind, + note, + }); + slot.bytes = slot.bytes.saturating_add(entry.bytes); + } + let mut rolled = grouped.into_values().collect::>(); + rolled.sort_by(|a, b| b.bytes.cmp(&a.bytes).then_with(|| a.label.cmp(&b.label))); + rolled +} + +fn windowed_rollup_bucket(entry: &MemBreakdown) -> Option<(String, String, String)> { + if entry.label == "Box" || entry.label == "WindowedOp spilled bucket SmallVec" { + return Some(( + "Windowed wrapper overhead".to_string(), + "WindowedOp".to_string(), + "summed boxed WindowedOp payload and spilled bucket storage".to_string(), + )); + } + if entry.label.starts_with("Windowed bucket ") && entry.label.ends_with(" Box") { + return Some(( + "Windowed bucket shell overhead".to_string(), + "Box".to_string(), + "summed boxed AggOp enum slots across active buckets".to_string(), + )); + } + let (_, nested) = entry.label.split_once(" / ")?; + Some(( + format!("{nested} across buckets"), + entry.kind.clone(), + "summed across active window buckets".to_string(), + )) +} + +fn profile_shape(desc: &AggOpDescriptor) -> ProfileShape { + if desc.window_ms.is_some() { + ProfileShape::Windowed + } else { + ProfileShape::Lifetime + } +} + +fn format_window_suffix(window_ms: Option) -> String { + window_ms + .map(|ms| format!(" ({})", format_duration_ms(ms))) + .unwrap_or_default() +} + +fn format_duration_ms(ms: u64) -> String { + match ms { + ms if ms % 86_400_000 == 0 => format!("{}d", ms / 86_400_000), + ms if ms % 3_600_000 == 0 => format!("{}h", ms / 3_600_000), + ms if ms % 60_000 == 0 => format!("{}m", ms / 60_000), + ms if ms % 1_000 == 0 => format!("{}s", ms / 1_000), + ms => format!("{ms}ms"), + } +} + +fn feature_specs_from_register(register: &JsonValue) -> Result> { + let nodes = register + .get("nodes") + .and_then(JsonValue::as_array) + .ok_or_else(|| anyhow!("register payload missing nodes[]"))?; + let mut out = Vec::new(); + for node in nodes + .iter() + .filter(|n| n.get("kind") == Some(&JsonValue::String("derivation".into()))) + { + let derivation = node + .get("name") + .and_then(JsonValue::as_str) + .unwrap_or("unknown_derivation") + .to_string(); + let source_events = string_array_field(node, "upstreams"); + let table_key_path = string_array_field(node, "table_primary_key"); + let Some(ops) = node.get("ops").and_then(JsonValue::as_array) else { + continue; + }; + for step in ops { + let Some(agg) = step.get("agg").and_then(JsonValue::as_object) else { + continue; + }; + let key_path = if table_key_path.is_empty() { + string_array_field(step, "keys") + } else { + table_key_path.clone() + }; + for (feature, spec) in agg { + let op_name = spec + .get("op") + .and_then(JsonValue::as_str) + .ok_or_else(|| anyhow!("feature {feature} missing op"))? + .to_string(); + let params = spec.get("params").unwrap_or(&JsonValue::Null); + out.push(FeatureSpec { + source_events: source_events.clone(), + derivation: derivation.clone(), + feature: feature.clone(), + desc: descriptor_from_op(&op_name, params)?, + op_name, + key_path: key_path.clone(), + }); + } + } + } + Ok(out) +} + +fn descriptor_from_op(op_name: &str, params: &JsonValue) -> Result { + let kind = agg_kind_from_name(op_name)?; + let mut desc = AggOpDescriptor { + kind, + field: string_param(params, "field").or_else(|| string_param(params, "expr")), + window_ms: duration_param(params, "window")?, + n: number_param(params, "n").map(|n| n as u32), + half_life_ms: duration_param(params, "half_life")?, + sub_window_ms: duration_param(params, "sub_window")?, + sigma: float_param(params, "sigma"), + sketch_params: Some(SketchParams { + percentile_q: float_param(params, "q"), + top_k_k: number_param(params, "k"), + bloom_capacity: number_param(params, "capacity"), + bloom_fpr: float_param(params, "fpr"), + }), + ext: AggExtParams { + buckets: array_f64_param(params, "buckets"), + n: number_param(params, "n"), + k: number_param(params, "k"), + precision: number_param(params, "precision").map(|n| n as u32), + lat_field: string_param(params, "lat"), + lon_field: string_param(params, "lon"), + samples: number_param(params, "samples"), + categories: string_array_param(params, "categories"), + max_categories: number_param(params, "max_categories"), + ..Default::default() + }, + ..Default::default() + }; + if desc.field.is_none() && matches!(kind, AggKind::BloomMember | AggKind::Entropy) { + desc.field = Some("__expr".into()); + } + Ok(desc) +} + +fn agg_kind_from_name(name: &str) -> Result { + let kind = match name { + "count" => AggKind::Count, + "sum" => AggKind::Sum, + "mean" => AggKind::Avg, + "min" => AggKind::Min, + "max" => AggKind::Max, + "var" => AggKind::Variance, + "std" => AggKind::StdDev, + "quantile" => AggKind::Percentile, + "n_unique" => AggKind::CountDistinct, + "top_k" => AggKind::TopK, + "bloom_member" => AggKind::BloomMember, + "entropy" => AggKind::Entropy, + "first" => AggKind::First, + "last" => AggKind::Last, + "first_n" => AggKind::FirstN, + "last_n" => AggKind::LastN, + "lag" => AggKind::Lag, + "first_seen" => AggKind::FirstSeen, + "last_seen" => AggKind::LastSeen, + "age" => AggKind::Age, + "has_seen" => AggKind::HasSeen, + "time_since" => AggKind::TimeSince, + "time_since_last_n" => AggKind::TimeSinceLastN, + "first_seen_in_window" => AggKind::FirstSeenInWindow, + "streak" => AggKind::Streak, + "max_streak" => AggKind::MaxStreak, + "negative_streak" => AggKind::NegativeStreak, + "ewma" => AggKind::Ewma, + "ewvar" => AggKind::EwVar, + "ew_zscore" => AggKind::EwZScore, + "decayed_sum" => AggKind::DecayedSum, + "decayed_count" => AggKind::DecayedCount, + "twa" => AggKind::Twa, + "rate_of_change" => AggKind::RateOfChange, + "inter_arrival_stats" => AggKind::InterArrivalStats, + "burst_count" => AggKind::BurstCount, + "delta_from_prev" => AggKind::DeltaFromPrev, + "trend" => AggKind::Trend, + "trend_residual" => AggKind::TrendResidual, + "outlier_count" => AggKind::OutlierCount, + "value_change_count" => AggKind::ValueChangeCount, + "z_score" => AggKind::ZScore, + "histogram" => AggKind::Histogram, + "hour_of_day_histogram" => AggKind::HourOfDayHistogram, + "dow_hour_histogram" => AggKind::DowHourHistogram, + "seasonal_deviation" => AggKind::SeasonalDeviation, + "event_type_mix" => AggKind::EventTypeMix, + "most_recent_n" => AggKind::MostRecentN, + "reservoir_sample" => AggKind::ReservoirSample, + "geo_velocity" => AggKind::GeoVelocity, + "geo_distance" => AggKind::GeoDistance, + "geo_spread" => AggKind::GeoSpread, + "distance_from_home" => AggKind::DistanceFromHome, + other => return Err(anyhow!("unsupported agg op {other:?}")), + }; + Ok(kind) +} + +fn row_from_fields(fields: serde_json::Map) -> Row { + fields.into_iter().fold(Row::new(), |row, (field, value)| { + row.with_field(&field, json_value_to_beava_value(value)) + }) +} + +fn event_time_ms(fields: &serde_json::Map) -> Option { + fields.get("event_time").and_then(JsonValue::as_i64) +} + +fn string_array_field(value: &JsonValue, key: &str) -> Vec { + value + .get(key) + .and_then(JsonValue::as_array) + .map(|items| { + items + .iter() + .filter_map(JsonValue::as_str) + .map(str::to_string) + .collect() + }) + .unwrap_or_default() +} + +fn string_param(params: &JsonValue, key: &str) -> Option { + params + .get(key) + .and_then(JsonValue::as_str) + .map(str::to_string) +} + +fn number_param(params: &JsonValue, key: &str) -> Option { + params + .get(key) + .and_then(JsonValue::as_u64) + .map(|n| n as usize) +} + +fn float_param(params: &JsonValue, key: &str) -> Option { + params.get(key).and_then(JsonValue::as_f64) +} + +fn array_f64_param(params: &JsonValue, key: &str) -> Option> { + params.get(key)?.as_array().map(|items| { + items + .iter() + .filter_map(JsonValue::as_f64) + .collect::>() + }) +} + +fn string_array_param(params: &JsonValue, key: &str) -> Option> { + params.get(key)?.as_array().map(|items| { + items + .iter() + .filter_map(JsonValue::as_str) + .map(str::to_string) + .collect::>() + }) +} + +fn duration_param(params: &JsonValue, key: &str) -> Result> { + let Some(raw) = params.get(key).and_then(JsonValue::as_str) else { + return Ok(None); + }; + parse_duration_ms(raw) + .map(Some) + .with_context(|| format!("parse duration param {key}={raw:?}")) +} + +fn parse_duration_ms(raw: &str) -> Result { + let raw = raw.trim(); + let split_at = raw + .find(|c: char| !c.is_ascii_digit()) + .ok_or_else(|| anyhow!("duration missing unit: {raw}"))?; + let (n, unit) = raw.split_at(split_at); + let n: u64 = n.parse()?; + let multiplier = match unit { + "ms" => 1, + "s" => 1_000, + "m" => 60_000, + "h" => 3_600_000, + "d" => 86_400_000, + _ => return Err(anyhow!("unsupported duration unit {unit:?}")), + }; + Ok(n.saturating_mul(multiplier)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn memprofile_duration_parser_handles_fraud_units() { + assert_eq!(parse_duration_ms("10s").unwrap(), 10_000); + assert_eq!(parse_duration_ms("5m").unwrap(), 300_000); + assert_eq!(parse_duration_ms("24h").unwrap(), 86_400_000); + assert_eq!(parse_duration_ms("30d").unwrap(), 2_592_000_000); + } + + #[test] + fn memprofile_report_contains_required_sections() { + let args = Args { + workload: "fraud".into(), + events: 5, + output: PathBuf::from("/tmp/unused.md"), + metrics_bytes_per_entity_p99: 7_000, + tolerance: 0.15, + }; + let report = build_report(&args).unwrap(); + assert!(report.contains("# AggOp Memory Profile: fraud-team")); + assert!(report.contains("Events requested from generator: `5`")); + assert!(report.contains("Events replayed from generator: `5`")); + assert!(report.contains(" - `Txn`: `5`")); + assert!(!report.contains("Events replayed per op")); + assert!(report.contains("Active entity rows profiled:")); + assert!(report.contains("Bytes per active entity row p99:")); + assert!(report.contains("## Per-Entity Table Footprint")); + assert!(report.contains( + "| Rank | Table | Source | group_by key | Active entities | Features/entity | Events applied | Stack p50 | Stack p99 | Stack max | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | Top contributor |" + )); + assert!(report.contains("`TxnByUser` | `Txn` | `user_id`")); + assert!(report.contains("## Per-Table Entity Details")); + assert!(report.contains("### `TxnByUser` (`Txn` by `user_id`)")); + assert!(report.contains("#### Feature Columns Across Entities")); + assert!(report.contains("#### Largest Entity Rows")); + assert!(report.contains("#### Feature Breakdown For Largest Entity")); + assert!( + report.contains("The workload generator emitted no events for this table's source.") + ); + assert!(!report.contains("## Sorted Op Table")); + assert!(!report.contains("## Sorted Op Entity-Feature Details")); + assert!(report.contains("## Top 5 Offenders")); + assert!(report.contains("One heaviest entity-feature example per unique op.")); + assert!(!report.contains("- Recommendation:")); + assert!(report.contains("## Metrics Coherence")); + assert!(report.contains("Aggregate features discovered: `111`")); + assert!(report.contains("`txn_count_lifetime` | `count` | `lifetime` | 1 |")); + assert!(report.contains("- Entity key:")); + assert!(report.contains("- Entity events:")); + assert!(report.contains("- Events applied: `1`")); + } +} diff --git a/crates/beava-bench/tests/memprofile_smoke.rs b/crates/beava-bench/tests/memprofile_smoke.rs new file mode 100644 index 00000000..29f1cda5 --- /dev/null +++ b/crates/beava-bench/tests/memprofile_smoke.rs @@ -0,0 +1,64 @@ +use assert_cmd::Command; + +#[test] +fn memprofile_smoke_writes_required_sections() { + let temp = tempfile::tempdir().expect("tempdir"); + let output = temp.path().join("memory-profile-fraud-team.md"); + + Command::cargo_bin("memprofile") + .expect("memprofile binary") + .args([ + "--workload", + "fraud", + "--events", + "50", + "--output", + output.to_str().expect("utf8 path"), + ]) + .assert() + .success(); + + let report = std::fs::read_to_string(output).expect("read report"); + assert!(report.contains("# AggOp Memory Profile: fraud-team")); + assert!(report.contains("Events requested from generator: `50`")); + assert!(report.contains("Events replayed from generator: `50`")); + assert!(report.contains(" - `Txn`: `50`")); + assert!(!report.contains("Events replayed per op")); + assert!(report.contains("Active entity rows profiled:")); + assert!(report.contains("Bytes per active entity row p99:")); + assert!(report.contains("## Per-Entity Table Footprint")); + assert!(report.contains( + "| Rank | Table | Source | group_by key | Active entities | Features/entity | Events applied | Stack p50 | Stack p99 | Stack max | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | Top contributor |" + )); + assert!(report.contains("`TxnByUser` | `Txn` | `user_id`")); + assert!(report.contains("## Per-Table Entity Details")); + assert!(report.contains("### `TxnByUser` (`Txn` by `user_id`)")); + assert!(report.contains("#### Feature Columns Across Entities")); + assert!(report.contains("#### Largest Entity Rows")); + assert!(report.contains("#### Feature Breakdown For Largest Entity")); + assert!(report.contains("The workload generator emitted no events for this table's source.")); + assert!(!report.contains("## Sorted Op Table")); + assert!(!report.contains("## Sorted Op Entity-Feature Details")); + assert!(report.contains("## Top 5 Offenders")); + assert!(report.contains("One heaviest entity-feature example per unique op.")); + assert!(!report.contains("- Recommendation:")); + assert!(report.contains("## Metrics Coherence")); + assert!(report.contains("Aggregate features discovered: `111`")); + assert!(report.contains("enum_slot_bytes")); + assert!(report.contains("payload_bytes")); + assert!(report.contains("slack_bytes")); + assert!(report + .contains("`txn_count_lifetime` | `count` | `lifetime` | 1 | 80 | 80 | 8 | 72 | 0 | 80 |")); + assert!(report.contains("- Path: `Txn` ->")); + assert!(report.contains("- Entity key:")); + assert!(report.contains("- Entity events:")); + assert!(report.contains("- Events applied: `1`")); + assert!(report.contains("stack=80 (enum_slot_bytes=80 payload_bytes=")); + assert!(report.contains("`windowed`")); + assert!(report.contains("`lifetime`")); + assert!(report.contains("- Breakdown rollup:")); + assert!(report.contains("Windowed wrapper overhead")); + assert!(report.contains("`slack_bytes` is unused capacity in the fixed-size `AggOp` enum slot")); + assert!(!report.contains("CountDistinct owned internals")); + assert!(!report.contains("enum overhead")); +} diff --git a/crates/beava-core/src/lib.rs b/crates/beava-core/src/lib.rs index b7a097e9..2694dd24 100644 --- a/crates/beava-core/src/lib.rs +++ b/crates/beava-core/src/lib.rs @@ -19,6 +19,7 @@ pub mod defaults; pub mod eval; pub mod expr; pub mod expr_builtins; +pub mod mem_usage; pub mod op_chain; pub mod op_node; pub mod register_validate; diff --git a/crates/beava-core/src/mem_usage.rs b/crates/beava-core/src/mem_usage.rs new file mode 100644 index 00000000..da9f23cc --- /dev/null +++ b/crates/beava-core/src/mem_usage.rs @@ -0,0 +1,942 @@ +//! Deterministic structural memory accounting for aggregation state. +//! +//! This is intentionally not allocator instrumentation. It reports stable, +//! platform-independent estimates from owned state shape: inline stack slots, +//! `Box` payloads, vector capacity, sketch backing stores, and documented +//! map overhead estimates. + +use crate::agg_op::AggOp; +use crate::agg_state::{ + BloomMemberStateWrap, CountDistinctStateWrap, EntropyStateWrap, PercentileStateWrap, + TopKStateWrap, +}; +use crate::row::Value; +use crate::sketches::cms::TopKValue; +use crate::sketches::count_distinct::CountDistinctState; +use crate::sketches::percentile::PercentileState; +use crate::sketches::top_k::TopKState; +use serde::Serialize; +use std::collections::VecDeque; +use std::mem::{size_of, size_of_val}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MemBreakdown { + pub label: String, + pub bytes: usize, + pub kind: String, + pub note: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MemProfile { + pub label: String, + pub stack_bytes: usize, + pub enum_slot_bytes: usize, + pub payload_bytes: usize, + pub slack_bytes: usize, + pub heap_bytes: usize, + pub breakdown: Vec, +} + +impl MemProfile { + pub fn new(label: impl Into, stack_bytes: usize) -> Self { + Self { + label: label.into(), + stack_bytes, + enum_slot_bytes: stack_bytes, + payload_bytes: stack_bytes, + slack_bytes: 0, + heap_bytes: 0, + breakdown: Vec::new(), + } + } + + pub fn with_stack_composition(mut self, enum_slot_bytes: usize, payload_bytes: usize) -> Self { + self.stack_bytes = enum_slot_bytes; + self.enum_slot_bytes = enum_slot_bytes; + self.payload_bytes = payload_bytes; + self.slack_bytes = enum_slot_bytes.saturating_sub(payload_bytes); + self + } + + pub fn total_bytes(&self) -> usize { + self.stack_bytes + self.heap_bytes + } + + pub fn add_breakdown( + &mut self, + label: impl Into, + bytes: usize, + kind: impl Into, + note: impl Into, + ) { + if bytes == 0 { + return; + } + self.heap_bytes = self.heap_bytes.saturating_add(bytes); + self.breakdown.push(MemBreakdown { + label: label.into(), + bytes, + kind: kind.into(), + note: note.into(), + }); + } + + pub fn absorb_nested(&mut self, prefix: &str, nested: MemProfile) { + self.add_breakdown( + format!("{prefix} stack payload"), + nested.stack_bytes, + "nested_stack", + "nested AggOp stored inside an owned heap allocation", + ); + for entry in nested.breakdown { + self.add_breakdown( + format!("{prefix} / {}", entry.label), + entry.bytes, + entry.kind, + entry.note, + ); + } + } +} + +pub trait MemUsage { + fn mem_profile(&self) -> MemProfile; +} + +pub fn sort_profiles_desc(rows: &mut [MemProfile]) { + rows.sort_by(|a, b| { + b.total_bytes() + .cmp(&a.total_bytes()) + .then_with(|| a.label.cmp(&b.label)) + }); +} + +pub fn vec_heap_bytes(vec: &Vec) -> usize { + vec.capacity().saturating_mul(size_of::()) +} + +pub fn vecdeque_heap_bytes(deque: &VecDeque) -> usize { + deque.capacity().saturating_mul(size_of::()) +} + +pub fn serialized_heap_estimate(value: &T) -> usize { + serde_json::to_vec(value).map(|v| v.len()).unwrap_or(0) +} + +pub fn estimated_btree_map_heap_bytes(len: usize, key_value_bytes: usize) -> usize { + const BTREE_ENTRY_OVERHEAD: usize = 32; + len.saturating_mul(key_value_bytes.saturating_add(BTREE_ENTRY_OVERHEAD)) +} + +pub fn estimated_hash_map_heap_bytes(capacity: usize, key_value_bytes: usize) -> usize { + const HASH_ENTRY_OVERHEAD: usize = 24; + capacity.saturating_mul(key_value_bytes.saturating_add(HASH_ENTRY_OVERHEAD)) +} + +#[cfg(test)] +fn serde_profile(label: &str, value: &T) -> MemProfile { + let mut profile = MemProfile::new(label, size_of_val(value)); + profile.add_breakdown( + format!("{label} serialized owned state estimate"), + serialized_heap_estimate(value), + "estimate", + "deterministic serialized-size proxy for private owned heap fields", + ); + profile +} + +fn value_vec_breakdown(profile: &mut MemProfile, label: &str, values: &Vec) { + profile.add_breakdown( + label, + vec_heap_bytes(values), + "Vec", + "capacity * size_of::()", + ); +} + +fn value_deque_breakdown(profile: &mut MemProfile, label: &str, values: &VecDeque) { + profile.add_breakdown( + label, + vecdeque_heap_bytes(values), + "VecDeque", + "capacity * size_of::()", + ); +} + +fn add_box_allocation( + profile: &mut MemProfile, + label: impl Into, + bytes: usize, + note: impl Into, +) { + profile.add_breakdown(label, bytes, "Box", note); +} + +fn add_string_breakdown(profile: &mut MemProfile, label: impl Into, value: &String) { + profile.add_breakdown( + label, + value.capacity(), + "String", + "capacity bytes for owned string buffer", + ); +} + +fn add_count_distinct_breakdown(profile: &mut MemProfile, state: &CountDistinctStateWrap) { + add_box_allocation( + profile, + "Box", + size_of_val(state), + "heap allocation for boxed CountDistinct wrapper", + ); + match &state.inner { + CountDistinctState::ExactArray { values } => profile.add_breakdown( + "CountDistinct exact-array values", + vec_heap_bytes(values), + "Vec", + "capacity * size_of::() for exact distinct hashes", + ), + CountDistinctState::HashSet { .. } => profile.add_breakdown( + "CountDistinct hash-set slots", + state + .inner + .hash_set_capacity() + .unwrap_or(0) + .saturating_mul(16), + "HashSet", + "estimated hashbrown slot cost for u64 distinct hashes", + ), + CountDistinctState::Hll { sketch } => profile.add_breakdown( + "CountDistinct HLL registers", + sketch.register_capacity().saturating_mul(size_of::()), + "Vec", + "capacity * size_of::() for dense HLL registers", + ), + } +} + +fn add_percentile_breakdown(profile: &mut MemProfile, state: &PercentileStateWrap) { + add_box_allocation( + profile, + "Box", + size_of_val(state), + "heap allocation for boxed Percentile wrapper", + ); + match &state.inner { + PercentileState::Exact { values, .. } => profile.add_breakdown( + "Percentile exact samples", + vec_heap_bytes(values), + "Vec", + "capacity * size_of::() for exact percentile samples", + ), + PercentileState::Sketch { sketch } => { + profile.add_breakdown( + "UDDSketch positive buckets", + sketch + .positive_bucket_capacity() + .saturating_mul(size_of::<(i32, u64)>()), + "Vec", + "capacity * size_of::<(i32, u64)>() for positive UDDSketch buckets", + ); + profile.add_breakdown( + "UDDSketch negative buckets", + sketch + .negative_bucket_capacity() + .saturating_mul(size_of::<(i32, u64)>()), + "Vec", + "capacity * size_of::<(i32, u64)>() for negative UDDSketch buckets", + ); + } + } +} + +fn add_top_k_breakdown(profile: &mut MemProfile, state: &TopKStateWrap) { + add_box_allocation( + profile, + "Box", + size_of_val(state), + "heap allocation for boxed TopK wrapper", + ); + match &state.inner { + TopKState::Exact { counts, .. } => profile.add_breakdown( + "TopK exact BTreeMap entries", + estimated_btree_map_heap_bytes(counts.len(), 64), + "BTreeMap", + "estimated node overhead plus TopKValue/u64 payloads", + ), + TopKState::Hybrid { cms, heap, .. } => { + profile.add_breakdown( + "TopK count-min counters", + cms.counter_capacity().saturating_mul(size_of::()), + "Vec", + "capacity * size_of::() for count-min sketch counters", + ); + profile.add_breakdown( + "TopK heap entries", + heap.heap_capacity() + .saturating_mul(size_of::<(u64, TopKValue)>()), + "Vec", + "capacity * size_of::<(u64, TopKValue)>() for bounded top-k heap entries", + ); + profile.add_breakdown( + "TopK heap index map", + estimated_hash_map_heap_bytes( + heap.index_capacity_estimate(), + size_of::<(TopKValue, usize)>(), + ), + "AHashMap", + "estimated slot cost for TopK heap-position side index", + ); + } + } +} + +fn add_bloom_breakdown(profile: &mut MemProfile, state: &BloomMemberStateWrap) { + add_box_allocation( + profile, + "Box", + size_of_val(state), + "heap allocation for boxed Bloom wrapper", + ); + profile.add_breakdown( + "Bloom filter words", + state.inner.word_capacity().saturating_mul(size_of::()), + "Vec", + "capacity * size_of::() for bloom bit-array storage", + ); +} + +fn add_entropy_breakdown(profile: &mut MemProfile, state: &EntropyStateWrap) { + add_box_allocation( + profile, + "Box", + size_of_val(state), + "heap allocation for boxed Entropy wrapper", + ); + profile.add_breakdown( + "Entropy category map entries", + estimated_btree_map_heap_bytes(state.inner.category_count(), 48), + "BTreeMap", + "estimated node overhead plus String/u64 category payloads", + ); + profile.add_breakdown( + "Entropy category string capacity", + state.inner.key_capacity_bytes(), + "String", + "sum of tracked category string capacities", + ); +} + +impl MemUsage for AggOp { + fn mem_profile(&self) -> MemProfile { + let enum_slot_bytes = size_of::(); + let mut profile = MemProfile::new(aggop_label(self), enum_slot_bytes) + .with_stack_composition(enum_slot_bytes, aggop_payload_bytes(self)); + match self { + AggOp::Count(_) + | AggOp::Sum(_) + | AggOp::Avg(_) + | AggOp::Min(_) + | AggOp::Max(_) + | AggOp::Variance(_) + | AggOp::StdDev(_) + | AggOp::Ratio(_) + | AggOp::First(_) + | AggOp::Last(_) + | AggOp::FirstSeen(_) + | AggOp::LastSeen(_) + | AggOp::Age(_) + | AggOp::HasSeen(_) + | AggOp::TimeSince(_) + | AggOp::Streak(_) + | AggOp::MaxStreak(_) + | AggOp::NegativeStreak(_) + | AggOp::FirstSeenInWindow(_) + | AggOp::Ewma(_) + | AggOp::EwVar(_) + | AggOp::EwZScore(_) + | AggOp::DecayedSum(_) + | AggOp::DecayedCount(_) + | AggOp::Twa(_) + | AggOp::RateOfChange(_) + | AggOp::InterArrivalStats(_) + | AggOp::DeltaFromPrev(_) + | AggOp::Trend(_) + | AggOp::TrendResidual(_) + | AggOp::OutlierCount(_) + | AggOp::ValueChangeCount(_) + | AggOp::ZScore(_) => {} + + AggOp::FirstN(s) => value_vec_breakdown(&mut profile, "FirstN values", &s.values), + AggOp::LastN(s) => value_deque_breakdown(&mut profile, "LastN values", &s.values), + AggOp::Lag(s) => value_deque_breakdown(&mut profile, "Lag values", &s.values), + AggOp::TimeSinceLastN(s) => profile.add_breakdown( + "TimeSinceLastN timestamps", + vecdeque_heap_bytes(&s.times_ms), + "VecDeque", + "capacity * size_of::()", + ), + AggOp::BurstCount(op) => { + profile.add_breakdown( + "BurstCount buckets", + vec_heap_bytes(&op.state.buckets), + "Vec", + "capacity * size_of::()", + ); + profile.add_breakdown( + "BurstCount bucket_epoch", + vec_heap_bytes(&op.state.bucket_epoch), + "Vec", + "capacity * size_of::()", + ); + } + AggOp::Histogram(s) => { + profile.add_breakdown( + "Histogram split points", + vec_heap_bytes(&s.buckets), + "Vec", + "capacity * size_of::()", + ); + profile.add_breakdown( + "Histogram counts", + vec_heap_bytes(&s.counts), + "Vec", + "capacity * size_of::()", + ); + } + AggOp::DowHourHistogram(s) => profile.add_breakdown( + "DowHourHistogram counts", + vec_heap_bytes(&s.counts), + "Vec", + "capacity * size_of::()", + ), + AggOp::MostRecentN(s) => { + value_vec_breakdown(&mut profile, "MostRecentN buffer", &s.buf) + } + AggOp::ReservoirSample(s) => { + value_vec_breakdown(&mut profile, "ReservoirSample reservoir", &s.reservoir) + } + AggOp::EventTypeMix(s) => { + let boxed_bytes = size_of_val(&**s); + profile.add_breakdown( + "Box", + boxed_bytes, + "Box", + "heap allocation for boxed payload", + ); + profile.add_breakdown( + "EventTypeMix BTreeMap entries", + estimated_btree_map_heap_bytes(s.counts.len(), 48), + "BTreeMap", + "estimated node overhead plus String/u64 payload", + ); + if let Some(allowed) = &s.allowed { + profile.add_breakdown( + "EventTypeMix allowed categories", + vec_heap_bytes(allowed), + "Vec", + "capacity * size_of::()", + ); + } + } + AggOp::HourOfDayHistogram(s) => add_box_allocation( + &mut profile, + "Box", + size_of_val(&**s), + "heap allocation for fixed inline hour-of-day counts", + ), + AggOp::SeasonalDeviation(s) => add_box_allocation( + &mut profile, + "Box", + size_of_val(&**s), + "heap allocation for fixed inline per-hour buckets", + ), + AggOp::GeoVelocity(s) => { + add_box_allocation( + &mut profile, + "Box", + size_of_val(&**s), + "heap allocation for boxed payload", + ); + add_string_breakdown(&mut profile, "GeoVelocity lat_field", &s.lat_field); + add_string_breakdown(&mut profile, "GeoVelocity lon_field", &s.lon_field); + } + AggOp::GeoDistance(s) => { + add_box_allocation( + &mut profile, + "Box", + size_of_val(&**s), + "heap allocation for boxed payload", + ); + add_string_breakdown(&mut profile, "GeoDistance lat_field", &s.lat_field); + add_string_breakdown(&mut profile, "GeoDistance lon_field", &s.lon_field); + } + AggOp::GeoSpread(s) => { + add_box_allocation( + &mut profile, + "Box", + size_of_val(&**s), + "heap allocation for boxed payload", + ); + add_string_breakdown(&mut profile, "GeoSpread lat_field", &s.lat_field); + add_string_breakdown(&mut profile, "GeoSpread lon_field", &s.lon_field); + } + AggOp::DistanceFromHome(s) => { + profile.add_breakdown( + "Box", + size_of_val(&**s), + "Box", + "heap allocation for boxed payload", + ); + profile.add_breakdown( + "DistanceFromHome coordinate buffer", + vec_heap_bytes(&s.buf), + "Vec", + "capacity * size_of::<(f64, f64)>()", + ); + } + + AggOp::CountDistinct(s) => add_count_distinct_breakdown(&mut profile, s), + AggOp::Percentile(s) => add_percentile_breakdown(&mut profile, s), + AggOp::TopK(s) => add_top_k_breakdown(&mut profile, s), + AggOp::BloomMember(s) => add_bloom_breakdown(&mut profile, s), + AggOp::Entropy(s) => add_entropy_breakdown(&mut profile, s), + + AggOp::Windowed(w) => { + profile.add_breakdown( + "Box", + size_of_val(&**w), + "Box", + "heap allocation for boxed WindowedOp payload", + ); + if w.buckets.spilled() { + profile.add_breakdown( + "WindowedOp spilled bucket SmallVec", + w.buckets + .capacity() + .saturating_mul(size_of::<(i64, Box)>()), + "SmallVec", + "spilled capacity * size_of::<(i64, Box)>()", + ); + } + for (idx, (_, bucket)) in w.buckets.iter().enumerate() { + let nested = bucket.mem_profile(); + profile.add_breakdown( + format!("Windowed bucket {idx} Box"), + size_of::(), + "Box", + "heap allocation for bucket AggOp enum slot", + ); + for entry in nested.breakdown { + profile.add_breakdown( + format!("Windowed bucket {idx} / {}", entry.label), + entry.bytes, + entry.kind, + entry.note, + ); + } + } + } + } + profile + } +} + +fn aggop_payload_bytes(op: &AggOp) -> usize { + match op { + AggOp::Count(s) => size_of_val(s), + AggOp::Sum(s) => size_of_val(s), + AggOp::Avg(s) => size_of_val(s), + AggOp::Min(s) => size_of_val(s), + AggOp::Max(s) => size_of_val(s), + AggOp::Variance(s) => size_of_val(s), + AggOp::StdDev(s) => size_of_val(s), + AggOp::Ratio(s) => size_of_val(s), + AggOp::CountDistinct(s) => size_of_val(s), + AggOp::Percentile(s) => size_of_val(s), + AggOp::TopK(s) => size_of_val(s), + AggOp::BloomMember(s) => size_of_val(s), + AggOp::Entropy(s) => size_of_val(s), + AggOp::Windowed(s) => size_of_val(s), + AggOp::First(s) => size_of_val(s), + AggOp::Last(s) => size_of_val(s), + AggOp::FirstN(s) => size_of_val(s), + AggOp::LastN(s) => size_of_val(s), + AggOp::Lag(s) => size_of_val(s), + AggOp::FirstSeen(s) => size_of_val(s), + AggOp::LastSeen(s) => size_of_val(s), + AggOp::Age(s) => size_of_val(s), + AggOp::HasSeen(s) => size_of_val(s), + AggOp::TimeSince(s) => size_of_val(s), + AggOp::TimeSinceLastN(s) => size_of_val(s), + AggOp::Streak(s) => size_of_val(s), + AggOp::MaxStreak(s) => size_of_val(s), + AggOp::NegativeStreak(s) => size_of_val(s), + AggOp::FirstSeenInWindow(s) => size_of_val(s), + AggOp::Ewma(s) => size_of_val(s), + AggOp::EwVar(s) => size_of_val(s), + AggOp::EwZScore(s) => size_of_val(s), + AggOp::DecayedSum(s) => size_of_val(s), + AggOp::DecayedCount(s) => size_of_val(s), + AggOp::Twa(s) => size_of_val(s), + AggOp::RateOfChange(s) => size_of_val(s), + AggOp::InterArrivalStats(s) => size_of_val(s), + AggOp::BurstCount(s) => size_of_val(s), + AggOp::DeltaFromPrev(s) => size_of_val(s), + AggOp::Trend(s) => size_of_val(s), + AggOp::TrendResidual(s) => size_of_val(s), + AggOp::OutlierCount(s) => size_of_val(s), + AggOp::ValueChangeCount(s) => size_of_val(s), + AggOp::ZScore(s) => size_of_val(s), + AggOp::Histogram(s) => size_of_val(s), + AggOp::HourOfDayHistogram(s) => size_of_val(s), + AggOp::DowHourHistogram(s) => size_of_val(s), + AggOp::SeasonalDeviation(s) => size_of_val(s), + AggOp::EventTypeMix(s) => size_of_val(s), + AggOp::MostRecentN(s) => size_of_val(s), + AggOp::ReservoirSample(s) => size_of_val(s), + AggOp::GeoVelocity(s) => size_of_val(s), + AggOp::GeoDistance(s) => size_of_val(s), + AggOp::GeoSpread(s) => size_of_val(s), + AggOp::DistanceFromHome(s) => size_of_val(s), + } +} + +fn aggop_label(op: &AggOp) -> String { + match op { + AggOp::Count(_) => "Count", + AggOp::Sum(_) => "Sum", + AggOp::Avg(_) => "Avg", + AggOp::Min(_) => "Min", + AggOp::Max(_) => "Max", + AggOp::Variance(_) => "Variance", + AggOp::StdDev(_) => "StdDev", + AggOp::Ratio(_) => "Ratio", + AggOp::CountDistinct(_) => "CountDistinct", + AggOp::Percentile(_) => "Percentile", + AggOp::TopK(_) => "TopK", + AggOp::BloomMember(_) => "BloomMember", + AggOp::Entropy(_) => "Entropy", + AggOp::Windowed(_) => "Windowed", + AggOp::First(_) => "First", + AggOp::Last(_) => "Last", + AggOp::FirstN(_) => "FirstN", + AggOp::LastN(_) => "LastN", + AggOp::Lag(_) => "Lag", + AggOp::FirstSeen(_) => "FirstSeen", + AggOp::LastSeen(_) => "LastSeen", + AggOp::Age(_) => "Age", + AggOp::HasSeen(_) => "HasSeen", + AggOp::TimeSince(_) => "TimeSince", + AggOp::TimeSinceLastN(_) => "TimeSinceLastN", + AggOp::Streak(_) => "Streak", + AggOp::MaxStreak(_) => "MaxStreak", + AggOp::NegativeStreak(_) => "NegativeStreak", + AggOp::FirstSeenInWindow(_) => "FirstSeenInWindow", + AggOp::Ewma(_) => "Ewma", + AggOp::EwVar(_) => "EwVar", + AggOp::EwZScore(_) => "EwZScore", + AggOp::DecayedSum(_) => "DecayedSum", + AggOp::DecayedCount(_) => "DecayedCount", + AggOp::Twa(_) => "Twa", + AggOp::RateOfChange(_) => "RateOfChange", + AggOp::InterArrivalStats(_) => "InterArrivalStats", + AggOp::BurstCount(_) => "BurstCount", + AggOp::DeltaFromPrev(_) => "DeltaFromPrev", + AggOp::Trend(_) => "Trend", + AggOp::TrendResidual(_) => "TrendResidual", + AggOp::OutlierCount(_) => "OutlierCount", + AggOp::ValueChangeCount(_) => "ValueChangeCount", + AggOp::ZScore(_) => "ZScore", + AggOp::Histogram(_) => "Histogram", + AggOp::HourOfDayHistogram(_) => "HourOfDayHistogram", + AggOp::DowHourHistogram(_) => "DowHourHistogram", + AggOp::SeasonalDeviation(_) => "SeasonalDeviation", + AggOp::EventTypeMix(_) => "EventTypeMix", + AggOp::MostRecentN(_) => "MostRecentN", + AggOp::ReservoirSample(_) => "ReservoirSample", + AggOp::GeoVelocity(_) => "GeoVelocity", + AggOp::GeoDistance(_) => "GeoDistance", + AggOp::GeoSpread(_) => "GeoSpread", + AggOp::DistanceFromHome(_) => "DistanceFromHome", + } + .to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::agg_buffer::{HourOfDayHistogramState, SeasonalDeviationState}; + use crate::agg_geo::{GeoDistanceState, GeoSpreadState, GeoVelocityState}; + use crate::agg_op::{AggKind, AggOp, AggOpDescriptor}; + use crate::agg_state::{CountDistinctStateWrap, CountState, SumState}; + use crate::agg_state_velocity::TrendResidualState; + use crate::row::{Row, Value}; + + #[test] + fn mem_usage_total_bytes_adds_stack_and_heap() { + let mut profile = MemProfile::new("sample", 80); + profile.add_breakdown("vec", 32, "Vec", "test"); + profile.add_breakdown("box", 16, "Box", "test"); + assert_eq!(profile.total_bytes(), 128); + } + + #[test] + fn mem_usage_scalar_aggop_reports_enum_stack_slot() { + let profile = AggOp::Count(Default::default()).mem_profile(); + assert_eq!(profile.stack_bytes, size_of::()); + } + + #[test] + fn mem_usage_stack_composition_tracks_payload_and_slack() { + let profile = MemProfile::new("sample", 80).with_stack_composition(80, 8); + assert_eq!(profile.stack_bytes, 80); + assert_eq!(profile.enum_slot_bytes, 80); + assert_eq!(profile.payload_bytes, 8); + assert_eq!(profile.slack_bytes, 72); + assert_eq!(profile.total_bytes(), 80); + } + + #[test] + fn mem_usage_aggop_payload_size_uses_active_variant_payload() { + let count = AggOp::Count(CountState::default()).mem_profile(); + assert_eq!(count.stack_bytes, size_of::()); + assert_eq!(count.enum_slot_bytes, size_of::()); + assert_eq!(count.payload_bytes, size_of::()); + assert_eq!( + count.slack_bytes, + size_of::() - size_of::() + ); + + let sum = AggOp::Sum(SumState::default()).mem_profile(); + assert_eq!(sum.payload_bytes, size_of::()); + assert!(sum.slack_bytes > 0); + + let trend_residual = AggOp::TrendResidual(TrendResidualState::default()).mem_profile(); + assert_eq!( + trend_residual.payload_bytes, + size_of::() + ); + } + + #[test] + fn mem_usage_boxed_aggop_payload_is_pointer_not_pointee() { + let profile = + AggOp::CountDistinct(Box::new(CountDistinctStateWrap::default())).mem_profile(); + assert_eq!(profile.stack_bytes, size_of::()); + assert_eq!( + profile.payload_bytes, + size_of::>() + ); + assert!(profile.slack_bytes > 0); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "Box")); + } + + #[test] + fn mem_usage_boxed_sketch_reports_box_breakdown() { + let profile = AggOp::new(&AggOpDescriptor { + kind: AggKind::Percentile, + ..Default::default() + }) + .mem_profile(); + assert!(profile.breakdown.iter().any(|b| b.label.contains("Box"))); + } + + #[test] + fn mem_usage_fixed_boxed_ops_do_not_use_serialized_proxy() { + let ops = [ + AggOp::HourOfDayHistogram(Box::::default()), + AggOp::SeasonalDeviation(Box::::default()), + AggOp::GeoVelocity(Box::new(GeoVelocityState::with_fields( + "lat".into(), + "lon".into(), + ))), + AggOp::GeoDistance(Box::new(GeoDistanceState::with_fields( + "lat".into(), + "lon".into(), + ))), + AggOp::GeoSpread(Box::new(GeoSpreadState::with_fields( + "lat".into(), + "lon".into(), + ))), + ]; + for op in ops { + let profile = op.mem_profile(); + assert!( + !profile.breakdown.iter().any(|entry| { + entry.kind == "estimate" || entry.label.contains("owned internals") + }), + "{} should use field-aware exact accounting: {:?}", + profile.label, + profile.breakdown + ); + assert!( + profile.breakdown.iter().any(|entry| entry.kind == "Box"), + "{} should still report the boxed payload", + profile.label + ); + } + } + + #[test] + fn mem_usage_count_distinct_reports_mode_specific_components() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::CountDistinct, + field: Some("merchant_id".into()), + ..Default::default() + }); + for i in 0..32 { + let row = Row::new().with_field("merchant_id", Value::Str(format!("m{i}").into())); + op.update(&row, i as i64, Some("merchant_id"), true); + } + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "CountDistinct hash-set slots")); + } + + #[test] + fn mem_usage_percentile_sketch_reports_uddsketch_vectors() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::Percentile, + field: Some("amount".into()), + ..Default::default() + }); + for i in 0..300 { + let row = Row::new().with_field("amount", Value::F64(i as f64 + 1.0)); + op.update(&row, i as i64, Some("amount"), true); + } + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "UDDSketch positive buckets")); + } + + #[test] + fn mem_usage_top_k_hybrid_reports_cms_and_heap_components() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::TopK, + field: Some("merchant_id".into()), + ..Default::default() + }); + for i in 0..1100 { + let row = Row::new().with_field("merchant_id", Value::Str(format!("m{i}").into())); + op.update(&row, i as i64, Some("merchant_id"), true); + } + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "TopK count-min counters")); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "TopK heap index map")); + } + + #[test] + fn mem_usage_bloom_reports_filter_words() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::BloomMember, + field: Some("email_domain".into()), + ..Default::default() + }); + let row = Row::new().with_field("email_domain", Value::Str("risk.test".into())); + op.update(&row, 1, Some("email_domain"), true); + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "Bloom filter words")); + } + + #[test] + fn mem_usage_entropy_reports_category_map_components() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::Entropy, + field: Some("mcc".into()), + ..Default::default() + }); + for value in ["5411", "5732", "5812"] { + let row = Row::new().with_field("mcc", Value::Str(value.into())); + op.update(&row, 1, Some("mcc"), true); + } + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "Entropy category map entries")); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "Entropy category string capacity")); + } + + #[test] + fn mem_usage_vector_backed_state_reports_capacity_bytes() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::FirstN, + field: Some("merchant_id".into()), + n: Some(5), + ..Default::default() + }); + let row = Row::new().with_field("merchant_id", Value::Str("m1".into())); + op.update(&row, 1, Some("merchant_id"), true); + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.label == "FirstN values" && b.bytes >= size_of::())); + } + + #[test] + fn mem_usage_windowed_state_reports_nested_bucket() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::Sum, + field: Some("amount".into()), + window_ms: Some(60_000), + ..Default::default() + }); + let row = Row::new().with_field("amount", Value::F64(42.0)); + op.update(&row, 1_000, Some("amount"), true); + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.label.contains("Windowed bucket 0 Box"))); + } + + #[test] + fn mem_usage_map_backed_state_labels_estimate() { + let mut op = AggOp::new(&AggOpDescriptor { + kind: AggKind::EventTypeMix, + field: Some("mcc".into()), + ..Default::default() + }); + let row = Row::new().with_field("mcc", Value::Str("5411".into())); + op.update(&row, 1, Some("mcc"), true); + let profile = op.mem_profile(); + assert!(profile + .breakdown + .iter() + .any(|b| b.note.contains("estimated"))); + } + + #[test] + fn mem_usage_sort_profiles_desc_orders_by_total_bytes() { + let mut rows = vec![MemProfile::new("small", 1), MemProfile::new("large", 10)]; + sort_profiles_desc(&mut rows); + assert_eq!(rows[0].label, "large"); + } + + #[test] + fn serde_profile_estimate_is_deterministic() { + let profile = serde_profile("value", &serde_json::json!({"a": 1})); + assert!(profile.heap_bytes > 0); + } +} diff --git a/crates/beava-core/src/sketches/bloom.rs b/crates/beava-core/src/sketches/bloom.rs index 9eb0079d..0462f1ae 100644 --- a/crates/beava-core/src/sketches/bloom.rs +++ b/crates/beava-core/src/sketches/bloom.rs @@ -50,6 +50,9 @@ impl BloomFilter { pub fn num_hashes(&self) -> u32 { self.num_hashes } + pub fn word_capacity(&self) -> usize { + self.words.capacity() + } fn base_hashes(&self, value: &str) -> (u64, u64) { // Use process-static RandomState instead of per-call diff --git a/crates/beava-core/src/sketches/cms.rs b/crates/beava-core/src/sketches/cms.rs index 068e842c..ec6b9c55 100644 --- a/crates/beava-core/src/sketches/cms.rs +++ b/crates/beava-core/src/sketches/cms.rs @@ -121,6 +121,10 @@ impl CountMinSketch { self.total } + pub fn counter_capacity(&self) -> usize { + self.counters.capacity() + } + #[inline] fn rehash(hash: u64, seed: u64) -> u64 { let mut h = hash ^ seed; @@ -357,6 +361,26 @@ impl TopKHeap { out } + pub fn heap_capacity(&self) -> usize { + self.heap.capacity() + } + + pub fn index_entry_count_estimate(&self) -> usize { + if self.index_ready { + self.index.len() + } else { + self.heap.len() + } + } + + pub fn index_capacity_estimate(&self) -> usize { + if self.index_ready { + self.index.capacity() + } else { + self.heap.capacity() + } + } + pub fn estimated_bytes(&self) -> usize { std::mem::size_of::() + self.heap.len() * (std::mem::size_of::() + 32) diff --git a/crates/beava-core/src/sketches/count_distinct.rs b/crates/beava-core/src/sketches/count_distinct.rs index 735388af..94caab69 100644 --- a/crates/beava-core/src/sketches/count_distinct.rs +++ b/crates/beava-core/src/sketches/count_distinct.rs @@ -88,6 +88,13 @@ impl CountDistinctState { } } + pub fn hash_set_capacity(&self) -> Option { + match self { + CountDistinctState::HashSet { hashes } => Some(hashes.capacity()), + _ => None, + } + } + /// Insert a precomputed u64 hash. Promotes mode if threshold exceeded. /// /// The input u64 is expected to come from a FxHasher-backed hasher; diff --git a/crates/beava-core/src/sketches/entropy.rs b/crates/beava-core/src/sketches/entropy.rs index 28c574f9..37237f5a 100644 --- a/crates/beava-core/src/sketches/entropy.rs +++ b/crates/beava-core/src/sketches/entropy.rs @@ -57,6 +57,10 @@ impl EntropyHistogram { self.counts.len() } + pub fn key_capacity_bytes(&self) -> usize { + self.counts.keys().map(|k| k.capacity()).sum() + } + pub fn insert(&mut self, value: &str) { self.total = self.total.saturating_add(1); // 1) Already-tracked key → bump and return. diff --git a/crates/beava-core/src/sketches/hll.rs b/crates/beava-core/src/sketches/hll.rs index 11458e40..84e2f9c1 100644 --- a/crates/beava-core/src/sketches/hll.rs +++ b/crates/beava-core/src/sketches/hll.rs @@ -105,6 +105,10 @@ impl Hll { Self::default() } + pub fn register_capacity(&self) -> usize { + self.registers.capacity() + } + /// SplitMix64 — improves the avalanche / uniform-distribution properties /// of the input hash, which is critical for HLL accuracy when callers /// pass hashes from a non-cryptographic hasher (ahash etc.). diff --git a/crates/beava-core/src/sketches/uddsketch.rs b/crates/beava-core/src/sketches/uddsketch.rs index 7f915de0..6f306726 100644 --- a/crates/beava-core/src/sketches/uddsketch.rs +++ b/crates/beava-core/src/sketches/uddsketch.rs @@ -106,6 +106,31 @@ impl UDDSketch { self.total_count } + #[inline] + pub fn positive_bucket_capacity(&self) -> usize { + self.pos_buckets.capacity() + } + + #[inline] + pub fn negative_bucket_capacity(&self) -> usize { + self.neg_buckets.capacity() + } + + #[inline] + pub fn positive_bucket_len(&self) -> usize { + self.pos_buckets.len() + } + + #[inline] + pub fn negative_bucket_len(&self) -> usize { + self.neg_buckets.len() + } + + #[inline] + pub fn zero_count(&self) -> u64 { + self.zero_count + } + #[inline] pub fn is_empty(&self) -> bool { self.total_count == 0 diff --git a/crates/beava-server/src/http_admin.rs b/crates/beava-server/src/http_admin.rs index 8314f115..4bafe24d 100644 --- a/crates/beava-server/src/http_admin.rs +++ b/crates/beava-server/src/http_admin.rs @@ -47,6 +47,10 @@ pub struct RegistrySnapshot { pub type SharedRegistrySnapshot = Arc>; +/// Static v0 estimate from PROJECT.md "Memory" budget (~7 KB per entity +/// for a rich 30-feature pack). A periodic resampler is post-v0 work. +pub const BYTES_PER_ENTITY_P99_V0_PLACEHOLDER: u64 = 7000; + #[derive(Clone)] struct AdminState { snapshot: SharedRegistrySnapshot, @@ -86,9 +90,6 @@ async fn metrics_handler(State(state): State) -> impl IntoResponse { let cold_evictions = ColdEntityEvictionCounter::count(); let bucket_reclaims = BucketReclaimCounter::count(); let entity_count = EntityCountResidentSnapshot::load(); - // Static v0 estimate from PROJECT.md "Memory" budget (~7 KB per entity - // for a rich 30-feature pack). A periodic resampler is post-v0 work. - const BYTES_PER_ENTITY_P99_V0_PLACEHOLDER: u64 = 7000; // The lifetime-op aggregate counter currently aliases entropy // `categories_capped`; top-k displacement and histogram bucket drops // join when those operator internals expose hooks. diff --git a/memory-profile-fraud-team.md b/memory-profile-fraud-team.md new file mode 100644 index 00000000..65d588d3 --- /dev/null +++ b/memory-profile-fraud-team.md @@ -0,0 +1,440 @@ +# AggOp Memory Profile: fraud-team + +## Workload Summary + +- Workload: `fraud` +- Events requested from generator: `100000` +- Events replayed from generator: `100000` +- Events by source: + - `Txn`: `100000` +- Derivations discovered: `9` +- Aggregate features discovered: `111` +- Active entity rows profiled: `67311` +- Bytes per active entity row p99: `98336` bytes + +## Per-Entity Table Footprint + +| Rank | Table | Source | group_by key | Active entities | Features/entity | Events applied | Stack p50 | Stack p99 | Stack max | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | Top contributor | +|------|-------|--------|--------------|-----------------|-----------------|----------------|-----------|-----------|-----------|----------|----------|----------|-----------|-----------|-----------|-----------------| +| 1 | `TxnByIp` | `Txn` | `ip_address` | 1000 | 8 | 100000 | 640 | 640 | 640 | 98176 | 128448 | 129984 | 98816 | 129088 | 130624 | `cards_per_ip_1h` | +| 2 | `TxnByDevice` | `Txn` | `device_id` | 1000 | 6 | 100000 | 480 | 480 | 480 | 58192 | 58192 | 58192 | 58672 | 58672 | 58672 | `cards_per_device_24h` | +| 3 | `TxnByMerchant` | `Txn` | `merchant_id` | 1000 | 4 | 100000 | 320 | 320 | 320 | 31640 | 31640 | 31640 | 31960 | 31960 | 31960 | `users_per_merchant_24h` | +| 4 | `TxnByCard` | `Txn` | `card_fp` | 1000 | 8 | 100000 | 640 | 640 | 640 | 31080 | 31080 | 31080 | 31720 | 31720 | 31720 | `merchants_per_card_24h` | +| 5 | `TxnByUser` | `Txn` | `user_id` | 63311 | 62 | 100000 | 4960 | 4960 | 4960 | 20494 | 21673 | 23288 | 25454 | 26633 | 28248 | `amount_p95_24h` | +| 6 | `CardAddByDevice` | `CardAdd` | `device_id` | 0 | 3 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | `-` | +| 7 | `LoginByUser` | `Login` | `user_id` | 0 | 8 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | `-` | +| 8 | `RefundByUser` | `Refund` | `user_id` | 0 | 8 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | `-` | +| 9 | `SignupByIp` | `Signup` | `ip_address` | 0 | 4 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | `-` | + +## Per-Table Entity Details + +### `TxnByIp` (`Txn` by `ip_address`) + +#### Feature Columns Across Entities + +| Feature | Op | Shape | Stack bytes | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | +|---------|----|-------|-------------|----------|----------|----------|-----------|-----------|-----------| +| `cards_per_ip_1h` | `n_unique` | `windowed` | 80 | 58008 | 86552 | 86552 | 58088 | 86632 | 86632 | +| `users_per_ip_24h` | `n_unique` | `windowed` | 80 | 28968 | 28968 | 28968 | 29048 | 29048 | 29048 | +| `ip_top_users` | `top_k` | `windowed` | 80 | 10016 | 12128 | 13376 | 10096 | 12208 | 13456 | +| `amount_sum_per_ip_1h` | `sum` | `windowed` | 80 | 416 | 416 | 416 | 496 | 496 | 496 | +| `txn_per_ip_1h` | `count` | `windowed` | 80 | 416 | 416 | 416 | 496 | 496 | 496 | +| `txn_per_ip_24h` | `count` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `ip_age` | `age` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `ip_first_seen` | `first_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | + +#### Largest Entity Rows + +| Entity key | Events | Stack bytes | Heap bytes | Total bytes | Top feature contributors | +|------------|--------|-------------|------------|-------------|--------------------------| +| `s533` | 135 | 640 | 129984 | 130624 | `cards_per_ip_1h`=86632 bytes, `users_per_ip_24h`=29048 bytes, `ip_top_users`=13456 bytes | +| `s891` | 127 | 640 | 129216 | 129856 | `cards_per_ip_1h`=86632 bytes, `users_per_ip_24h`=29048 bytes, `ip_top_users`=12688 bytes | +| `s390` | 125 | 640 | 129024 | 129664 | `cards_per_ip_1h`=86632 bytes, `users_per_ip_24h`=29048 bytes, `ip_top_users`=12496 bytes | +| `s58` | 124 | 640 | 128928 | 129568 | `cards_per_ip_1h`=86632 bytes, `users_per_ip_24h`=29048 bytes, `ip_top_users`=12400 bytes | +| `s279` | 122 | 640 | 128640 | 129280 | `cards_per_ip_1h`=86632 bytes, `users_per_ip_24h`=29048 bytes, `ip_top_users`=12112 bytes | + +#### Feature Breakdown For Largest Entity `s533` + +| Feature | Op | Shape | Events applied | Stack bytes | enum_slot_bytes | payload_bytes | slack_bytes | Heap bytes | Total bytes | +|---------|----|-------|----------------|-------------|-----------------|---------------|-------------|------------|-------------| +| `cards_per_ip_1h` | `n_unique` | `windowed` | 135 | 80 | 80 | 8 | 72 | 86552 | 86632 | +| `users_per_ip_24h` | `n_unique` | `windowed` | 135 | 80 | 80 | 8 | 72 | 28968 | 29048 | +| `ip_top_users` | `top_k` | `windowed` | 135 | 80 | 80 | 8 | 72 | 13376 | 13456 | +| `amount_sum_per_ip_1h` | `sum` | `windowed` | 135 | 80 | 80 | 8 | 72 | 416 | 496 | +| `txn_per_ip_1h` | `count` | `windowed` | 135 | 80 | 80 | 8 | 72 | 416 | 496 | +| `txn_per_ip_24h` | `count` | `windowed` | 135 | 80 | 80 | 8 | 72 | 256 | 336 | +| `ip_age` | `age` | `lifetime` | 135 | 80 | 80 | 32 | 48 | 0 | 80 | +| `ip_first_seen` | `first_seen` | `lifetime` | 135 | 80 | 80 | 32 | 48 | 0 | 80 | + +### `TxnByDevice` (`Txn` by `device_id`) + +#### Feature Columns Across Entities + +| Feature | Op | Shape | Stack bytes | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | +|---------|----|-------|-------------|----------|----------|----------|-----------|-----------|-----------| +| `cards_per_device_24h` | `n_unique` | `windowed` | 80 | 28968 | 28968 | 28968 | 29048 | 29048 | 29048 | +| `users_per_device_24h` | `n_unique` | `windowed` | 80 | 28968 | 28968 | 28968 | 29048 | 29048 | 29048 | +| `device_txn_count_24h` | `count` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `device_age` | `age` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `device_first_seen` | `first_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `device_last_seen` | `last_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | + +#### Largest Entity Rows + +| Entity key | Events | Stack bytes | Heap bytes | Total bytes | Top feature contributors | +|------------|--------|-------------|------------|-------------|--------------------------| +| `s885` | 134 | 480 | 58192 | 58672 | `cards_per_device_24h`=29048 bytes, `users_per_device_24h`=29048 bytes, `device_txn_count_24h`=336 bytes | +| `s632` | 128 | 480 | 58192 | 58672 | `cards_per_device_24h`=29048 bytes, `users_per_device_24h`=29048 bytes, `device_txn_count_24h`=336 bytes | +| `s641` | 128 | 480 | 58192 | 58672 | `cards_per_device_24h`=29048 bytes, `users_per_device_24h`=29048 bytes, `device_txn_count_24h`=336 bytes | +| `s966` | 128 | 480 | 58192 | 58672 | `cards_per_device_24h`=29048 bytes, `users_per_device_24h`=29048 bytes, `device_txn_count_24h`=336 bytes | +| `s605` | 127 | 480 | 58192 | 58672 | `cards_per_device_24h`=29048 bytes, `users_per_device_24h`=29048 bytes, `device_txn_count_24h`=336 bytes | + +#### Feature Breakdown For Largest Entity `s885` + +| Feature | Op | Shape | Events applied | Stack bytes | enum_slot_bytes | payload_bytes | slack_bytes | Heap bytes | Total bytes | +|---------|----|-------|----------------|-------------|-----------------|---------------|-------------|------------|-------------| +| `cards_per_device_24h` | `n_unique` | `windowed` | 134 | 80 | 80 | 8 | 72 | 28968 | 29048 | +| `users_per_device_24h` | `n_unique` | `windowed` | 134 | 80 | 80 | 8 | 72 | 28968 | 29048 | +| `device_txn_count_24h` | `count` | `windowed` | 134 | 80 | 80 | 8 | 72 | 256 | 336 | +| `device_age` | `age` | `lifetime` | 134 | 80 | 80 | 32 | 48 | 0 | 80 | +| `device_first_seen` | `first_seen` | `lifetime` | 134 | 80 | 80 | 32 | 48 | 0 | 80 | +| `device_last_seen` | `last_seen` | `lifetime` | 134 | 80 | 80 | 32 | 48 | 0 | 80 | + +### `TxnByMerchant` (`Txn` by `merchant_id`) + +#### Feature Columns Across Entities + +| Feature | Op | Shape | Stack bytes | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | +|---------|----|-------|-------------|----------|----------|----------|-----------|-----------|-----------| +| `users_per_merchant_24h` | `n_unique` | `windowed` | 80 | 28968 | 28968 | 28968 | 29048 | 29048 | 29048 | +| `merchant_amount_p99_24h` | `quantile` | `windowed` | 80 | 2416 | 2416 | 2416 | 2496 | 2496 | 2496 | +| `txn_per_merchant_24h` | `count` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `merchant_first_seen` | `first_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | + +#### Largest Entity Rows + +| Entity key | Events | Stack bytes | Heap bytes | Total bytes | Top feature contributors | +|------------|--------|-------------|------------|-------------|--------------------------| +| `s969` | 135 | 320 | 31640 | 31960 | `users_per_merchant_24h`=29048 bytes, `merchant_amount_p99_24h`=2496 bytes, `txn_per_merchant_24h`=336 bytes | +| `s428` | 132 | 320 | 31640 | 31960 | `users_per_merchant_24h`=29048 bytes, `merchant_amount_p99_24h`=2496 bytes, `txn_per_merchant_24h`=336 bytes | +| `s676` | 132 | 320 | 31640 | 31960 | `users_per_merchant_24h`=29048 bytes, `merchant_amount_p99_24h`=2496 bytes, `txn_per_merchant_24h`=336 bytes | +| `s725` | 129 | 320 | 31640 | 31960 | `users_per_merchant_24h`=29048 bytes, `merchant_amount_p99_24h`=2496 bytes, `txn_per_merchant_24h`=336 bytes | +| `s806` | 129 | 320 | 31640 | 31960 | `users_per_merchant_24h`=29048 bytes, `merchant_amount_p99_24h`=2496 bytes, `txn_per_merchant_24h`=336 bytes | + +#### Feature Breakdown For Largest Entity `s969` + +| Feature | Op | Shape | Events applied | Stack bytes | enum_slot_bytes | payload_bytes | slack_bytes | Heap bytes | Total bytes | +|---------|----|-------|----------------|-------------|-----------------|---------------|-------------|------------|-------------| +| `users_per_merchant_24h` | `n_unique` | `windowed` | 135 | 80 | 80 | 8 | 72 | 28968 | 29048 | +| `merchant_amount_p99_24h` | `quantile` | `windowed` | 135 | 80 | 80 | 8 | 72 | 2416 | 2496 | +| `txn_per_merchant_24h` | `count` | `windowed` | 135 | 80 | 80 | 8 | 72 | 256 | 336 | +| `merchant_first_seen` | `first_seen` | `lifetime` | 135 | 80 | 80 | 32 | 48 | 0 | 80 | + +### `TxnByCard` (`Txn` by `card_fp`) + +#### Feature Columns Across Entities + +| Feature | Op | Shape | Stack bytes | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | +|---------|----|-------|-------------|----------|----------|----------|-----------|-----------|-----------| +| `merchants_per_card_24h` | `n_unique` | `windowed` | 80 | 28968 | 28968 | 28968 | 29048 | 29048 | 29048 | +| `small_amt_burst_5m` | `burst_count` | `windowed` | 80 | 1024 | 1024 | 1024 | 1104 | 1104 | 1104 | +| `decline_count_1h` | `count` | `windowed` | 80 | 416 | 416 | 416 | 496 | 496 | 496 | +| `txn_per_card_1h` | `count` | `windowed` | 80 | 416 | 416 | 416 | 496 | 496 | 496 | +| `txn_per_card_24h` | `count` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `card_age` | `age` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `card_first_seen` | `first_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `decline_streak_card` | `negative_streak` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | + +#### Largest Entity Rows + +| Entity key | Events | Stack bytes | Heap bytes | Total bytes | Top feature contributors | +|------------|--------|-------------|------------|-------------|--------------------------| +| `s180` | 130 | 640 | 31080 | 31720 | `merchants_per_card_24h`=29048 bytes, `small_amt_burst_5m`=1104 bytes, `decline_count_1h`=496 bytes | +| `s560` | 130 | 640 | 31080 | 31720 | `merchants_per_card_24h`=29048 bytes, `small_amt_burst_5m`=1104 bytes, `decline_count_1h`=496 bytes | +| `s272` | 129 | 640 | 31080 | 31720 | `merchants_per_card_24h`=29048 bytes, `small_amt_burst_5m`=1104 bytes, `decline_count_1h`=496 bytes | +| `s362` | 129 | 640 | 31080 | 31720 | `merchants_per_card_24h`=29048 bytes, `small_amt_burst_5m`=1104 bytes, `decline_count_1h`=496 bytes | +| `s42` | 129 | 640 | 31080 | 31720 | `merchants_per_card_24h`=29048 bytes, `small_amt_burst_5m`=1104 bytes, `decline_count_1h`=496 bytes | + +#### Feature Breakdown For Largest Entity `s180` + +| Feature | Op | Shape | Events applied | Stack bytes | enum_slot_bytes | payload_bytes | slack_bytes | Heap bytes | Total bytes | +|---------|----|-------|----------------|-------------|-----------------|---------------|-------------|------------|-------------| +| `merchants_per_card_24h` | `n_unique` | `windowed` | 130 | 80 | 80 | 8 | 72 | 28968 | 29048 | +| `small_amt_burst_5m` | `burst_count` | `windowed` | 130 | 80 | 80 | 72 | 8 | 1024 | 1104 | +| `decline_count_1h` | `count` | `windowed` | 130 | 80 | 80 | 8 | 72 | 416 | 496 | +| `txn_per_card_1h` | `count` | `windowed` | 130 | 80 | 80 | 8 | 72 | 416 | 496 | +| `txn_per_card_24h` | `count` | `windowed` | 130 | 80 | 80 | 8 | 72 | 256 | 336 | +| `card_age` | `age` | `lifetime` | 130 | 80 | 80 | 32 | 48 | 0 | 80 | +| `card_first_seen` | `first_seen` | `lifetime` | 130 | 80 | 80 | 32 | 48 | 0 | 80 | +| `decline_streak_card` | `negative_streak` | `lifetime` | 130 | 80 | 80 | 8 | 72 | 0 | 80 | + +### `TxnByUser` (`Txn` by `user_id`) + +#### Feature Columns Across Entities + +| Feature | Op | Shape | Stack bytes | Heap p50 | Heap p99 | Heap max | Total p50 | Total p99 | Total max | +|---------|----|-------|-------------|----------|----------|----------|-----------|-----------|-----------| +| `amount_p95_24h` | `quantile` | `windowed` | 80 | 2416 | 2416 | 2416 | 2496 | 2496 | 2496 | +| `p50_amount_24h` | `quantile` | `windowed` | 80 | 2416 | 2416 | 2416 | 2496 | 2496 | 2496 | +| `p99_amount_24h` | `quantile` | `windowed` | 80 | 2416 | 2416 | 2416 | 2496 | 2496 | 2496 | +| `dist_from_home` | `distance_from_home` | `lifetime` | 80 | 1720 | 1720 | 1720 | 1800 | 1800 | 1800 | +| `reservoir_50` | `reservoir_sample` | `lifetime` | 80 | 1600 | 1600 | 1600 | 1680 | 1680 | 1680 | +| `dow_hour_hist_30d` | `dow_hour_histogram` | `lifetime` | 80 | 1344 | 1344 | 1344 | 1424 | 1424 | 1424 | +| `device_seen` | `bloom_member` | `lifetime` | 80 | 1280 | 1280 | 1280 | 1360 | 1360 | 1360 | +| `burst_count_5m` | `burst_count` | `windowed` | 80 | 1024 | 1024 | 1024 | 1104 | 1104 | 1104 | +| `top_merchants_24h` | `top_k` | `windowed` | 80 | 512 | 800 | 1184 | 592 | 880 | 1264 | +| `mcc_entropy_24h` | `entropy` | `windowed` | 80 | 396 | 648 | 984 | 476 | 728 | 1064 | +| `seasonal_dev` | `seasonal_deviation` | `lifetime` | 80 | 600 | 600 | 600 | 680 | 680 | 680 | +| `txn_count_5m` | `count` | `windowed` | 80 | 256 | 496 | 944 | 336 | 576 | 1024 | +| `event_mix_24h` | `event_type_mix` | `lifetime` | 80 | 208 | 448 | 768 | 288 | 528 | 848 | +| `countries_distinct_7d` | `n_unique` | `windowed` | 80 | 424 | 424 | 424 | 504 | 504 | 504 | +| `ips_distinct_24h` | `n_unique` | `windowed` | 80 | 424 | 424 | 424 | 504 | 504 | 504 | +| `merchants_distinct_24h` | `n_unique` | `windowed` | 80 | 424 | 424 | 424 | 504 | 504 | 504 | +| `txn_count_1h` | `count` | `windowed` | 80 | 256 | 416 | 416 | 336 | 496 | 496 | +| `avg_amount_24h` | `mean` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `min_amount_24h` | `min` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `std_amount_24h` | `std` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `sum_amount_24h` | `sum` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `txn_count_24h` | `count` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `var_amount_24h` | `var` | `windowed` | 80 | 256 | 256 | 256 | 336 | 336 | 336 | +| `hour_hist_30d` | `hour_of_day_histogram` | `lifetime` | 80 | 192 | 192 | 192 | 272 | 272 | 272 | +| `unique_cells_24h` | `n_unique` | `lifetime` | 80 | 168 | 168 | 168 | 248 | 248 | 248 | +| `last_5_amounts` | `last_n` | `lifetime` | 80 | 160 | 160 | 160 | 240 | 240 | 240 | +| `recent_5_amts` | `most_recent_n` | `lifetime` | 80 | 160 | 160 | 160 | 240 | 240 | 240 | +| `first_5_merchants` | `first_n` | `lifetime` | 80 | 128 | 128 | 256 | 208 | 208 | 336 | +| `geo_kmh` | `geo_velocity` | `lifetime` | 80 | 94 | 94 | 94 | 174 | 174 | 174 | +| `geo_spread_24h` | `geo_spread` | `lifetime` | 80 | 94 | 94 | 94 | 174 | 174 | 174 | +| `geo_dist_last` | `geo_distance` | `lifetime` | 80 | 86 | 86 | 86 | 166 | 166 | 166 | +| `amount_lag1` | `lag` | `lifetime` | 80 | 64 | 64 | 64 | 144 | 144 | 144 | +| `geo_entropy_24h` | `entropy` | `lifetime` | 80 | 56 | 56 | 56 | 136 | 136 | 136 | +| `time_since_last_5` | `time_since_last_n` | `lifetime` | 80 | 40 | 40 | 40 | 120 | 120 | 120 | +| `age` | `age` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_decayed_sum_24h` | `decayed_sum` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_delta` | `delta_from_prev` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_ew_zscore` | `ew_zscore` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_ewma_1h` | `ewma` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_ewvar_1h` | `ewvar` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_outliers_5m` | `outlier_count` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_rate_5m` | `rate_of_change` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_trend_5m` | `trend` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_trend_resid_5m` | `trend_residual` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_twa_5m` | `twa` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `amount_z_score` | `z_score` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `decline_streak` | `negative_streak` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `device_change_count_5m` | `value_change_count` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `first_amount` | `first` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `first_in_24h` | `first_seen_in_window` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `first_seen` | `first_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `has_seen` | `has_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `inter_arrival_1h` | `inter_arrival_stats` | `windowed` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `last_amount` | `last` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `last_seen` | `last_seen` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `max_amount_lifetime` | `max` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `max_streak` | `max_streak` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `sum_amount_lifetime` | `sum` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `time_since_last` | `time_since` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `txn_count_lifetime` | `count` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `txn_decayed_count_24h` | `decayed_count` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | +| `txn_streak` | `streak` | `lifetime` | 80 | 0 | 0 | 0 | 80 | 80 | 80 | + +#### Largest Entity Rows + +| Entity key | Events | Stack bytes | Heap bytes | Total bytes | Top feature contributors | +|------------|--------|-------------|------------|-------------|--------------------------| +| `k00021309` | 8 | 4960 | 23288 | 28248 | `amount_p95_24h`=2496 bytes, `p50_amount_24h`=2496 bytes, `p99_amount_24h`=2496 bytes | +| `k00074547` | 8 | 4960 | 23288 | 28248 | `amount_p95_24h`=2496 bytes, `p50_amount_24h`=2496 bytes, `p99_amount_24h`=2496 bytes | +| `k00088034` | 8 | 4960 | 23130 | 28090 | `amount_p95_24h`=2496 bytes, `p50_amount_24h`=2496 bytes, `p99_amount_24h`=2496 bytes | +| `k00089103` | 8 | 4960 | 23129 | 28089 | `amount_p95_24h`=2496 bytes, `p50_amount_24h`=2496 bytes, `p99_amount_24h`=2496 bytes | +| `k00013686` | 7 | 4960 | 22950 | 27910 | `amount_p95_24h`=2496 bytes, `p50_amount_24h`=2496 bytes, `p99_amount_24h`=2496 bytes | + +#### Feature Breakdown For Largest Entity `k00021309` + +| Feature | Op | Shape | Events applied | Stack bytes | enum_slot_bytes | payload_bytes | slack_bytes | Heap bytes | Total bytes | +|---------|----|-------|----------------|-------------|-----------------|---------------|-------------|------------|-------------| +| `amount_p95_24h` | `quantile` | `windowed` | 8 | 80 | 80 | 8 | 72 | 2416 | 2496 | +| `p50_amount_24h` | `quantile` | `windowed` | 8 | 80 | 80 | 8 | 72 | 2416 | 2496 | +| `p99_amount_24h` | `quantile` | `windowed` | 8 | 80 | 80 | 8 | 72 | 2416 | 2496 | +| `dist_from_home` | `distance_from_home` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 1720 | 1800 | +| `reservoir_50` | `reservoir_sample` | `lifetime` | 8 | 80 | 80 | 40 | 40 | 1600 | 1680 | +| `dow_hour_hist_30d` | `dow_hour_histogram` | `lifetime` | 8 | 80 | 80 | 24 | 56 | 1344 | 1424 | +| `device_seen` | `bloom_member` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 1280 | 1360 | +| `top_merchants_24h` | `top_k` | `windowed` | 8 | 80 | 80 | 8 | 72 | 1184 | 1264 | +| `burst_count_5m` | `burst_count` | `windowed` | 8 | 80 | 80 | 72 | 8 | 1024 | 1104 | +| `mcc_entropy_24h` | `entropy` | `windowed` | 8 | 80 | 80 | 8 | 72 | 982 | 1062 | +| `txn_count_5m` | `count` | `windowed` | 8 | 80 | 80 | 8 | 72 | 944 | 1024 | +| `event_mix_24h` | `event_type_mix` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 768 | 848 | +| `seasonal_dev` | `seasonal_deviation` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 600 | 680 | +| `countries_distinct_7d` | `n_unique` | `windowed` | 8 | 80 | 80 | 8 | 72 | 424 | 504 | +| `ips_distinct_24h` | `n_unique` | `windowed` | 8 | 80 | 80 | 8 | 72 | 424 | 504 | +| `merchants_distinct_24h` | `n_unique` | `windowed` | 8 | 80 | 80 | 8 | 72 | 424 | 504 | +| `txn_count_1h` | `count` | `windowed` | 8 | 80 | 80 | 8 | 72 | 416 | 496 | +| `avg_amount_24h` | `mean` | `windowed` | 8 | 80 | 80 | 8 | 72 | 256 | 336 | +| `first_5_merchants` | `first_n` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 256 | 336 | +| `min_amount_24h` | `min` | `windowed` | 8 | 80 | 80 | 8 | 72 | 256 | 336 | +| `std_amount_24h` | `std` | `windowed` | 8 | 80 | 80 | 8 | 72 | 256 | 336 | +| `sum_amount_24h` | `sum` | `windowed` | 8 | 80 | 80 | 8 | 72 | 256 | 336 | +| `txn_count_24h` | `count` | `windowed` | 8 | 80 | 80 | 8 | 72 | 256 | 336 | +| `var_amount_24h` | `var` | `windowed` | 8 | 80 | 80 | 8 | 72 | 256 | 336 | +| `hour_hist_30d` | `hour_of_day_histogram` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 192 | 272 | +| `unique_cells_24h` | `n_unique` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 168 | 248 | +| `last_5_amounts` | `last_n` | `lifetime` | 8 | 80 | 80 | 40 | 40 | 160 | 240 | +| `recent_5_amts` | `most_recent_n` | `lifetime` | 8 | 80 | 80 | 48 | 32 | 160 | 240 | +| `geo_kmh` | `geo_velocity` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 94 | 174 | +| `geo_spread_24h` | `geo_spread` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 94 | 174 | +| `geo_dist_last` | `geo_distance` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 86 | 166 | +| `amount_lag1` | `lag` | `lifetime` | 8 | 80 | 80 | 40 | 40 | 64 | 144 | +| `geo_entropy_24h` | `entropy` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 56 | 136 | +| `time_since_last_5` | `time_since_last_n` | `lifetime` | 8 | 80 | 80 | 40 | 40 | 40 | 120 | +| `age` | `age` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `amount_decayed_sum_24h` | `decayed_sum` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `amount_delta` | `delta_from_prev` | `lifetime` | 8 | 80 | 80 | 24 | 56 | 0 | 80 | +| `amount_ew_zscore` | `ew_zscore` | `lifetime` | 8 | 80 | 80 | 48 | 32 | 0 | 80 | +| `amount_ewma_1h` | `ewma` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `amount_ewvar_1h` | `ewvar` | `lifetime` | 8 | 80 | 80 | 48 | 32 | 0 | 80 | +| `amount_outliers_5m` | `outlier_count` | `windowed` | 8 | 80 | 80 | 40 | 40 | 0 | 80 | +| `amount_rate_5m` | `rate_of_change` | `windowed` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `amount_trend_5m` | `trend` | `windowed` | 8 | 80 | 80 | 48 | 32 | 0 | 80 | +| `amount_trend_resid_5m` | `trend_residual` | `windowed` | 8 | 80 | 80 | 72 | 8 | 0 | 80 | +| `amount_twa_5m` | `twa` | `windowed` | 8 | 80 | 80 | 40 | 40 | 0 | 80 | +| `amount_z_score` | `z_score` | `windowed` | 8 | 80 | 80 | 40 | 40 | 0 | 80 | +| `decline_streak` | `negative_streak` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 0 | 80 | +| `device_change_count_5m` | `value_change_count` | `windowed` | 8 | 80 | 80 | 40 | 40 | 0 | 80 | +| `first_amount` | `first` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `first_in_24h` | `first_seen_in_window` | `windowed` | 8 | 80 | 80 | 24 | 56 | 0 | 80 | +| `first_seen` | `first_seen` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `has_seen` | `has_seen` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `inter_arrival_1h` | `inter_arrival_stats` | `windowed` | 8 | 80 | 80 | 40 | 40 | 0 | 80 | +| `last_amount` | `last` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `last_seen` | `last_seen` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `max_amount_lifetime` | `max` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `max_streak` | `max_streak` | `lifetime` | 8 | 80 | 80 | 16 | 64 | 0 | 80 | +| `sum_amount_lifetime` | `sum` | `lifetime` | 8 | 80 | 80 | 16 | 64 | 0 | 80 | +| `time_since_last` | `time_since` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `txn_count_lifetime` | `count` | `lifetime` | 8 | 80 | 80 | 8 | 72 | 0 | 80 | +| `txn_decayed_count_24h` | `decayed_count` | `lifetime` | 8 | 80 | 80 | 32 | 48 | 0 | 80 | +| `txn_streak` | `streak` | `lifetime` | 8 | 80 | 80 | 16 | 64 | 0 | 80 | + +### `CardAddByDevice` (`CardAdd` by `device_id`) + +No active entity rows. Configured features: `3`. The workload generator emitted no events for this table's source. + +### `LoginByUser` (`Login` by `user_id`) + +No active entity rows. Configured features: `8`. The workload generator emitted no events for this table's source. + +### `RefundByUser` (`Refund` by `user_id`) + +No active entity rows. Configured features: `8`. The workload generator emitted no events for this table's source. + +### `SignupByIp` (`Signup` by `ip_address`) + +No active entity rows. Configured features: `4`. The workload generator emitted no events for this table's source. + +## Top 5 Offenders + +One heaviest entity-feature example per unique op. + +### 1. `Txn` / `TxnByIp` / `cards_per_ip_1h` / `n_unique` + +- Path: `Txn` -> `TxnByIp` -> `cards_per_ip_1h` -> `n_unique` -> `windowed` +- Entity key: `s533` +- Entity events: `135` +- Key path: `ip_address` +- Events applied: `135` +- Bytes: stack=80 (enum_slot_bytes=80 payload_bytes=8 slack_bytes=72) heap=86552 total=86632 +- Shape: `windowed` (1h) +- Breakdown rollup: + - `CountDistinct hash-set slots across buckets`: 86016 bytes (HashSet, summed across active window buckets) + - `Windowed bucket shell overhead`: 240 bytes (Box, summed boxed AggOp enum slots across active buckets) + - `Windowed wrapper overhead`: 176 bytes (WindowedOp, summed boxed WindowedOp payload and spilled bucket storage) + - `Box across buckets`: 120 bytes (Box, summed across active window buckets) +- Raw breakdown: + - `Windowed bucket 0 / CountDistinct hash-set slots`: 28672 bytes (HashSet, estimated hashbrown slot cost for u64 distinct hashes) + - `Windowed bucket 1 / CountDistinct hash-set slots`: 28672 bytes (HashSet, estimated hashbrown slot cost for u64 distinct hashes) + - `Windowed bucket 2 / CountDistinct hash-set slots`: 28672 bytes (HashSet, estimated hashbrown slot cost for u64 distinct hashes) + - `Box`: 176 bytes (Box, heap allocation for boxed WindowedOp payload) + - `Windowed bucket 0 Box`: 80 bytes (Box, heap allocation for bucket AggOp enum slot) + - `Windowed bucket 1 Box`: 80 bytes (Box, heap allocation for bucket AggOp enum slot) + - `Windowed bucket 2 Box`: 80 bytes (Box, heap allocation for bucket AggOp enum slot) + - `Windowed bucket 0 / Box`: 40 bytes (Box, heap allocation for boxed CountDistinct wrapper) + +### 2. `Txn` / `TxnByIp` / `ip_top_users` / `top_k` + +- Path: `Txn` -> `TxnByIp` -> `ip_top_users` -> `top_k` -> `windowed` +- Entity key: `s533` +- Entity events: `135` +- Key path: `ip_address` +- Events applied: `135` +- Bytes: stack=80 (enum_slot_bytes=80 payload_bytes=8 slack_bytes=72) heap=13376 total=13456 +- Shape: `windowed` (1d) +- Breakdown rollup: + - `TopK exact BTreeMap entries across buckets`: 12960 bytes (BTreeMap, summed across active window buckets) + - `Windowed wrapper overhead`: 176 bytes (WindowedOp, summed boxed WindowedOp payload and spilled bucket storage) + - `Box across buckets`: 160 bytes (Box, summed across active window buckets) + - `Windowed bucket shell overhead`: 80 bytes (Box, summed boxed AggOp enum slots across active buckets) +- Raw breakdown: + - `Windowed bucket 0 / TopK exact BTreeMap entries`: 12960 bytes (BTreeMap, estimated node overhead plus TopKValue/u64 payloads) + - `Box`: 176 bytes (Box, heap allocation for boxed WindowedOp payload) + - `Windowed bucket 0 / Box`: 160 bytes (Box, heap allocation for boxed TopK wrapper) + - `Windowed bucket 0 Box`: 80 bytes (Box, heap allocation for bucket AggOp enum slot) + +### 3. `Txn` / `TxnByMerchant` / `merchant_amount_p99_24h` / `quantile` + +- Path: `Txn` -> `TxnByMerchant` -> `merchant_amount_p99_24h` -> `quantile` -> `windowed` +- Entity key: `s969` +- Entity events: `135` +- Key path: `merchant_id` +- Events applied: `135` +- Bytes: stack=80 (enum_slot_bytes=80 payload_bytes=8 slack_bytes=72) heap=2416 total=2496 +- Shape: `windowed` (1d) +- Breakdown rollup: + - `Percentile exact samples across buckets`: 2048 bytes (Vec, summed across active window buckets) + - `Windowed wrapper overhead`: 176 bytes (WindowedOp, summed boxed WindowedOp payload and spilled bucket storage) + - `Box across buckets`: 112 bytes (Box, summed across active window buckets) + - `Windowed bucket shell overhead`: 80 bytes (Box, summed boxed AggOp enum slots across active buckets) +- Raw breakdown: + - `Windowed bucket 0 / Percentile exact samples`: 2048 bytes (Vec, capacity * size_of::() for exact percentile samples) + - `Box`: 176 bytes (Box, heap allocation for boxed WindowedOp payload) + - `Windowed bucket 0 / Box`: 112 bytes (Box, heap allocation for boxed Percentile wrapper) + - `Windowed bucket 0 Box`: 80 bytes (Box, heap allocation for bucket AggOp enum slot) + +### 4. `Txn` / `TxnByUser` / `dist_from_home` / `distance_from_home` + +- Path: `Txn` -> `TxnByUser` -> `dist_from_home` -> `distance_from_home` -> `lifetime` +- Entity key: `k00021309` +- Entity events: `8` +- Key path: `user_id` +- Events applied: `8` +- Bytes: stack=80 (enum_slot_bytes=80 payload_bytes=8 slack_bytes=72) heap=1720 total=1800 +- Shape: `lifetime` +- Breakdown: + - `DistanceFromHome coordinate buffer`: 1600 bytes (Vec, capacity * size_of::<(f64, f64)>()) + - `Box`: 120 bytes (Box, heap allocation for boxed payload) + +### 5. `Txn` / `TxnByUser` / `reservoir_50` / `reservoir_sample` + +- Path: `Txn` -> `TxnByUser` -> `reservoir_50` -> `reservoir_sample` -> `lifetime` +- Entity key: `k00021309` +- Entity events: `8` +- Key path: `user_id` +- Events applied: `8` +- Bytes: stack=80 (enum_slot_bytes=80 payload_bytes=40 slack_bytes=40) heap=1600 total=1680 +- Shape: `lifetime` +- Breakdown: + - `ReservoirSample reservoir`: 1600 bytes (Vec, capacity * size_of::()) + +## Metrics Coherence + +- `/metrics` `beava_bytes_per_entity_p99`: `7000` bytes +- Profile bytes-per-active-entity-row p99: `98336` bytes +- Tolerance: `15.0%` +- Assertion: bytes_per_entity_p99 diverged by 91336 bytes; file sibling work to replace the static placeholder with live sampling. + +## Notes + +- `stack_bytes` is the inline `AggOp` enum slot for each feature. +- `enum_slot_bytes` is the fixed-size `AggOp` enum slot charged to a row; parent rows sum this across child paths. +- `payload_bytes` is the active variant payload inside the enum slot. For boxed variants this is the inline `Box` pointer, while the boxed pointee remains in `heap_bytes`. +- `slack_bytes` is unused capacity in the fixed-size `AggOp` enum slot: `enum_slot_bytes - payload_bytes`. +- Heap entries are deterministic structural counts; map/table allocator overhead is labeled as an estimate. +- Primary grain is `derivation table -> entity row -> feature column`; top offenders list one concrete entity-feature row per unique op.