1 change: 1 addition & 0 deletions doc/configuration.md
@@ -360,6 +360,7 @@ Important:

- default is `error`
- applies to decoded OTLP log batches
- metrics and traces do not carry severity in OTLP, so they are always treated as lowest priority during overload
- wire ingest records do not carry OTLP severity, so they are treated as low priority during overload
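
The priority rule above can be sketched as follows. This is a minimal illustration, not the daemon's actual classifier: only `BatchPriority::Unknown` appears in the source, so the `Low` and `High` variants, the function name, and the choice to classify a batch by its highest severity are all assumptions. The threshold `17` is the lowest OTLP severity number in the ERROR range, matching the documented default of `error`.

```rust
/// Priority classes for overload shedding.
/// Only `Unknown` is named in the source; `Low` and `High` are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchPriority {
    Unknown,
    Low,
    High,
}

/// Lowest OTLP severity number in the ERROR range (ERROR spans 17..=20).
pub const SEVERITY_ERROR: i32 = 17;

/// Classifies a decoded log batch by its highest specified severity.
/// Assumption: one record at or above the threshold promotes the whole batch.
/// Severity 0 (unspecified) is ignored, so a batch without usable severity
/// falls back to `Unknown`, the same class metrics and traces always get.
pub fn classify_log_batch(severity_numbers: &[i32], threshold: i32) -> BatchPriority {
    match severity_numbers.iter().copied().filter(|s| *s > 0).max() {
        None => BatchPriority::Unknown,
        Some(max) if max >= threshold => BatchPriority::High,
        Some(_) => BatchPriority::Low,
    }
}
```

Under these assumptions, a batch containing INFO (9) and ERROR (17) records classifies as `High`, while a metrics or traces batch never reaches this function and stays `Unknown`.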

### `ingest.overload-report-ms`
1 change: 1 addition & 0 deletions doc/daemon.md
@@ -141,6 +141,7 @@ upstream.
- `backpressure.mode: block` waits for the collector reply instead of timing out
- `backpressure.mode: drop-newest` keeps the bridge live and drops newest records when the export queue is full
- `backpressure.max-buffered-records` caps the bridge-side exporter queue per bridge connection
- emits backlog depth and per-signal drop counters to stderr every 1,000 records read, plus a final summary; drops are only counted under `backpressure.mode: drop-newest`
- can optionally use TLS with `tls.*`
- collector export can use OTLP/HTTP, HTTPS, and plain OTLP/gRPC
- `collector.*` TLS settings apply to HTTPS collector export
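
The difference between `block` and `drop-newest` can be illustrated with a bounded standard-library channel. This is a simplified sketch rather than the bridge's real code: the `Mode` and `Outcome` enums and both function names are hypothetical, the queue carries bare sequence numbers instead of export tasks, and the `disconnect` mode is left out.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for the configured `backpressure.mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Block,
    DropNewest,
}

/// Hypothetical result of one enqueue attempt.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Queued,
    DroppedNewest,
    WorkerGone,
}

/// Creates the exporter queue; the capacity plays the role of
/// `backpressure.max-buffered-records`.
pub fn bounded_queue(max_buffered_records: usize) -> (SyncSender<u64>, Receiver<u64>) {
    sync_channel(max_buffered_records)
}

/// Enqueues one record (represented by its sequence number).
pub fn enqueue(tx: &SyncSender<u64>, mode: Mode, seq: u64) -> Outcome {
    match mode {
        // `send` waits until the worker frees a slot.
        Mode::Block => match tx.send(seq) {
            Ok(()) => Outcome::Queued,
            Err(_) => Outcome::WorkerGone,
        },
        // `try_send` returns immediately; a full queue drops the new record.
        Mode::DropNewest => match tx.try_send(seq) {
            Ok(()) => Outcome::Queued,
            Err(TrySendError::Full(_)) => Outcome::DroppedNewest,
            Err(TrySendError::Disconnected(_)) => Outcome::WorkerGone,
        },
    }
}
```

With a capacity of 2 and no consumer draining the queue, the third `DropNewest` enqueue reports `DroppedNewest`, while a `Block` enqueue would wait instead.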
1 change: 1 addition & 0 deletions doc/features.md
@@ -121,6 +121,7 @@ Current behaviour:
- bridge export can block or disconnect when the collector is too slow, when `backpressure.enabled: true`
- bridge export can also drop newest records explicitly when `backpressure.mode: drop-newest`
- bridge export queue depth can be capped through `backpressure.max-buffered-records`
- bridge export emits backlog depth and per-signal drop counters to stderr every 1,000 records read, plus a final summary

This is the current path for:

3 changes: 2 additions & 1 deletion doc/manpage/ljd.1.md
@@ -275,7 +275,7 @@ Rules:
- `ingest.max-batch-bytes` rejects oversized OTLP or wire payloads before they are stored
- `ingest.max-clients` caps concurrent ingest handling
- `ingest.max-batches-per-second` caps accepted ingest batches per second
- - `ingest.priority-severity-at-least` lets higher-severity OTLP log batches bypass overload shedding
+ - `ingest.priority-severity-at-least` lets higher-severity OTLP log batches bypass overload shedding; metrics and traces do not carry severity and are treated as lowest priority
- `ingest.overload-report-ms` controls operator-visible overload summaries on stderr
- `replay.max-clients` caps concurrent replay clients
- `replay.client-timeout-ms` caps how long one replay client can block on socket I/O
@@ -291,6 +291,7 @@ Rules:
- `backpressure.enabled` enables bridge backpressure policy handling
- `backpressure.mode` configures whether bridge export blocks, disconnects, or drops newest records when the collector is too slow
- `backpressure.max-buffered-records` caps the bridge-side exporter queue per bridge connection
- bridge emits backlog depth and per-signal drop counters to stderr every 1,000 records read, plus a final summary
- `upstream.replay` configures the default bridge source
- `upstream.mode` configures whether bridge keeps or drains upstream retained records
- `upstream.state-file` stores persisted bridge resume state
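
How `ingest.max-batches-per-second` and the priority bypass interact can be sketched with a fixed one-second window. The `IngestDecision` variant names come from the daemon source in this change; everything else is an assumption made for illustration, including the struct layout, the fixed-window algorithm, and the simplified `decide` signature (the real method takes a `BatchPriority` and returns a `Result`).

```rust
/// Variant names taken from the daemon source in this change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestDecision {
    Accept,
    AcceptPriorityBypass,
    RejectRateLimited,
}

/// Hypothetical fixed-window limiter; the daemon's real policy may differ.
pub struct IngestPolicy {
    max_batches_per_second: u32,
    window_second: u64,
    accepted_in_window: u32,
}

impl IngestPolicy {
    pub fn new(max_batches_per_second: u32) -> Self {
        Self { max_batches_per_second, window_second: 0, accepted_in_window: 0 }
    }

    /// Decides one batch. `now_second` is passed in so the sketch stays
    /// deterministic; `high_priority` stands for a log batch at or above
    /// `ingest.priority-severity-at-least`.
    pub fn decide(&mut self, now_second: u64, high_priority: bool) -> IngestDecision {
        if now_second != self.window_second {
            // A new second started: reset the budget.
            self.window_second = now_second;
            self.accepted_in_window = 0;
        }
        if self.accepted_in_window < self.max_batches_per_second {
            self.accepted_in_window += 1;
            IngestDecision::Accept
        } else if high_priority {
            // Over budget, but severity lets the batch through.
            IngestDecision::AcceptPriorityBypass
        } else {
            IngestDecision::RejectRateLimited
        }
    }
}
```

In this sketch, metrics and traces always call `decide` with `high_priority = false`, which is why they are the first to be shed once the per-second budget is used up.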
1 change: 1 addition & 0 deletions doc/overview.md
@@ -31,6 +31,7 @@ The daemon provides:
- optional persisted bridge resume state
- basic backpressure policy on bridge export
- bounded bridge-side exporter queue with `block`, `disconnect`, and `drop-newest` policy
- periodic backlog depth and per-signal drop counters on bridge export
- basic ingest guardrails for payload size and concurrent clients
- ingest rate limiting with severity-aware overload shedding
- basic replay-client caps
90 changes: 90 additions & 0 deletions doc/parquet/export-parquet.md
@@ -0,0 +1,90 @@
# `ljx --export parquet`

`ljx --export parquet` converts one `.logjet` file into a Parquet file through the external exporter plugin.

## Plugin Discovery

The Parquet exporter is a shared-library plugin (`libljx_parquet_exporter.so`) that implements the stable `liblogjet::export` C ABI. `ljx` discovers it via:

1. The `LJX_EXPORTER_PATH` environment variable (explicit plugin paths or directories)
2. `./exporters`, relative to the working directory
3. Paths relative to the `ljx` executable
4. On Unix: `/usr/lib/logjet/exporters` and `/usr/lib/logjet`
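
The search order above could be assembled roughly like this. It is a sketch under several assumptions: the function name is hypothetical, `LJX_EXPORTER_PATH` is assumed to use the platform's usual path-list separator, and an `exporters` directory next to the executable stands in for the unspecified "paths relative to the `ljx` executable".

```rust
use std::path::{Path, PathBuf};

/// Builds the candidate locations in discovery order.
/// `env_value` is the content of `LJX_EXPORTER_PATH`, if set.
pub fn exporter_search_paths(env_value: Option<&str>, cwd: &Path, exe_dir: &Path) -> Vec<PathBuf> {
    let mut candidates = Vec::new();

    // 1. Explicit paths or directories from the environment variable.
    //    Assumption: entries are separated like PATH entries.
    if let Some(value) = env_value {
        candidates.extend(std::env::split_paths(value));
    }

    // 2. `./exporters` relative to the working directory.
    candidates.push(cwd.join("exporters"));

    // 3. A location relative to the executable (hypothetical choice).
    candidates.push(exe_dir.join("exporters"));

    // 4. System-wide directories on Unix.
    if cfg!(unix) {
        candidates.push(PathBuf::from("/usr/lib/logjet/exporters"));
        candidates.push(PathBuf::from("/usr/lib/logjet"));
    }

    candidates
}
```

A caller would then probe each candidate for `libljx_parquet_exporter.so` and load the first match.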

## Usage

```bash
# Export one .logjet file to Parquet
ljx --export parquet input.logjet -o output.parquet --force

# With plugin explicitly specified
LJX_EXPORTER_PATH=/usr/lib/logjet/exporters/libljx_parquet_exporter.so ljx --export parquet input.logjet -o output.parquet
```

## Unified Schema

The Parquet exporter uses a single fixed schema that covers all three OTLP signal types. Columns that do not apply to a row's signal type are null, so the output schema is the same regardless of input content.

| Column | Type | Signal | Description |
|---|---|---|---|
| `sequence` | `UInt64` (required) | All | Internal logjet sequence number |
| `timestamp_unix_ns` | `UInt64` (nullable) | All | Record timestamp in nanoseconds since epoch |
| `signal_type` | `Utf8` (required) | All | `"logs"`, `"metrics"`, or `"traces"` |
| `observed_timestamp_unix_ns` | `UInt64` (nullable) | Logs | Log record observed timestamp |
| `trace_id` | `Utf8` (nullable) | Logs, Traces | Hex-encoded trace ID |
| `span_id` | `Utf8` (nullable) | Logs, Traces | Hex-encoded span ID |
| `trace_flags` | `UInt32` (nullable) | Logs, Traces | Trace flags |
| `severity_number` | `Int32` (nullable) | Logs | OTLP severity number |
| `severity_text` | `Utf8` (nullable) | Logs | Severity text (e.g. `"INFO"`) |
| `body_kind` | `Utf8` (nullable) | Logs | `string`, `int`, `bool`, `double`, `bytes`, `array`, `kvlist`, `empty` |
| `body_string` | `Utf8` (nullable) | Logs | Body when it is a plain string |
| `body_json` | `Utf8` (nullable) | Logs | JSON representation of non-string bodies |
| `metric_name` | `Utf8` (nullable) | Metrics | Metric instrument name |
| `metric_description` | `Utf8` (nullable) | Metrics | Metric description |
| `metric_unit` | `Utf8` (nullable) | Metrics | Metric unit |
| `metric_type` | `Utf8` (nullable) | Metrics | `Gauge`, `Sum`, `Histogram`, `ExponentialHistogram`, `Summary`, `Unknown` |
| `metric_value_number` | `Float64` (nullable) | Metrics | Value for Gauge/Sum datapoints |
| `metric_value_count` | `UInt64` (nullable) | Metrics | Count for Histogram/Summary datapoints |
| `metric_value_sum` | `Float64` (nullable) | Metrics | Sum for Histogram/Summary datapoints |
| `is_monotonic` | `Boolean` (nullable) | Metrics | True for monotonic Sum metrics |
| `aggregation_temporality` | `Int32` (nullable) | Metrics | OTLP aggregation temporality code |
| `span_name` | `Utf8` (nullable) | Traces | Span name |
| `span_kind` | `Utf8` (nullable) | Traces | `Internal`, `Server`, `Client`, `Producer`, `Consumer` |
| `parent_span_id` | `Utf8` (nullable) | Traces | Hex-encoded parent span ID |
| `start_time_unix_ns` | `UInt64` (nullable) | Traces, Metrics | Start time in nanoseconds |
| `end_time_unix_ns` | `UInt64` (nullable) | Traces | End time in nanoseconds |
| `duration_ns` | `UInt64` (nullable) | Traces | Computed duration (`end - start`) |
| `status_code` | `Int32` (nullable) | Traces | OTLP span status code |
| `status_message` | `Utf8` (nullable) | Traces | Span status message |
| `service_name` | `Utf8` (nullable) | All | Extracted from `service.name` resource attribute |
| `scope_name` | `Utf8` (nullable) | All | Instrumentation scope name |
| `scope_version` | `Utf8` (nullable) | All | Instrumentation scope version |
| `resource_attributes_json` | `Utf8` (nullable) | All | JSON object of resource attributes |
| `scope_attributes_json` | `Utf8` (nullable) | All | JSON object of scope attributes |
| `log_attributes_json` | `Utf8` (nullable) | Logs | JSON object of log record attributes |
| `span_attributes_json` | `Utf8` (nullable) | Traces | JSON object of span attributes |
| `event_name` | `Utf8` (nullable) | Logs | Log record event name |
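
Two of the derived columns in the table, `duration_ns` and the hex-encoded ID columns, can be sketched as small helpers. The function names are hypothetical, and mapping a missing timestamp, an `end` earlier than `start`, or an empty ID to null is an assumption about edge cases the table does not specify.

```rust
/// Computes the `duration_ns` column as `end - start`.
/// Returns `None` (a null cell) when either timestamp is missing or when
/// `end` precedes `start`; that edge-case handling is an assumption.
pub fn duration_ns(start_ns: Option<u64>, end_ns: Option<u64>) -> Option<u64> {
    end_ns?.checked_sub(start_ns?)
}

/// Renders an ID such as `trace_id` or `span_id` as lowercase hex.
/// Assumption: an empty ID becomes a null cell rather than an empty string.
pub fn hex_id(bytes: &[u8]) -> Option<String> {
    if bytes.is_empty() {
        return None;
    }
    Some(bytes.iter().map(|b| format!("{b:02x}")).collect())
}
```

Modelling nullable columns as `Option` keeps the null cases explicit at the point where rows are built.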

## Design Decisions

- **One row per datapoint / span / log record**: A single `.logjet` batch may produce multiple Parquet rows. This is the same granularity as NDJSON export.
- **Fixed schema**: All columns are always present. Null-only columns cost negligible space in Parquet due to run-length encoding of definition levels, and a fixed schema is far more convenient for downstream analytics tools.
- **No aggregation or enrichment**: Values are extracted as-is from the stored OTLP protobuf. No unit conversion, no histogram bucketing, no semantic interpretation.

## Writer Options

- `output.row-group-rows`: target rows per Parquet row group (default 8192)
- `output.compression`: `zstd` (default) or `uncompressed`
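
To get a feel for `output.row-group-rows`, the helper below shows how a row count would split into groups if the writer filled every group up to the target. This is only arithmetic for illustration; the function name is hypothetical, and because the option is described as a target, the real writer may flush groups at different boundaries.

```rust
/// Returns the size of each row group when `total_rows` rows are written
/// with a target of `row_group_rows` rows per group.
/// A target of 0 is treated as 1 to avoid division by zero (assumption).
pub fn row_group_sizes(total_rows: usize, row_group_rows: usize) -> Vec<usize> {
    let size = row_group_rows.max(1);
    // Full groups first, then one trailing partial group if rows remain.
    let mut sizes = vec![size; total_rows / size];
    let remainder = total_rows % size;
    if remainder > 0 {
        sizes.push(remainder);
    }
    sizes
}
```

With the default of 8192, an export of 20,000 rows would yield groups of 8192, 8192, and 3616 rows under this model.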

## Limits

- Histogram bucket boundaries and counts are not extracted into Parquet rows. Only `count` and `sum` are preserved. Use raw OTLP replay if you need full histogram detail.
- Metrics exemplars are not extracted.
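
The histogram limit amounts to a lossy projection, sketched below. The struct and function names are hypothetical; the input fields mirror the OTLP histogram datapoint (`count`, optional `sum`, `bucket_counts`, `explicit_bounds`), and the output fields mirror the `metric_value_count` and `metric_value_sum` columns.

```rust
/// Simplified stand-in for an OTLP histogram datapoint.
pub struct HistogramPoint {
    pub count: u64,
    pub sum: Option<f64>,
    pub bucket_counts: Vec<u64>,
    pub explicit_bounds: Vec<f64>,
}

/// The two metric value columns a histogram row fills.
#[derive(Debug, PartialEq)]
pub struct HistogramRowValues {
    pub metric_value_count: Option<u64>,
    pub metric_value_sum: Option<f64>,
}

/// Projects a datapoint onto the row columns.
/// `bucket_counts` and `explicit_bounds` are intentionally not read:
/// that detail is what the export leaves out.
pub fn project_histogram(point: &HistogramPoint) -> HistogramRowValues {
    HistogramRowValues {
        metric_value_count: Some(point.count),
        metric_value_sum: point.sum,
    }
}
```

Anything that needs percentiles or bucket shapes has to come from raw OTLP replay, since the buckets cannot be reconstructed from `count` and `sum`.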

## Build

```bash
cargo build -p ljx-parquet-exporter
```

The plugin is a `cdylib` and must be discoverable by `ljx` at runtime.
16 changes: 16 additions & 0 deletions logjetd/src/daemon.rs
@@ -300,6 +300,11 @@ fn ingest_loop(
if is_metrics {
match ExportMetricsServiceRequest::decode(body.as_slice()) {
Ok(batch) => {
// Metrics have no severity concept in OTLP, so they always classify as
// BatchPriority::Unknown (lowest priority). This is semantically correct:
// during overload, severity-aware shedding protects high-priority logs
// while metrics are treated as best-effort. If metrics must survive
// overload, increase ingest.max-batches-per-second or use buffer/file mode.
let decision = ingest_policy.decide(BatchPriority::Unknown)?;
if matches!(decision, IngestDecision::RejectRateLimited) {
let response = Response::from_string("rate limit exceeded").with_status_code(StatusCode(429));
@@ -325,6 +330,11 @@ fn ingest_loop(
} else if is_traces {
match ExportTraceServiceRequest::decode(body.as_slice()) {
Ok(batch) => {
// Traces have no severity concept in OTLP, so they always classify as
// BatchPriority::Unknown (lowest priority). This is semantically correct:
// during overload, severity-aware shedding protects high-priority logs
// while traces are treated as best-effort. If traces must survive
// overload, increase ingest.max-batches-per-second or use buffer/file mode.
let decision = ingest_policy.decide(BatchPriority::Unknown)?;
if matches!(decision, IngestDecision::RejectRateLimited) {
let response = Response::from_string("rate limit exceeded").with_status_code(StatusCode(429));
@@ -955,6 +965,9 @@ struct OtlpGrpcTracesService {
impl TraceService for OtlpGrpcTracesService {
async fn export(&self, request: Request<ExportTraceServiceRequest>) -> Result<GrpcResponse<ExportTraceServiceResponse>, Status> {
let batch = request.into_inner();
// Traces have no severity concept in OTLP, so they always classify as
// BatchPriority::Unknown (lowest priority). See HTTP ingest comment above
// for the full rationale on metrics/traces rate-limiting policy.
match self.ingest_policy.decide(BatchPriority::Unknown).map_err(|err| Status::internal(err.to_string()))? {
IngestDecision::Accept | IngestDecision::AcceptPriorityBypass => {}
IngestDecision::RejectRateLimited => {
@@ -986,6 +999,9 @@ struct OtlpGrpcMetricsService {
impl MetricsService for OtlpGrpcMetricsService {
async fn export(&self, request: Request<ExportMetricsServiceRequest>) -> Result<GrpcResponse<ExportMetricsServiceResponse>, Status> {
let batch = request.into_inner();
// Metrics have no severity concept in OTLP, so they always classify as
// BatchPriority::Unknown (lowest priority). See HTTP ingest comment above
// for the full rationale on metrics/traces rate-limiting policy.
match self.ingest_policy.decide(BatchPriority::Unknown).map_err(|err| Status::internal(err.to_string()))? {
IngestDecision::Accept | IngestDecision::AcceptPriorityBypass => {}
IngestDecision::RejectRateLimited => {
59 changes: 57 additions & 2 deletions logjetd/src/replay.rs
@@ -140,23 +140,34 @@ fn bridge_transport<T: io::Read + io::Write>(
let worker_transport = collector_transport.clone();
let exporter = thread::spawn(move || export_worker(worker_transport, task_rx, result_tx));
let mut pending = std::collections::VecDeque::new();
let mut stats = BridgeStats::new();

while let Some(record) = read_record(transport)? {
flush_ready_results(transport, state, state_file, consume, &mut pending, &result_rx, false)?;
stats.note_record(record.record_type);

let seq = record.seq;
match enqueue_export_task(&task_tx, collector_transport, ExportTask { seq, record_type: record.record_type, payload: record.payload }) {
Ok(EnqueueOutcome::Queued) => pending.push_back(PendingExport::Queued(seq)),
- Ok(EnqueueOutcome::DroppedNewest) => pending.push_back(PendingExport::Dropped(seq)),
+ Ok(EnqueueOutcome::DroppedNewest) => {
+ stats.note_drop(record.record_type);
+ pending.push_back(PendingExport::Dropped(seq));
+ }
Err(err) => {
stats.log_summary();
drop(task_tx);
let _ = exporter.join();
return Err(err);
}
}

if stats.should_log() {
eprintln!("bridge backlog depth={} records_read={} {}", pending.len(), stats.records_read, stats.drop_summary());
}
}

drop(task_tx);
stats.log_summary();
flush_ready_results(transport, state, state_file, consume, &mut pending, &result_rx, true)?;
match exporter.join() {
Ok(Ok(())) => Ok(()),
@@ -181,7 +192,7 @@ fn enqueue_export_task(
BackpressureMode::DropNewest => match task_tx.try_send(task) {
Ok(()) => Ok(EnqueueOutcome::Queued),
Err(mpsc::TrySendError::Full(task)) => {
- eprintln!("bridge dropping seq={} because collector export buffer is full (mode=drop-newest)", task.seq);
+ eprintln!("bridge dropping seq={} type={:?} because collector export buffer is full (mode=drop-newest)", task.seq, task.record_type);
Ok(EnqueueOutcome::DroppedNewest)
}
Err(mpsc::TrySendError::Disconnected(_)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "collector export worker stopped")),
@@ -302,6 +313,50 @@ struct BridgeState {
last_seq: u64,
}

struct BridgeStats {
records_read: u64,
drops_logs: u64,
drops_metrics: u64,
drops_traces: u64,
drops_events: u64,
}

impl BridgeStats {
fn new() -> Self {
Self { records_read: 0, drops_logs: 0, drops_metrics: 0, drops_traces: 0, drops_events: 0 }
}

fn note_record(&mut self, _record_type: logjet::RecordType) {
self.records_read += 1;
}

fn note_drop(&mut self, record_type: logjet::RecordType) {
match record_type {
logjet::RecordType::Logs => self.drops_logs += 1,
logjet::RecordType::Metrics => self.drops_metrics += 1,
logjet::RecordType::Traces => self.drops_traces += 1,
logjet::RecordType::Events => self.drops_events += 1,
}
}

fn should_log(&self) -> bool {
self.records_read > 0 && self.records_read.is_multiple_of(1_000)
}

fn drop_summary(&self) -> String {
let mut parts = Vec::new();
if self.drops_logs > 0 { parts.push(format!("logs_drops={}", self.drops_logs)); }
if self.drops_metrics > 0 { parts.push(format!("metrics_drops={}", self.drops_metrics)); }
if self.drops_traces > 0 { parts.push(format!("traces_drops={}", self.drops_traces)); }
if self.drops_events > 0 { parts.push(format!("events_drops={}", self.drops_events)); }
if parts.is_empty() { "drops=0".to_string() } else { parts.join(" ") }
}

fn log_summary(&self) {
eprintln!("bridge stats: records_read={} {}", self.records_read, self.drop_summary());
}
}

fn read_bridge_state(path: Option<&Path>) -> io::Result<BridgeState> {
let Some(path) = path else {
return Ok(BridgeState { stream_id: None, last_seq: 0 });
55 changes: 53 additions & 2 deletions logjetd/tests/unit/replay_utst.rs
@@ -1,6 +1,6 @@
use super::{
- BridgeState, CollectorEndpoint, CollectorTransport, EnqueueOutcome, ExportTask, enqueue_export_task, parse_bridge_state, parse_content_length,
- read_bridge_state, read_http_response, reconcile_bridge_state, signal_path_for_endpoint, write_bridge_state,
+ BridgeState, BridgeStats, CollectorEndpoint, CollectorTransport, EnqueueOutcome, ExportTask, enqueue_export_task, parse_bridge_state,
+ parse_content_length, read_bridge_state, read_http_response, reconcile_bridge_state, signal_path_for_endpoint, write_bridge_state,
};
use crate::config::{BackpressureMode, CollectorConfig, UpstreamMode};
use crate::protocol::ReplayHello;
@@ -214,3 +214,54 @@ fn signal_path_for_endpoint_defaults_from_empty_path() {
assert_eq!(signal_path_for_endpoint("", logjet::RecordType::Metrics), "/v1/metrics");
assert_eq!(signal_path_for_endpoint("", logjet::RecordType::Traces), "/v1/traces");
}

#[test]
fn bridge_stats_tracks_records_and_drops_by_signal() {
let mut stats = BridgeStats::new();
assert_eq!(stats.records_read, 0);
assert_eq!(stats.drop_summary(), "drops=0");

stats.note_record(RecordType::Logs);
stats.note_record(RecordType::Metrics);
stats.note_record(RecordType::Traces);
assert_eq!(stats.records_read, 3);

stats.note_drop(RecordType::Logs);
stats.note_drop(RecordType::Metrics);
stats.note_drop(RecordType::Traces);
stats.note_drop(RecordType::Logs);
assert_eq!(stats.drops_logs, 2);
assert_eq!(stats.drops_metrics, 1);
assert_eq!(stats.drops_traces, 1);
assert_eq!(stats.drops_events, 0);
}

#[test]
fn bridge_stats_should_log_every_1000_records() {
let mut stats = BridgeStats::new();
assert!(!stats.should_log());
for _ in 0..999 {
stats.note_record(RecordType::Logs);
}
assert!(!stats.should_log());
stats.note_record(RecordType::Logs);
assert!(stats.should_log());
}

#[test]
fn bridge_stats_drop_summary_includes_only_nonzero_signals() {
let mut stats = BridgeStats::new();
stats.note_drop(RecordType::Metrics);
stats.note_drop(RecordType::Traces);
let summary = stats.drop_summary();
assert!(summary.contains("metrics_drops=1"));
assert!(summary.contains("traces_drops=1"));
assert!(!summary.contains("logs_drops"));
assert!(!summary.contains("events_drops"));
}

#[test]
fn bridge_stats_drop_summary_reports_zero_when_no_drops() {
let stats = BridgeStats::new();
assert_eq!(stats.drop_summary(), "drops=0");
}
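
For operators who want to act on the final summary line, a small parser might look like the sketch below. The line format (`bridge stats: records_read=N` followed by either `drops=0` or per-signal `*_drops=N` pairs) is taken from `log_summary` and `drop_summary` in this change; the function name is hypothetical, and it does not handle the periodic `bridge backlog ...` lines.

```rust
use std::collections::BTreeMap;

/// Parses a `bridge stats: key=value ...` line into a map of counters.
/// Returns `None` if the prefix is missing or any token is not `key=<u64>`.
pub fn parse_bridge_stats(line: &str) -> Option<BTreeMap<String, u64>> {
    let rest = line.trim().strip_prefix("bridge stats: ")?;
    let mut counters = BTreeMap::new();
    for token in rest.split_whitespace() {
        let (key, value) = token.split_once('=')?;
        counters.insert(key.to_string(), value.parse().ok()?);
    }
    Some(counters)
}
```

Because `drop_summary` omits signals with zero drops, a missing key such as `logs_drops` should be read as zero rather than as an error.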