Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ It also contains scenario demos under subdirectories:
- inspect rotated file segments and prune archived files deliberately
- [`parquet-export`](./parquet-export)
- generate about 5K BOFH log entries, then export that `.logjet` file to Parquet through the external exporter plugin
- [`parquet-metrics-export`](./parquet-metrics-export)
- generate OTLP metrics batches, ingest them into `ljd`, then export the `.logjet` file to Parquet
- [`parquet-traces-export`](./parquet-traces-export)
- generate OTLP traces batches, ingest them into `ljd`, then export the `.logjet` file to Parquet
- [`tui-view`](./tui-view)
- generate 1000 randomized log entries and open `ljx view` on the result
- [`metrics-view`](./metrics-view)
Expand Down
47 changes: 47 additions & 0 deletions demo/parquet-metrics-export/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Parquet Metrics Export Demo

Demo that captures OTLP metrics through `ljd`, stores them in a `.logjet` file, and exports that file to Parquet through the external exporter plugin.

It uses:

- `target/debug/ljd`
- `target/debug/metrics-emitter`
- `target/debug/ljx`
- `target/debug/libljx_parquet_exporter.so`

## Build

From the project root:

```bash
make demo
```

## Run

From this directory:

```bash
sh ./run-demo.sh
```

The script:

1. starts `ljd` in file mode listening on `127.0.0.1:4318`
2. sends 15 metric batches via `metrics-emitter`
3. stops `ljd` after flush
4. exports the resulting `.logjet` file to `./logs/metrics.parquet`
5. prints a DuckDB query to inspect the result

## Inspect with DuckDB

```bash
duckdb -c "SELECT signal_type, metric_name, metric_type, metric_value_number FROM read_parquet('./logs/metrics.parquet') LIMIT 10;"
```

## Notes

- the demo uses `--force` so reruns overwrite the previous output
- metrics are captured as OTLP `ExportMetricsServiceRequest` batches and stored raw
- the Parquet exporter flattens each datapoint into one row with metric-specific columns
- the unified schema includes all signal columns; unused columns are null for metrics
7 changes: 7 additions & 0 deletions demo/parquet-metrics-export/logjetd.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
output: file
file.path: ./logs
file.size: 100000
file.name: metrics.logjet
ingest.protocol: otlp-http
ingest.listen: 127.0.0.1:4318
replay.listen: 127.0.0.1:7002
65 changes: 65 additions & 0 deletions demo/parquet-metrics-export/run-demo.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/bin/sh
set -eu

SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
TARGET_DIR="$SCRIPT_DIR/../../target/debug"
LJD="$TARGET_DIR/ljd"
EMITTER="$TARGET_DIR/metrics-emitter"
LJX="$TARGET_DIR/ljx"
PLUGIN="$TARGET_DIR/libljx_parquet_exporter.so"
CONFIG="$SCRIPT_DIR/logjetd.conf"
OUTPUT_DIR="$SCRIPT_DIR/logs"
OUTPUT_FILE="$OUTPUT_DIR/metrics.logjet"
PARQUET_FILE="$OUTPUT_DIR/metrics.parquet"

for path in "$LJD" "$EMITTER" "$LJX" "$PLUGIN"; do
if [ ! -x "$path" ]; then
echo "missing $path"
echo "build it first with: make demo"
exit 1
fi
done

cd "$SCRIPT_DIR"

mkdir -p "$OUTPUT_DIR"
rm -f "$OUTPUT_FILE" "$OUTPUT_DIR/metrics-"*.logjet "$OUTPUT_DIR/metrics.stream-id"

echo "starting ljd with config $CONFIG"
"$LJD" --config "$CONFIG" &
LJD_PID=$!

cleanup() {
if [ -n "${EMITTER_PID:-}" ]; then
kill "$EMITTER_PID" 2>/dev/null || true
wait "$EMITTER_PID" 2>/dev/null || true
fi
if [ -n "${LJD_PID:-}" ]; then
kill "$LJD_PID" 2>/dev/null || true
wait "$LJD_PID" 2>/dev/null || true
fi
}
trap cleanup EXIT INT TERM

sleep 1

METRIC_COUNT=15
echo "starting metrics-emitter toward 127.0.0.1:4318 ($METRIC_COUNT batches)"
"$EMITTER" 127.0.0.1:4318 "$METRIC_COUNT"

echo "emitter finished; giving ljd time to flush"
sleep 2

echo "stopping ljd"
kill "$LJD_PID" 2>/dev/null || true
wait "$LJD_PID" 2>/dev/null || true
LJD_PID=""

echo "exporting $OUTPUT_FILE to Parquet"
LJX_EXPORTER_PATH="$PLUGIN" "$LJX" --export parquet "$OUTPUT_FILE" -o "$PARQUET_FILE" --force

echo
echo "done: $PARQUET_FILE"
echo
echo "inspect with DuckDB:"
echo " duckdb -c \"SELECT signal_type, metric_name, metric_type, metric_value_number FROM read_parquet('$PARQUET_FILE') LIMIT 10;\""
47 changes: 47 additions & 0 deletions demo/parquet-traces-export/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Parquet Traces Export Demo

Demo that captures OTLP traces through `ljd`, stores them in a `.logjet` file, and exports that file to Parquet through the external exporter plugin.

It uses:

- `target/debug/ljd`
- `target/debug/traces-emitter`
- `target/debug/ljx`
- `target/debug/libljx_parquet_exporter.so`

## Build

From the project root:

```bash
make demo
```

## Run

From this directory:

```bash
sh ./run-demo.sh
```

The script:

1. starts `ljd` in file mode listening on `127.0.0.1:4318`
2. sends 10 trace batches via `traces-emitter`
3. stops `ljd` after flush
4. exports the resulting `.logjet` file to `./logs/traces.parquet`
5. prints a DuckDB query to inspect the result

## Inspect with DuckDB

```bash
duckdb -c "SELECT signal_type, span_name, span_kind, trace_id, duration_ns FROM read_parquet('./logs/traces.parquet') LIMIT 10;"
```

## Notes

- the demo uses `--force` so reruns overwrite the previous output
- traces are captured as OTLP `ExportTraceServiceRequest` batches and stored raw
- the Parquet exporter flattens each span into one row with trace-specific columns
- the unified schema includes all signal columns; unused columns are null for traces
7 changes: 7 additions & 0 deletions demo/parquet-traces-export/logjetd.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
output: file
file.path: ./logs
file.size: 100000
file.name: traces.logjet
ingest.protocol: otlp-http
ingest.listen: 127.0.0.1:4318
replay.listen: 127.0.0.1:7002
65 changes: 65 additions & 0 deletions demo/parquet-traces-export/run-demo.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/bin/sh
set -eu

SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd)
TARGET_DIR="$SCRIPT_DIR/../../target/debug"
LJD="$TARGET_DIR/ljd"
EMITTER="$TARGET_DIR/traces-emitter"
LJX="$TARGET_DIR/ljx"
PLUGIN="$TARGET_DIR/libljx_parquet_exporter.so"
CONFIG="$SCRIPT_DIR/logjetd.conf"
OUTPUT_DIR="$SCRIPT_DIR/logs"
OUTPUT_FILE="$OUTPUT_DIR/traces.logjet"
PARQUET_FILE="$OUTPUT_DIR/traces.parquet"

for path in "$LJD" "$EMITTER" "$LJX" "$PLUGIN"; do
if [ ! -x "$path" ]; then
echo "missing $path"
echo "build it first with: make demo"
exit 1
fi
done

cd "$SCRIPT_DIR"

mkdir -p "$OUTPUT_DIR"
rm -f "$OUTPUT_FILE" "$OUTPUT_DIR/traces-"*.logjet "$OUTPUT_DIR/traces.stream-id"

echo "starting ljd with config $CONFIG"
"$LJD" --config "$CONFIG" &
LJD_PID=$!

cleanup() {
if [ -n "${EMITTER_PID:-}" ]; then
kill "$EMITTER_PID" 2>/dev/null || true
wait "$EMITTER_PID" 2>/dev/null || true
fi
if [ -n "${LJD_PID:-}" ]; then
kill "$LJD_PID" 2>/dev/null || true
wait "$LJD_PID" 2>/dev/null || true
fi
}
trap cleanup EXIT INT TERM

sleep 1

TRACE_COUNT=10
echo "starting traces-emitter toward 127.0.0.1:4318 ($TRACE_COUNT batches)"
"$EMITTER" 127.0.0.1:4318 "$TRACE_COUNT"

echo "emitter finished; giving ljd time to flush"
sleep 2

echo "stopping ljd"
kill "$LJD_PID" 2>/dev/null || true
wait "$LJD_PID" 2>/dev/null || true
LJD_PID=""

echo "exporting $OUTPUT_FILE to Parquet"
LJX_EXPORTER_PATH="$PLUGIN" "$LJX" --export parquet "$OUTPUT_FILE" -o "$PARQUET_FILE" --force

echo
echo "done: $PARQUET_FILE"
echo
echo "inspect with DuckDB:"
echo " duckdb -c \"SELECT signal_type, span_name, span_kind, trace_id, duration_ns FROM read_parquet('$PARQUET_FILE') LIMIT 10;\""
8 changes: 8 additions & 0 deletions liblogjet/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub const LJX_EXPORT_CAP_RECORD_METRICS: u64 = 1 << 2;
pub const LJX_EXPORT_CAP_RECORD_TRACES: u64 = 1 << 3;
/// Plugin understands OTLP `ExportLogsServiceRequest` payloads.
pub const LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST: u64 = 1 << 4;
/// Plugin understands OTLP `ExportMetricsServiceRequest` payloads.
pub const LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST: u64 = 1 << 5;
/// Plugin understands OTLP `ExportTraceServiceRequest` payloads.
pub const LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST: u64 = 1 << 6;

/// Record contains logs.
pub const LJX_RECORD_TYPE_LOGS: u32 = 1;
Expand All @@ -45,6 +49,10 @@ pub const LJX_RECORD_TYPE_EVENTS: u32 = 4;
pub const LJX_PAYLOAD_KIND_OPAQUE: u32 = 0;
/// Payload is an OTLP `ExportLogsServiceRequest` protobuf.
pub const LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST: u32 = 1;
/// Payload is an OTLP `ExportMetricsServiceRequest` protobuf.
pub const LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST: u32 = 2;
/// Payload is an OTLP `ExportTraceServiceRequest` protobuf.
pub const LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST: u32 = 3;

/// Pointer/length string view used by the exporter ABI.
#[repr(C)]
Expand Down
54 changes: 49 additions & 5 deletions ljx/src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ use std::path::{Path, PathBuf};

use libloading::Library;
use liblogjet::export::{
LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST, LJX_EXPORT_CAP_RECORD_LOGS, LJX_EXPORT_CAP_RECORD_METRICS, LJX_EXPORT_CAP_RECORD_TRACES,
LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST, LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST,
LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST, LJX_EXPORT_CAP_RECORD_LOGS, LJX_EXPORT_CAP_RECORD_METRICS, LJX_EXPORT_CAP_RECORD_TRACES,
LJX_EXPORT_CAP_STREAMING, LJX_EXPORT_STATUS_IO, LJX_EXPORT_STATUS_OK, LJX_EXPORTER_ABI_MAJOR, LJX_EXPORTER_ABI_MINOR,
LJX_EXPORTER_DESCRIPTOR_V1_SYMBOL, LjxAbiBytes, LjxAbiString, LjxExportHostV1, LjxExportInitV1, LjxExportRecordV1, LjxExporterCtx,
LjxExporterDescriptorV1,
};
use logjet::{LogjetReader, OwnedRecord, RecordType};
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;

use crate::error::{Error, Result};
Expand Down Expand Up @@ -298,6 +301,28 @@ impl LoadedExporter {
record.seq
)));
}
if payload_kind == liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST
&& self.capabilities & LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST == 0
{
return Err(Error::Usage(format!(
"exporter `{}` from {} does not accept OTLP metrics payloads in {} at seq {}",
self.display_name,
self.path.display(),
input.display(),
record.seq
)));
}
if payload_kind == liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST
&& self.capabilities & LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST == 0
{
return Err(Error::Usage(format!(
"exporter `{}` from {} does not accept OTLP traces payloads in {} at seq {}",
self.display_name,
self.path.display(),
input.display(),
record.seq
)));
}
Ok(())
}

Expand Down Expand Up @@ -566,10 +591,29 @@ fn abi_record_type(value: RecordType) -> u32 {
}

fn payload_kind(record: &OwnedRecord) -> u32 {
if record.record_type == RecordType::Logs && ExportLogsServiceRequest::decode(record.payload.as_slice()).is_ok() {
liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST
} else {
liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE
match record.record_type {
RecordType::Logs => {
if ExportLogsServiceRequest::decode(record.payload.as_slice()).is_ok() {
liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST
} else {
liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE
}
}
RecordType::Metrics => {
if ExportMetricsServiceRequest::decode(record.payload.as_slice()).is_ok() {
liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST
} else {
liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE
}
}
RecordType::Traces => {
if ExportTraceServiceRequest::decode(record.payload.as_slice()).is_ok() {
liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST
} else {
liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE
}
}
RecordType::Events => liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE,
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/parquet-exporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ crate-type = ["cdylib"]
arrow-array = "58.0.0"
arrow-schema = "58.0.0"
liblogjet = { path = "../../liblogjet" }
opentelemetry-proto = { version = "0.31", features = ["gen-tonic", "logs"] }
opentelemetry-proto = { version = "0.31", features = ["gen-tonic", "logs", "metrics", "trace"] }
parquet = { version = "58.0.0", default-features = false, features = ["arrow", "zstd"] }
prost = "0.14"
serde_json = "1"
Loading
Loading