From f152cb428037f13b29d12ecf5a10a81345ac2d30 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 17:38:07 +0200 Subject: [PATCH 1/5] Add clarifying comments --- logjetd/src/daemon.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/logjetd/src/daemon.rs b/logjetd/src/daemon.rs index 72b6893..2925dde 100644 --- a/logjetd/src/daemon.rs +++ b/logjetd/src/daemon.rs @@ -300,6 +300,11 @@ fn ingest_loop( if is_metrics { match ExportMetricsServiceRequest::decode(body.as_slice()) { Ok(batch) => { + // Metrics have no severity concept in OTLP, so they always classify as + // BatchPriority::Unknown (lowest priority). This is semantically correct: + // during overload, severity-aware shedding protects high-priority logs + // while metrics are treated as best-effort. If metrics must survive + // overload, increase ingest.max-batches-per-second or use buffer/file mode. let decision = ingest_policy.decide(BatchPriority::Unknown)?; if matches!(decision, IngestDecision::RejectRateLimited) { let response = Response::from_string("rate limit exceeded").with_status_code(StatusCode(429)); @@ -325,6 +330,11 @@ fn ingest_loop( } else if is_traces { match ExportTraceServiceRequest::decode(body.as_slice()) { Ok(batch) => { + // Traces have no severity concept in OTLP, so they always classify as + // BatchPriority::Unknown (lowest priority). This is semantically correct: + // during overload, severity-aware shedding protects high-priority logs + // while traces are treated as best-effort. If traces must survive + // overload, increase ingest.max-batches-per-second or use buffer/file mode. let decision = ingest_policy.decide(BatchPriority::Unknown)?; if matches!(decision, IngestDecision::RejectRateLimited) { let response = Response::from_string("rate limit exceeded").with_status_code(StatusCode(429)); @@ -955,6 +965,9 @@ struct OtlpGrpcTracesService { impl TraceService for OtlpGrpcTracesService { async fn export(&self, request: Request) -> Result, Status> { let batch = request.into_inner(); + // Traces have no severity concept in OTLP, so they always classify as + // BatchPriority::Unknown (lowest priority). See HTTP ingest comment above + // for the full rationale on metrics/traces rate-limiting policy. match self.ingest_policy.decide(BatchPriority::Unknown).map_err(|err| Status::internal(err.to_string()))? { IngestDecision::Accept | IngestDecision::AcceptPriorityBypass => {} IngestDecision::RejectRateLimited => { @@ -986,6 +999,9 @@ struct OtlpGrpcMetricsService { impl MetricsService for OtlpGrpcMetricsService { async fn export(&self, request: Request) -> Result, Status> { let batch = request.into_inner(); + // Metrics have no severity concept in OTLP, so they always classify as + // BatchPriority::Unknown (lowest priority). See HTTP ingest comment above + // for the full rationale on metrics/traces rate-limiting policy. match self.ingest_policy.decide(BatchPriority::Unknown).map_err(|err| Status::internal(err.to_string()))? { IngestDecision::Accept | IngestDecision::AcceptPriorityBypass => {} IngestDecision::RejectRateLimited => { From a2fa0c384863b70841c3d6d31029367c15cd7f2c Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 17:38:36 +0200 Subject: [PATCH 2/5] Add metrics rate limiting --- logjetd/src/replay.rs | 59 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/logjetd/src/replay.rs b/logjetd/src/replay.rs index 6f79ad9..c538224 100644 --- a/logjetd/src/replay.rs +++ b/logjetd/src/replay.rs @@ -140,23 +140,34 @@ fn bridge_transport( let worker_transport = collector_transport.clone(); let exporter = thread::spawn(move || export_worker(worker_transport, task_rx, result_tx)); let mut pending = std::collections::VecDeque::new(); + let mut stats = BridgeStats::new(); while let Some(record) = read_record(transport)? { flush_ready_results(transport, state, state_file, consume, &mut pending, &result_rx, false)?; + stats.note_record(record.record_type); let seq = record.seq; match enqueue_export_task(&task_tx, collector_transport, ExportTask { seq, record_type: record.record_type, payload: record.payload }) { Ok(EnqueueOutcome::Queued) => pending.push_back(PendingExport::Queued(seq)), - Ok(EnqueueOutcome::DroppedNewest) => pending.push_back(PendingExport::Dropped(seq)), + Ok(EnqueueOutcome::DroppedNewest) => { + stats.note_drop(record.record_type); + pending.push_back(PendingExport::Dropped(seq)); + } Err(err) => { + stats.log_summary(); drop(task_tx); let _ = exporter.join(); return Err(err); } } + + if stats.should_log() { + eprintln!("bridge backlog depth={} records_read={} {}", pending.len(), stats.records_read, stats.drop_summary()); + } } drop(task_tx); + stats.log_summary(); flush_ready_results(transport, state, state_file, consume, &mut pending, &result_rx, true)?; match exporter.join() { Ok(Ok(())) => Ok(()), @@ -181,7 +192,7 @@ fn enqueue_export_task( BackpressureMode::DropNewest => match task_tx.try_send(task) { Ok(()) => Ok(EnqueueOutcome::Queued), Err(mpsc::TrySendError::Full(task)) => { - eprintln!("bridge dropping seq={} because collector export buffer is full (mode=drop-newest)", task.seq); + eprintln!("bridge dropping seq={} type={:?} because collector export buffer is full (mode=drop-newest)", task.seq, task.record_type); Ok(EnqueueOutcome::DroppedNewest) } Err(mpsc::TrySendError::Disconnected(_)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "collector export worker stopped")), @@ -302,6 +313,50 @@ struct BridgeState { last_seq: u64, } +struct BridgeStats { + records_read: u64, + drops_logs: u64, + drops_metrics: u64, + drops_traces: u64, + drops_events: u64, +} + +impl BridgeStats { + fn new() -> Self { + Self { records_read: 0, drops_logs: 0, drops_metrics: 0, drops_traces: 0, drops_events: 0 } + } + + fn note_record(&mut self, _record_type: logjet::RecordType) { + self.records_read += 1; + } + + fn note_drop(&mut self, record_type: logjet::RecordType) { + match record_type { + logjet::RecordType::Logs => self.drops_logs += 1, + logjet::RecordType::Metrics => self.drops_metrics += 1, + logjet::RecordType::Traces => self.drops_traces += 1, + logjet::RecordType::Events => self.drops_events += 1, + } + } + + fn should_log(&self) -> bool { + self.records_read.is_multiple_of(1_000) + } + + fn drop_summary(&self) -> String { + let mut parts = Vec::new(); + if self.drops_logs > 0 { parts.push(format!("logs_drops={}", self.drops_logs)); } + if self.drops_metrics > 0 { parts.push(format!("metrics_drops={}", self.drops_metrics)); } + if self.drops_traces > 0 { parts.push(format!("traces_drops={}", self.drops_traces)); } + if self.drops_events > 0 { parts.push(format!("events_drops={}", self.drops_events)); } + if parts.is_empty() { "drops=0".to_string() } else { parts.join(" ") } + } + + fn log_summary(&self) { + eprintln!("bridge stats: records_read={} {}", self.records_read, self.drop_summary()); + } +} + fn read_bridge_state(path: Option<&Path>) -> io::Result { let Some(path) = path else { return Ok(BridgeState { stream_id: None, last_seq: 0 }); From 944451719fc3a2a4693c2c95c97509d9b1af709d Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 17:49:04 +0200 Subject: [PATCH 3/5] Update docs --- doc/configuration.md | 1 + doc/daemon.md | 1 + doc/features.md | 1 + doc/manpage/ljd.1.md | 3 +- doc/overview.md | 1 + doc/parquet/export-parquet.md | 90 +++++++++++++++++++++++++++++++++++ 6 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 doc/parquet/export-parquet.md diff --git a/doc/configuration.md b/doc/configuration.md index 6e04f2c..568c44c 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -360,6 +360,7 @@ Important: - default is `error` - applies to decoded OTLP log batches +- metrics and traces do not carry severity in OTLP, so they are always treated as lowest priority during overload - wire ingest records do not carry OTLP severity, so they are treated as low priority during overload ### `ingest.overload-report-ms` diff --git a/doc/daemon.md b/doc/daemon.md index c85d346..e73dc72 100644 --- a/doc/daemon.md +++ b/doc/daemon.md @@ -141,6 +141,7 @@ upstream. - `backpressure.mode: block` waits for the collector reply instead of timing out - `backpressure.mode: drop-newest` keeps the bridge live and drops newest records when the export queue is full - `backpressure.max-buffered-records` caps the bridge-side exporter queue per bridge connection +- emits periodic backlog depth and per-signal drop counters to stderr when `backpressure.mode: drop-newest` triggers - can optionally use TLS with `tls.*` - collector export can use OTLP/HTTP, HTTPS, and plain OTLP/gRPC - `collector.*` TLS settings apply to HTTPS collector export diff --git a/doc/features.md b/doc/features.md index c39d516..35a04e0 100644 --- a/doc/features.md +++ b/doc/features.md @@ -121,6 +121,7 @@ Current behaviour: - bridge export can block or disconnect when the collector is too slow, when `backpressure.enabled: true` - bridge export can also drop newest records explicitly when `backpressure.mode: drop-newest` - bridge export queue depth can be capped through `backpressure.max-buffered-records` +- emits periodic backlog depth and per-signal drop counters to stderr when records are dropped This is the current path for: diff --git a/doc/manpage/ljd.1.md b/doc/manpage/ljd.1.md index 84e88aa..2dccf2f 100644 --- a/doc/manpage/ljd.1.md +++ b/doc/manpage/ljd.1.md @@ -275,7 +275,7 @@ Rules: - `ingest.max-batch-bytes` rejects oversized OTLP or wire payloads before they are stored - `ingest.max-clients` caps concurrent ingest handling - `ingest.max-batches-per-second` caps accepted ingest batches per second -- `ingest.priority-severity-at-least` lets higher-severity OTLP log batches bypass overload shedding +- `ingest.priority-severity-at-least` lets higher-severity OTLP log batches bypass overload shedding; metrics and traces do not carry severity and are treated as lowest priority - `ingest.overload-report-ms` controls operator-visible overload summaries on stderr - `replay.max-clients` caps concurrent replay clients - `replay.client-timeout-ms` caps how long one replay client can block on socket I/O @@ -291,6 +291,7 @@ Rules: - `backpressure.enabled` enables bridge backpressure policy handling - `backpressure.mode` configures whether bridge export blocks, disconnects, or drops newest records when the collector is too slow - `backpressure.max-buffered-records` caps the bridge-side exporter queue per bridge connection +- bridge emits periodic backlog depth and per-signal drop counters to stderr when records are dropped - `upstream.replay` configures the default bridge source - `upstream.mode` configures whether bridge keeps or drains upstream retained records - `upstream.state-file` stores persisted bridge resume state diff --git a/doc/overview.md b/doc/overview.md index d9791ee..dfdf590 100644 --- a/doc/overview.md +++ b/doc/overview.md @@ -31,6 +31,7 @@ The daemon provides: - optional persisted bridge resume state - basic backpressure policy on bridge export - bounded bridge-side exporter queue with `block`, `disconnect`, and `drop-newest` policy +- periodic backlog depth and per-signal drop counters on bridge export - basic ingest guardrails for payload size and concurrent clients - ingest rate limiting with severity-aware overload shedding - basic replay-client caps diff --git a/doc/parquet/export-parquet.md b/doc/parquet/export-parquet.md new file mode 100644 index 0000000..d21dc47 --- /dev/null +++ b/doc/parquet/export-parquet.md @@ -0,0 +1,90 @@ +# `ljx --export parquet` + +`ljx --export parquet` converts one `.logjet` file into a Parquet file through the external exporter plugin. + +## Plugin Discovery + +The Parquet exporter is a shared-library plugin (`libljx_parquet_exporter.so`) that implements the stable `liblogjet::export` C ABI. `ljx` discovers it via: + +1. `LJX_EXPORTER_PATH` environment variable (explicit paths or directories) +2. `./exporters` relative to working directory +3. Paths relative to the `ljx` executable +4. On Unix: `/usr/lib/logjet/exporters` and `/usr/lib/logjet` + +## Usage + +```bash +# Export one .logjet file to Parquet +ljx --export parquet input.logjet -o output.parquet --force + +# With plugin explicitly specified +LJX_EXPORTER_PATH=/usr/lib/logjet/exporters/libljx_parquet_exporter.so ljx --export parquet input.logjet -o output.parquet +``` + +## Unified Schema + +The Parquet exporter uses a single fixed schema that covers all three OTLP signal types. When a signal does not use a column, it is simply null. This guarantees a predictable output schema regardless of input content. + +| Column | Type | Signal | Description | +|---|---|---|---| +| `sequence` | `UInt64` (required) | All | Internal logjet sequence number | +| `timestamp_unix_ns` | `UInt64` (nullable) | All | Record timestamp in nanoseconds since epoch | +| `signal_type` | `Utf8` (required) | All | `"logs"`, `"metrics"`, or `"traces"` | +| `observed_timestamp_unix_ns` | `UInt64` (nullable) | Logs | Log record observed timestamp | +| `trace_id` | `Utf8` (nullable) | Logs, Traces | Hex-encoded trace ID | +| `span_id` | `Utf8` (nullable) | Logs, Traces | Hex-encoded span ID | +| `trace_flags` | `UInt32` (nullable) | Logs, Traces | Trace flags | +| `severity_number` | `Int32` (nullable) | Logs | OTLP severity number | +| `severity_text` | `Utf8` (nullable) | Logs | Severity text (e.g. `"INFO"`) | +| `body_kind` | `Utf8` (nullable) | Logs | `string`, `int`, `bool`, `double`, `bytes`, `array`, `kvlist`, `empty` | +| `body_string` | `Utf8` (nullable) | Logs | Body when it is a plain string | +| `body_json` | `Utf8` (nullable) | Logs | JSON representation of non-string bodies | +| `metric_name` | `Utf8` (nullable) | Metrics | Metric instrument name | +| `metric_description` | `Utf8` (nullable) | Metrics | Metric description | +| `metric_unit` | `Utf8` (nullable) | Metrics | Metric unit | +| `metric_type` | `Utf8` (nullable) | Metrics | `Gauge`, `Sum`, `Histogram`, `ExponentialHistogram`, `Summary`, `Unknown` | +| `metric_value_number` | `Float64` (nullable) | Metrics | Value for Gauge/Sum datapoints | +| `metric_value_count` | `UInt64` (nullable) | Metrics | Count for Histogram/Summary datapoints | +| `metric_value_sum` | `Float64` (nullable) | Metrics | Sum for Histogram/Summary datapoints | +| `is_monotonic` | `Boolean` (nullable) | Metrics | True for monotonic Sum metrics | +| `aggregation_temporality` | `Int32` (nullable) | Metrics | OTLP aggregation temporality code | +| `span_name` | `Utf8` (nullable) | Traces | Span name | +| `span_kind` | `Utf8` (nullable) | Traces | `Internal`, `Server`, `Client`, `Producer`, `Consumer` | +| `parent_span_id` | `Utf8` (nullable) | Traces | Hex-encoded parent span ID | +| `start_time_unix_ns` | `UInt64` (nullable) | Traces, Metrics | Start time in nanoseconds | +| `end_time_unix_ns` | `UInt64` (nullable) | Traces | End time in nanoseconds | +| `duration_ns` | `UInt64` (nullable) | Traces | Computed duration (`end - start`) | +| `status_code` | `Int32` (nullable) | Traces | OTLP span status code | +| `status_message` | `Utf8` (nullable) | Traces | Span status message | +| `service_name` | `Utf8` (nullable) | All | Extracted from `service.name` resource attribute | +| `scope_name` | `Utf8` (nullable) | All | Instrumentation scope name | +| `scope_version` | `Utf8` (nullable) | All | Instrumentation scope version | +| `resource_attributes_json` | `Utf8` (nullable) | All | JSON object of resource attributes | +| `scope_attributes_json` | `Utf8` (nullable) | All | JSON object of scope attributes | +| `log_attributes_json` | `Utf8` (nullable) | Logs | JSON object of log record attributes | +| `span_attributes_json` | `Utf8` (nullable) | Traces | JSON object of span attributes | +| `event_name` | `Utf8` (nullable) | Logs | Log record event name | + +## Design Decisions + +- **One row per datapoint / span / log record**: A single `.logjet` batch may produce multiple Parquet rows. This is the same granularity as NDJSON export. +- **Fixed schema**: All columns are always present. Null-only columns cost negligible space in Parquet due to run-length encoding of definition levels, and a fixed schema is far more convenient for downstream analytics tools. +- **No aggregation or enrichment**: Values are extracted as-is from the stored OTLP protobuf. No unit conversion, no histogram bucketing, no semantic interpretation. + +## Writer Options + +- `output.row-group-rows`: target rows per Parquet row group (default 8192) +- `output.compression`: `zstd` (default) or `uncompressed` + +## Limits + +- Histogram bucket boundaries and counts are not extracted into Parquet rows. Only `count` and `sum` are preserved. Use raw OTLP replay if you need full histogram detail. +- Metrics exemplars are not extracted. + +## Build + +```bash +cargo build -p ljx-parquet-exporter +``` + +The plugin is a `cdylib` and must be discoverable by `ljx` at runtime. From 0d501ad38648af184c4aa7e4044395ff009f7347 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 17:49:18 +0200 Subject: [PATCH 4/5] Lintfix --- logjetd/src/replay.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logjetd/src/replay.rs b/logjetd/src/replay.rs index c538224..5b6d6c1 100644 --- a/logjetd/src/replay.rs +++ b/logjetd/src/replay.rs @@ -340,7 +340,7 @@ impl BridgeStats { } fn should_log(&self) -> bool { - self.records_read.is_multiple_of(1_000) + self.records_read > 0 && self.records_read.is_multiple_of(1_000) } fn drop_summary(&self) -> String { From 3ce971fb21ff9685418052eebbb28f78173fddac Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 17:49:28 +0200 Subject: [PATCH 5/5] Add replay UT --- logjetd/tests/unit/replay_utst.rs | 55 +++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/logjetd/tests/unit/replay_utst.rs b/logjetd/tests/unit/replay_utst.rs index de52e4b..e8bd98e 100644 --- a/logjetd/tests/unit/replay_utst.rs +++ b/logjetd/tests/unit/replay_utst.rs @@ -1,6 +1,6 @@ use super::{ - BridgeState, CollectorEndpoint, CollectorTransport, EnqueueOutcome, ExportTask, enqueue_export_task, parse_bridge_state, parse_content_length, - read_bridge_state, read_http_response, reconcile_bridge_state, signal_path_for_endpoint, write_bridge_state, + BridgeState, BridgeStats, CollectorEndpoint, CollectorTransport, EnqueueOutcome, ExportTask, enqueue_export_task, parse_bridge_state, + parse_content_length, read_bridge_state, read_http_response, reconcile_bridge_state, signal_path_for_endpoint, write_bridge_state, }; use crate::config::{BackpressureMode, CollectorConfig, UpstreamMode}; use crate::protocol::ReplayHello; @@ -214,3 +214,54 @@ fn signal_path_for_endpoint_defaults_from_empty_path() { assert_eq!(signal_path_for_endpoint("", logjet::RecordType::Metrics), "/v1/metrics"); assert_eq!(signal_path_for_endpoint("", logjet::RecordType::Traces), "/v1/traces"); } + +#[test] +fn bridge_stats_tracks_records_and_drops_by_signal() { + let mut stats = BridgeStats::new(); + assert_eq!(stats.records_read, 0); + assert_eq!(stats.drop_summary(), "drops=0"); + + stats.note_record(RecordType::Logs); + stats.note_record(RecordType::Metrics); + stats.note_record(RecordType::Traces); + assert_eq!(stats.records_read, 3); + + stats.note_drop(RecordType::Logs); + stats.note_drop(RecordType::Metrics); + stats.note_drop(RecordType::Traces); + stats.note_drop(RecordType::Logs); + assert_eq!(stats.drops_logs, 2); + assert_eq!(stats.drops_metrics, 1); + assert_eq!(stats.drops_traces, 1); + assert_eq!(stats.drops_events, 0); +} + +#[test] +fn bridge_stats_should_log_every_1000_records() { + let mut stats = BridgeStats::new(); + assert!(!stats.should_log()); + for _ in 0..999 { + stats.note_record(RecordType::Logs); + } + assert!(!stats.should_log()); + stats.note_record(RecordType::Logs); + assert!(stats.should_log()); +} + +#[test] +fn bridge_stats_drop_summary_includes_only_nonzero_signals() { + let mut stats = BridgeStats::new(); + stats.note_drop(RecordType::Metrics); + stats.note_drop(RecordType::Traces); + let summary = stats.drop_summary(); + assert!(summary.contains("metrics_drops=1")); + assert!(summary.contains("traces_drops=1")); + assert!(!summary.contains("logs_drops")); + assert!(!summary.contains("events_drops")); +} + +#[test] +fn bridge_stats_drop_summary_reports_zero_when_no_drops() { + let stats = BridgeStats::new(); + assert_eq!(stats.drop_summary(), "drops=0"); +}