diff --git a/demo/README.md b/demo/README.md index 9368695..93b908f 100644 --- a/demo/README.md +++ b/demo/README.md @@ -43,6 +43,10 @@ It also contains scenario demos under subdirectories: - inspect rotated file segments and prune archived files deliberately - [`parquet-export`](./parquet-export) - generate about 5K BOFH log entries, then export that `.logjet` file to Parquet through the external exporter plugin +- [`parquet-metrics-export`](./parquet-metrics-export) + - generate OTLP metrics batches, ingest them into `ljd`, then export the `.logjet` file to Parquet +- [`parquet-traces-export`](./parquet-traces-export) + - generate OTLP traces batches, ingest them into `ljd`, then export the `.logjet` file to Parquet - [`tui-view`](./tui-view) - generate 1000 randomized log entries and open `ljx view` on the result - [`metrics-view`](./metrics-view) diff --git a/demo/parquet-metrics-export/README.md b/demo/parquet-metrics-export/README.md new file mode 100644 index 0000000..f627c31 --- /dev/null +++ b/demo/parquet-metrics-export/README.md @@ -0,0 +1,47 @@ +# Parquet Metrics Export Demo + +Demo that captures OTLP metrics through `ljd`, stores them in a `.logjet` file, and exports that file to Parquet through the external exporter plugin. + +It uses: + +- `target/debug/ljd` +- `target/debug/metrics-emitter` +- `target/debug/ljx` +- `target/debug/libljx_parquet_exporter.so` + +## Build + +From the project root: + +```bash +make demo +``` + +## Run + +From this directory: + +```bash +sh ./run-demo.sh +``` + +The script: + +1. starts `ljd` in file mode listening on `127.0.0.1:4318` +2. sends 15 metric batches via `metrics-emitter` +3. stops `ljd` after flush +4. exports the resulting `.logjet` file to `./logs/metrics.parquet` +5. prints a DuckDB query to inspect the result + +## Inspect with DuckDB + +```bash +duckdb -c "SELECT signal_type, metric_name, metric_type, metric_value_number FROM read_parquet('./logs/metrics.parquet') LIMIT 10;" +``` + +## Notes + +- the demo uses `--force` so reruns overwrite the previous output +- metrics are captured as OTLP `ExportMetricsServiceRequest` batches and stored raw +- the Parquet exporter flattens each datapoint into one row with metric-specific columns +- the unified schema includes all signal columns; unused columns are null for metrics diff --git a/demo/parquet-metrics-export/logjetd.conf b/demo/parquet-metrics-export/logjetd.conf new file mode 100644 index 0000000..cb44657 --- /dev/null +++ b/demo/parquet-metrics-export/logjetd.conf @@ -0,0 +1,7 @@ +output: file +file.path: ./logs +file.size: 100000 +file.name: metrics.logjet +ingest.protocol: otlp-http +ingest.listen: 127.0.0.1:4318 +replay.listen: 127.0.0.1:7002 diff --git a/demo/parquet-metrics-export/run-demo.sh b/demo/parquet-metrics-export/run-demo.sh new file mode 100755 index 0000000..682285f --- /dev/null +++ b/demo/parquet-metrics-export/run-demo.sh @@ -0,0 +1,65 @@ +#!/bin/sh +set -eu + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +TARGET_DIR="$SCRIPT_DIR/../../target/debug" +LJD="$TARGET_DIR/ljd" +EMITTER="$TARGET_DIR/metrics-emitter" +LJX="$TARGET_DIR/ljx" +PLUGIN="$TARGET_DIR/libljx_parquet_exporter.so" +CONFIG="$SCRIPT_DIR/logjetd.conf" +OUTPUT_DIR="$SCRIPT_DIR/logs" +OUTPUT_FILE="$OUTPUT_DIR/metrics.logjet" +PARQUET_FILE="$OUTPUT_DIR/metrics.parquet" + +for path in "$LJD" "$EMITTER" "$LJX" "$PLUGIN"; do + if [ ! -x "$path" ]; then + echo "missing $path" + echo "build it first with: make demo" + exit 1 + fi +done + +cd "$SCRIPT_DIR" + +mkdir -p "$OUTPUT_DIR" +rm -f "$OUTPUT_FILE" "$OUTPUT_DIR/metrics-"*.logjet "$OUTPUT_DIR/metrics.stream-id" + +echo "starting ljd with config $CONFIG" +"$LJD" --config "$CONFIG" & +LJD_PID=$! + +cleanup() { + if [ -n "${EMITTER_PID:-}" ]; then + kill "$EMITTER_PID" 2>/dev/null || true + wait "$EMITTER_PID" 2>/dev/null || true + fi + if [ -n "${LJD_PID:-}" ]; then + kill "$LJD_PID" 2>/dev/null || true + wait "$LJD_PID" 2>/dev/null || true + fi +} +trap cleanup EXIT INT TERM + +sleep 1 + +METRIC_COUNT=15 +echo "starting metrics-emitter toward 127.0.0.1:4318 ($METRIC_COUNT batches)" +"$EMITTER" 127.0.0.1:4318 "$METRIC_COUNT" + +echo "emitter finished; giving ljd time to flush" +sleep 2 + +echo "stopping ljd" +kill "$LJD_PID" 2>/dev/null || true +wait "$LJD_PID" 2>/dev/null || true +LJD_PID="" + +echo "exporting $OUTPUT_FILE to Parquet" +LJX_EXPORTER_PATH="$PLUGIN" "$LJX" --export parquet "$OUTPUT_FILE" -o "$PARQUET_FILE" --force + +echo +echo "done: $PARQUET_FILE" +echo +echo "inspect with DuckDB:" +echo " duckdb -c \"SELECT signal_type, metric_name, metric_type, metric_value_number FROM read_parquet('$PARQUET_FILE') LIMIT 10;\"" diff --git a/demo/parquet-traces-export/README.md b/demo/parquet-traces-export/README.md new file mode 100644 index 0000000..4f6813e --- /dev/null +++ b/demo/parquet-traces-export/README.md @@ -0,0 +1,47 @@ +# Parquet Traces Export Demo + +Demo that captures OTLP traces through `ljd`, stores them in a `.logjet` file, and exports that file to Parquet through the external exporter plugin. + +It uses: + +- `target/debug/ljd` +- `target/debug/traces-emitter` +- `target/debug/ljx` +- `target/debug/libljx_parquet_exporter.so` + +## Build + +From the project root: + +```bash +make demo +``` + +## Run + +From this directory: + +```bash +sh ./run-demo.sh +``` + +The script: + +1. starts `ljd` in file mode listening on `127.0.0.1:4318` +2. sends 10 trace batches via `traces-emitter` +3. stops `ljd` after flush +4. exports the resulting `.logjet` file to `./logs/traces.parquet` +5. prints a DuckDB query to inspect the result + +## Inspect with DuckDB + +```bash +duckdb -c "SELECT signal_type, span_name, span_kind, trace_id, duration_ns FROM read_parquet('./logs/traces.parquet') LIMIT 10;" +``` + +## Notes + +- the demo uses `--force` so reruns overwrite the previous output +- traces are captured as OTLP `ExportTraceServiceRequest` batches and stored raw +- the Parquet exporter flattens each span into one row with trace-specific columns +- the unified schema includes all signal columns; unused columns are null for traces diff --git a/demo/parquet-traces-export/logjetd.conf b/demo/parquet-traces-export/logjetd.conf new file mode 100644 index 0000000..5580b78 --- /dev/null +++ b/demo/parquet-traces-export/logjetd.conf @@ -0,0 +1,7 @@ +output: file +file.path: ./logs +file.size: 100000 +file.name: traces.logjet +ingest.protocol: otlp-http +ingest.listen: 127.0.0.1:4318 +replay.listen: 127.0.0.1:7002 diff --git a/demo/parquet-traces-export/run-demo.sh b/demo/parquet-traces-export/run-demo.sh new file mode 100755 index 0000000..b1f62eb --- /dev/null +++ b/demo/parquet-traces-export/run-demo.sh @@ -0,0 +1,65 @@ +#!/bin/sh +set -eu + +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$0")" && pwd) +TARGET_DIR="$SCRIPT_DIR/../../target/debug" +LJD="$TARGET_DIR/ljd" +EMITTER="$TARGET_DIR/traces-emitter" +LJX="$TARGET_DIR/ljx" +PLUGIN="$TARGET_DIR/libljx_parquet_exporter.so" +CONFIG="$SCRIPT_DIR/logjetd.conf" +OUTPUT_DIR="$SCRIPT_DIR/logs" +OUTPUT_FILE="$OUTPUT_DIR/traces.logjet" +PARQUET_FILE="$OUTPUT_DIR/traces.parquet" + +for path in "$LJD" "$EMITTER" "$LJX" "$PLUGIN"; do + if [ ! -x "$path" ]; then + echo "missing $path" + echo "build it first with: make demo" + exit 1 + fi +done + +cd "$SCRIPT_DIR" + +mkdir -p "$OUTPUT_DIR" +rm -f "$OUTPUT_FILE" "$OUTPUT_DIR/traces-"*.logjet "$OUTPUT_DIR/traces.stream-id" + +echo "starting ljd with config $CONFIG" +"$LJD" --config "$CONFIG" & +LJD_PID=$! + +cleanup() { + if [ -n "${EMITTER_PID:-}" ]; then + kill "$EMITTER_PID" 2>/dev/null || true + wait "$EMITTER_PID" 2>/dev/null || true + fi + if [ -n "${LJD_PID:-}" ]; then + kill "$LJD_PID" 2>/dev/null || true + wait "$LJD_PID" 2>/dev/null || true + fi +} +trap cleanup EXIT INT TERM + +sleep 1 + +TRACE_COUNT=10 +echo "starting traces-emitter toward 127.0.0.1:4318 ($TRACE_COUNT batches)" +"$EMITTER" 127.0.0.1:4318 "$TRACE_COUNT" + +echo "emitter finished; giving ljd time to flush" +sleep 2 + +echo "stopping ljd" +kill "$LJD_PID" 2>/dev/null || true +wait "$LJD_PID" 2>/dev/null || true +LJD_PID="" + +echo "exporting $OUTPUT_FILE to Parquet" +LJX_EXPORTER_PATH="$PLUGIN" "$LJX" --export parquet "$OUTPUT_FILE" -o "$PARQUET_FILE" --force + +echo +echo "done: $PARQUET_FILE" +echo +echo "inspect with DuckDB:" +echo " duckdb -c \"SELECT signal_type, span_name, span_kind, trace_id, duration_ns FROM read_parquet('$PARQUET_FILE') LIMIT 10;\"" diff --git a/liblogjet/src/export.rs b/liblogjet/src/export.rs index 627ab07..aec8e3f 100644 --- a/liblogjet/src/export.rs +++ b/liblogjet/src/export.rs @@ -31,6 +31,10 @@ pub const LJX_EXPORT_CAP_RECORD_METRICS: u64 = 1 << 2; pub const LJX_EXPORT_CAP_RECORD_TRACES: u64 = 1 << 3; /// Plugin understands OTLP `ExportLogsServiceRequest` payloads. pub const LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST: u64 = 1 << 4; +/// Plugin understands OTLP `ExportMetricsServiceRequest` payloads. +pub const LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST: u64 = 1 << 5; +/// Plugin understands OTLP `ExportTraceServiceRequest` payloads. +pub const LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST: u64 = 1 << 6; /// Record contains logs. pub const LJX_RECORD_TYPE_LOGS: u32 = 1; @@ -45,6 +49,10 @@ pub const LJX_RECORD_TYPE_EVENTS: u32 = 4; pub const LJX_PAYLOAD_KIND_OPAQUE: u32 = 0; /// Payload is an OTLP `ExportLogsServiceRequest` protobuf. pub const LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST: u32 = 1; +/// Payload is an OTLP `ExportMetricsServiceRequest` protobuf. +pub const LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST: u32 = 2; +/// Payload is an OTLP `ExportTraceServiceRequest` protobuf. +pub const LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST: u32 = 3; /// Pointer/length string view used by the exporter ABI. #[repr(C)] diff --git a/ljx/src/exporter.rs b/ljx/src/exporter.rs index a3d7af6..7885f62 100644 --- a/ljx/src/exporter.rs +++ b/ljx/src/exporter.rs @@ -13,13 +13,16 @@ use std::path::{Path, PathBuf}; use libloading::Library; use liblogjet::export::{ - LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST, LJX_EXPORT_CAP_RECORD_LOGS, LJX_EXPORT_CAP_RECORD_METRICS, LJX_EXPORT_CAP_RECORD_TRACES, + LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST, LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST, + LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST, LJX_EXPORT_CAP_RECORD_LOGS, LJX_EXPORT_CAP_RECORD_METRICS, LJX_EXPORT_CAP_RECORD_TRACES, LJX_EXPORT_CAP_STREAMING, LJX_EXPORT_STATUS_IO, LJX_EXPORT_STATUS_OK, LJX_EXPORTER_ABI_MAJOR, LJX_EXPORTER_ABI_MINOR, LJX_EXPORTER_DESCRIPTOR_V1_SYMBOL, LjxAbiBytes, LjxAbiString, LjxExportHostV1, LjxExportInitV1, LjxExportRecordV1, LjxExporterCtx, LjxExporterDescriptorV1, }; use logjet::{LogjetReader, OwnedRecord, RecordType}; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use prost::Message; use crate::error::{Error, Result}; @@ -298,6 +301,28 @@ impl LoadedExporter { record.seq ))); } + if payload_kind == liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST + && self.capabilities & LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST == 0 + { + return Err(Error::Usage(format!( + "exporter `{}` from {} does not accept OTLP metrics payloads in {} at seq {}", + self.display_name, + self.path.display(), + input.display(), + record.seq + ))); + } + if payload_kind == liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST + && self.capabilities & LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST == 0 + { + return Err(Error::Usage(format!( + "exporter `{}` from {} does not accept OTLP traces payloads in {} at seq {}", + self.display_name, + self.path.display(), + input.display(), + record.seq + ))); + } Ok(()) } @@ -566,10 +591,29 @@ fn abi_record_type(value: RecordType) -> u32 { } fn payload_kind(record: &OwnedRecord) -> u32 { - if record.record_type == RecordType::Logs && ExportLogsServiceRequest::decode(record.payload.as_slice()).is_ok() { - liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST - } else { - liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE + match record.record_type { + RecordType::Logs => { + if ExportLogsServiceRequest::decode(record.payload.as_slice()).is_ok() { + liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST + } else { + liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE + } + } + RecordType::Metrics => { + if ExportMetricsServiceRequest::decode(record.payload.as_slice()).is_ok() { + liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST + } else { + liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE + } + } + RecordType::Traces => { + if ExportTraceServiceRequest::decode(record.payload.as_slice()).is_ok() { + liblogjet::export::LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST + } else { + liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE + } + } + RecordType::Events => liblogjet::export::LJX_PAYLOAD_KIND_OPAQUE, } } diff --git a/plugins/parquet-exporter/Cargo.toml b/plugins/parquet-exporter/Cargo.toml index 50a1118..ce982cc 100644 --- a/plugins/parquet-exporter/Cargo.toml +++ b/plugins/parquet-exporter/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib"] arrow-array = "58.0.0" arrow-schema = "58.0.0" liblogjet = { path = "../../liblogjet" } -opentelemetry-proto = { version = "0.31", features = ["gen-tonic", "logs"] } +opentelemetry-proto = { version = "0.31", features = ["gen-tonic", "logs", "metrics", "trace"] } parquet = { version = "58.0.0", default-features = false, features = ["arrow", "zstd"] } prost = "0.14" serde_json = "1" diff --git a/plugins/parquet-exporter/src/lib.rs b/plugins/parquet-exporter/src/lib.rs index e95ad74..123d710 100644 --- a/plugins/parquet-exporter/src/lib.rs +++ b/plugins/parquet-exporter/src/lib.rs @@ -3,6 +3,10 @@ //! This crate implements the stable `liblogjet::export` C ABI directly. //! The host streams raw logjet records into `write_record`, and the plugin //! emits a Parquet file through the host-provided write/flush callbacks. +//! +//! Supports all three OTLP signal types: logs, metrics, traces. +//! Uses a single fixed-column Arrow schema that covers every field from every +//! signal; when a signal does not use a column it is simply null everywhere. use std::ffi::{c_char, c_void}; use std::io::{self, Write}; @@ -12,14 +16,19 @@ use arrow_array::builder::{Int32Builder, StringBuilder, UInt32Builder, UInt64Bui use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use liblogjet::export::{ - LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST, LJX_EXPORT_CAP_RECORD_LOGS, LJX_EXPORT_CAP_STREAMING, LJX_EXPORT_STATUS_BAD_ARG, - LJX_EXPORT_STATUS_ERROR, LJX_EXPORT_STATUS_IO, LJX_EXPORT_STATUS_OK, LJX_EXPORT_STATUS_UNSUPPORTED, LJX_EXPORTER_ABI_MAJOR, - LJX_EXPORTER_ABI_MINOR, LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST, LJX_RECORD_TYPE_LOGS, LjxAbiBytes, LjxAbiString, LjxExportHostV1, - LjxExportInitV1, LjxExportRecordV1, LjxExporterCtx, LjxExporterDescriptorV1, + LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST, LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST, + LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST, LJX_EXPORT_CAP_RECORD_LOGS, LJX_EXPORT_CAP_RECORD_METRICS, LJX_EXPORT_CAP_RECORD_TRACES, + LJX_EXPORT_CAP_STREAMING, LJX_EXPORT_STATUS_BAD_ARG, LJX_EXPORT_STATUS_ERROR, LJX_EXPORT_STATUS_IO, LJX_EXPORT_STATUS_OK, + LJX_EXPORT_STATUS_UNSUPPORTED, LJX_EXPORTER_ABI_MAJOR, LJX_EXPORTER_ABI_MINOR, LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST, + LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST, LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST, LJX_RECORD_TYPE_LOGS, LJX_RECORD_TYPE_METRICS, + LJX_RECORD_TYPE_TRACES, LjxAbiBytes, LjxAbiString, LjxExportHostV1, LjxExportInitV1, LjxExportRecordV1, LjxExporterCtx, LjxExporterDescriptorV1, }; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::common::v1::any_value::Value; use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; +use opentelemetry_proto::tonic::metrics::v1::metric::Data as MetricData; use parquet::arrow::arrow_writer::ArrowWriter; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; @@ -39,7 +48,13 @@ static PARQUET_EXPORTER_DESCRIPTOR: ExporterDescriptor = ExporterDescriptor(LjxE abi_major: LJX_EXPORTER_ABI_MAJOR, abi_minor: LJX_EXPORTER_ABI_MINOR, plugin_api_version: PLUGIN_API_VERSION, - capabilities: LJX_EXPORT_CAP_STREAMING | LJX_EXPORT_CAP_RECORD_LOGS | LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST, + capabilities: LJX_EXPORT_CAP_STREAMING + | LJX_EXPORT_CAP_RECORD_LOGS + | LJX_EXPORT_CAP_RECORD_METRICS + | LJX_EXPORT_CAP_RECORD_TRACES + | LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_LOGS_REQUEST + | LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_METRICS_REQUEST + | LJX_EXPORT_CAP_PAYLOAD_OTLP_EXPORT_TRACE_REQUEST, format_name: LjxAbiString::from_static("parquet"), display_name: LjxAbiString::from_static("Parquet"), default_extension: LjxAbiString::from_static("parquet"), @@ -120,29 +135,41 @@ impl ParquetExporter { if record.struct_size < std::mem::size_of::() as u32 { return self.fail_status( LJX_EXPORT_STATUS_BAD_ARG, - format!("record struct_size {} is smaller than host ABI expects {}", record.struct_size, std::mem::size_of::()), + format!( + "record struct_size {} is smaller than host ABI expects {}", + record.struct_size, + std::mem::size_of::() + ), ); } if record.payload.ptr.is_null() && record.payload.len != 0 { return self.fail_status(LJX_EXPORT_STATUS_BAD_ARG, "record payload pointer is null but len is non-zero"); } - if record.record_type != LJX_RECORD_TYPE_LOGS { - return self.fail_status( - LJX_EXPORT_STATUS_UNSUPPORTED, - format!("unsupported record type {}; parquet exporter currently supports logs only", record.record_type), - ); + + let payload = match abi_bytes(record.payload) { + Ok(payload) => payload, + Err(err) => return self.fail_status(LJX_EXPORT_STATUS_BAD_ARG, err), + }; + + match record.record_type { + LJX_RECORD_TYPE_LOGS => self.write_logs_record(record, payload), + LJX_RECORD_TYPE_METRICS => self.write_metrics_record(record, payload), + LJX_RECORD_TYPE_TRACES => self.write_traces_record(record, payload), + other => self.fail_status(LJX_EXPORT_STATUS_UNSUPPORTED, format!("unsupported record type {other}")), } + } + + fn write_logs_record(&mut self, record: &LjxExportRecordV1, payload: &[u8]) -> i32 { if record.payload_kind != LJX_PAYLOAD_KIND_OTLP_EXPORT_LOGS_REQUEST { return self.fail_status( LJX_EXPORT_STATUS_UNSUPPORTED, - format!("unsupported payload kind {}; parquet exporter currently supports OTLP ExportLogsServiceRequest only", record.payload_kind), + format!( + "unsupported payload kind {}; parquet exporter currently supports OTLP ExportLogsServiceRequest only for logs", + record.payload_kind + ), ); } - let payload = match abi_bytes(record.payload) { - Ok(payload) => payload, - Err(err) => return self.fail_status(LJX_EXPORT_STATUS_BAD_ARG, err), - }; let request = match ExportLogsServiceRequest::decode(payload) { Ok(request) => request, Err(err) => return self.fail_status(LJX_EXPORT_STATUS_ERROR, format!("failed to decode OTLP logs payload at seq {}: {err}", record.seq)), @@ -160,22 +187,41 @@ impl ParquetExporter { for log_record in &scope_logs.log_records { let row = ParquetRow { sequence: record.seq, - timestamp_unix_ns: record.timestamp_unix_ns, + timestamp_unix_ns: Some(record.timestamp_unix_ns), + signal_type: "logs".to_string(), observed_timestamp_unix_ns: zero_is_none(log_record.observed_time_unix_nano), trace_id: bytes_to_lower_hex(&log_record.trace_id), span_id: bytes_to_lower_hex(&log_record.span_id), trace_flags: zero_is_none_u32(log_record.flags), severity_number: (log_record.severity_number != 0).then_some(log_record.severity_number), severity_text: non_empty_owned(&log_record.severity_text), - body_kind: body_kind(log_record.body.as_ref()), + body_kind: Some(body_kind(log_record.body.as_ref())), body_string: body_string(log_record.body.as_ref()), body_json: body_json(log_record.body.as_ref()), + metric_name: None, + metric_description: None, + metric_unit: None, + metric_type: None, + metric_value_number: None, + metric_value_count: None, + metric_value_sum: None, + is_monotonic: None, + aggregation_temporality: None, + span_name: None, + span_kind: None, + parent_span_id: None, + start_time_unix_ns: None, + end_time_unix_ns: None, + duration_ns: None, + status_code: None, + status_message: None, service_name: service_name.clone(), scope_name: scope_name.map(str::to_owned), scope_version: scope_version.map(str::to_owned), resource_attributes_json: resource_attributes_json.clone(), scope_attributes_json: scope_attributes_json.clone(), log_attributes_json: attrs_to_json(&log_record.attributes), + span_attributes_json: None, event_name: non_empty_owned(&log_record.event_name), }; self.rows.push(row); @@ -183,6 +229,239 @@ impl ParquetExporter { } } + self.maybe_flush() + } + + fn write_metrics_record(&mut self, record: &LjxExportRecordV1, payload: &[u8]) -> i32 { + if record.payload_kind != LJX_PAYLOAD_KIND_OTLP_EXPORT_METRICS_REQUEST { + return self.fail_status( + LJX_EXPORT_STATUS_UNSUPPORTED, + format!( + "unsupported payload kind {}; parquet exporter currently supports OTLP ExportMetricsServiceRequest only for metrics", + record.payload_kind + ), + ); + } + + let request = match ExportMetricsServiceRequest::decode(payload) { + Ok(request) => request, + Err(err) => { + return self.fail_status(LJX_EXPORT_STATUS_ERROR, format!("failed to decode OTLP metrics payload at seq {}: {err}", record.seq)) + } + }; + + for resource_metrics in &request.resource_metrics { + let resource_attrs = resource_metrics.resource.as_ref().map(|resource| resource.attributes.as_slice()).unwrap_or(&[]); + let service_name = find_attr_string(resource_attrs, "service.name"); + let resource_attributes_json = attrs_to_json(resource_attrs); + for scope_metrics in &resource_metrics.scope_metrics { + let scope = scope_metrics.scope.as_ref(); + let scope_name = scope.and_then(|scope| non_empty(scope.name.as_str())); + let scope_version = scope.and_then(|scope| non_empty(scope.version.as_str())); + let scope_attributes_json = scope.and_then(|scope| attrs_to_json(scope.attributes.as_slice())); + + for metric in &scope_metrics.metrics { + let metric_name = non_empty_owned(metric.name.as_str()); + let metric_description = non_empty_owned(metric.description.as_str()); + let metric_unit = non_empty_owned(metric.unit.as_str()); + + if let Some(ref data) = metric.data { + match data { + MetricData::Gauge(gauge) => { + for dp in &gauge.data_points { + let mut row = self.base_metrics_row(record, &service_name, &resource_attributes_json, scope_name, scope_version, &scope_attributes_json, &metric_name, &metric_description, &metric_unit, "Gauge"); + row.metric_value_number = dp.value.as_ref().and_then(number_data_point_value_f64); + row.timestamp_unix_ns = Some(dp.time_unix_nano.max(record.timestamp_unix_ns)); + row.start_time_unix_ns = zero_is_none(dp.start_time_unix_nano); + row.scope_attributes_json = attrs_to_json(&dp.attributes); + self.rows.push(row); + } + } + MetricData::Sum(sum) => { + for dp in &sum.data_points { + let mut row = self.base_metrics_row(record, &service_name, &resource_attributes_json, scope_name, scope_version, &scope_attributes_json, &metric_name, &metric_description, &metric_unit, "Sum"); + row.metric_value_number = dp.value.as_ref().and_then(number_data_point_value_f64); + row.timestamp_unix_ns = Some(dp.time_unix_nano.max(record.timestamp_unix_ns)); + row.start_time_unix_ns = zero_is_none(dp.start_time_unix_nano); + row.is_monotonic = Some(sum.is_monotonic); + row.aggregation_temporality = Some(sum.aggregation_temporality); + row.scope_attributes_json = attrs_to_json(&dp.attributes); + self.rows.push(row); + } + } + MetricData::Histogram(hist) => { + for dp in &hist.data_points { + let mut row = self.base_metrics_row(record, &service_name, &resource_attributes_json, scope_name, scope_version, &scope_attributes_json, &metric_name, &metric_description, &metric_unit, "Histogram"); + row.timestamp_unix_ns = Some(dp.time_unix_nano.max(record.timestamp_unix_ns)); + row.start_time_unix_ns = zero_is_none(dp.start_time_unix_nano); + row.metric_value_count = Some(dp.count); + row.metric_value_sum = dp.sum; + row.aggregation_temporality = Some(hist.aggregation_temporality); + row.scope_attributes_json = attrs_to_json(&dp.attributes); + self.rows.push(row); + } + } + MetricData::ExponentialHistogram(ehist) => { + for dp in &ehist.data_points { + let mut row = self.base_metrics_row(record, &service_name, &resource_attributes_json, scope_name, scope_version, &scope_attributes_json, &metric_name, &metric_description, &metric_unit, "ExponentialHistogram"); + row.timestamp_unix_ns = Some(dp.time_unix_nano.max(record.timestamp_unix_ns)); + row.start_time_unix_ns = zero_is_none(dp.start_time_unix_nano); + row.metric_value_count = Some(dp.count); + row.metric_value_sum = dp.sum; + row.aggregation_temporality = Some(ehist.aggregation_temporality); + row.scope_attributes_json = attrs_to_json(&dp.attributes); + self.rows.push(row); + } + } + MetricData::Summary(summary) => { + for dp in &summary.data_points { + let mut row = self.base_metrics_row(record, &service_name, &resource_attributes_json, scope_name, scope_version, &scope_attributes_json, &metric_name, &metric_description, &metric_unit, "Summary"); + row.timestamp_unix_ns = Some(dp.time_unix_nano.max(record.timestamp_unix_ns)); + row.start_time_unix_ns = zero_is_none(dp.start_time_unix_nano); + row.metric_value_count = Some(dp.count); + row.metric_value_sum = Some(dp.sum); + row.scope_attributes_json = attrs_to_json(&dp.attributes); + self.rows.push(row); + } + } + } + } else { + // Metric with no data: emit one row with metadata only + let row = self.base_metrics_row(record, &service_name, &resource_attributes_json, scope_name, scope_version, &scope_attributes_json, &metric_name, &metric_description, &metric_unit, "Unknown"); + self.rows.push(row); + } + } + } + } + + self.maybe_flush() + } + + fn write_traces_record(&mut self, record: &LjxExportRecordV1, payload: &[u8]) -> i32 { + if record.payload_kind != LJX_PAYLOAD_KIND_OTLP_EXPORT_TRACE_REQUEST { + return self.fail_status( + LJX_EXPORT_STATUS_UNSUPPORTED, + format!( + "unsupported payload kind {}; parquet exporter currently supports OTLP ExportTraceServiceRequest only for traces", + record.payload_kind + ), + ); + } + + let request = match ExportTraceServiceRequest::decode(payload) { + Ok(request) => request, + Err(err) => { + return self.fail_status(LJX_EXPORT_STATUS_ERROR, format!("failed to decode OTLP traces payload at seq {}: {err}", record.seq)) + } + }; + + for resource_spans in &request.resource_spans { + let resource_attrs = resource_spans.resource.as_ref().map(|resource| resource.attributes.as_slice()).unwrap_or(&[]); + let service_name = find_attr_string(resource_attrs, "service.name"); + let resource_attributes_json = attrs_to_json(resource_attrs); + for scope_spans in &resource_spans.scope_spans { + let scope = scope_spans.scope.as_ref(); + let scope_name = scope.and_then(|scope| non_empty(scope.name.as_str())); + let scope_version = scope.and_then(|scope| non_empty(scope.version.as_str())); + let scope_attributes_json = scope.and_then(|scope| attrs_to_json(scope.attributes.as_slice())); + + for span in &scope_spans.spans { + let duration_ns = span.end_time_unix_nano.saturating_sub(span.start_time_unix_nano); + let row = ParquetRow { + sequence: record.seq, + timestamp_unix_ns: Some(record.timestamp_unix_ns), + signal_type: "traces".to_string(), + observed_timestamp_unix_ns: None, + trace_id: bytes_to_lower_hex(&span.trace_id), + span_id: bytes_to_lower_hex(&span.span_id), + trace_flags: zero_is_none_u32(span.flags), + severity_number: None, + severity_text: None, + body_kind: None, + body_string: None, + body_json: None, + metric_name: None, + metric_description: None, + metric_unit: None, + metric_type: None, + metric_value_number: None, + metric_value_count: None, + metric_value_sum: None, + is_monotonic: None, + aggregation_temporality: None, + span_name: non_empty_owned(span.name.as_str()), + span_kind: Some(format_span_kind(span.kind)), + parent_span_id: bytes_to_lower_hex(&span.parent_span_id), + start_time_unix_ns: zero_is_none(span.start_time_unix_nano), + end_time_unix_ns: zero_is_none(span.end_time_unix_nano), + duration_ns: (duration_ns > 0).then_some(duration_ns), + status_code: span.status.as_ref().and_then(|s| (s.code != 0).then_some(s.code)), + status_message: span.status.as_ref().and_then(|s| non_empty_owned(&s.message)), + service_name: service_name.clone(), + scope_name: scope_name.map(str::to_owned), + scope_version: scope_version.map(str::to_owned), + resource_attributes_json: resource_attributes_json.clone(), + scope_attributes_json: scope_attributes_json.clone(), + log_attributes_json: None, + span_attributes_json: attrs_to_json(&span.attributes), + event_name: None, + }; + self.rows.push(row); + } + } + } + + self.maybe_flush() + } + + #[allow(clippy::too_many_arguments)] + fn base_metrics_row( + &self, record: &LjxExportRecordV1, service_name: &Option, resource_attributes_json: &Option, scope_name: Option<&str>, + scope_version: Option<&str>, scope_attributes_json: &Option, metric_name: &Option, metric_description: &Option, + metric_unit: &Option, metric_type: &str, + ) -> ParquetRow { + ParquetRow { + sequence: record.seq, + timestamp_unix_ns: Some(record.timestamp_unix_ns), + signal_type: "metrics".to_string(), + observed_timestamp_unix_ns: None, + trace_id: None, + span_id: None, + trace_flags: None, + severity_number: None, + severity_text: None, + body_kind: None, + body_string: None, + body_json: None, + metric_name: metric_name.clone(), + metric_description: metric_description.clone(), + metric_unit: metric_unit.clone(), + metric_type: Some(metric_type.to_string()), + metric_value_number: None, + metric_value_count: None, + metric_value_sum: None, + is_monotonic: None, + aggregation_temporality: None, + span_name: None, + span_kind: None, + parent_span_id: None, + start_time_unix_ns: None, + end_time_unix_ns: None, + duration_ns: None, + status_code: None, + status_message: None, + service_name: service_name.clone(), + scope_name: scope_name.map(str::to_owned), + scope_version: scope_version.map(str::to_owned), + resource_attributes_json: resource_attributes_json.clone(), + scope_attributes_json: scope_attributes_json.clone(), + log_attributes_json: None, + span_attributes_json: None, + event_name: None, + } + } + + fn maybe_flush(&mut self) -> i32 { if self.rows.len() >= self.cfg.row_group_rows && let Err(err) = self.flush_rows() { @@ -201,7 +480,7 @@ impl ParquetExporter { if let Err(err) = self.writer.finish() { return self.fail_status(status_for_message(&err.to_string()), format!("failed to finish parquet writer: {err}")); } - if let Err(err) = self.writer.sync() { + if let Err(err) = self.writer.flush() { return self.fail_status(LJX_EXPORT_STATUS_IO, format!("failed to flush host output after parquet finish: {err}")); } self.finished = true; @@ -217,7 +496,7 @@ impl ParquetExporter { self.writer.write(&batch).map_err(|err| format!("failed to write parquet batch: {err}"))?; if self.writer.in_progress_rows() >= self.cfg.row_group_rows { self.writer.flush().map_err(|err| format!("failed to flush parquet row group: {err}"))?; - self.writer.sync().map_err(|err| format!("failed to flush host output: {err}"))?; + self.writer.flush().map_err(|err| format!("failed to flush host output: {err}"))?; } Ok(()) } @@ -277,53 +556,92 @@ impl ParquetConfig { } } +struct ParquetRow { + sequence: u64, + timestamp_unix_ns: Option, + signal_type: String, + observed_timestamp_unix_ns: Option, + trace_id: Option, + span_id: Option, + trace_flags: Option, + severity_number: Option, + severity_text: Option, + body_kind: Option, + body_string: Option, + body_json: Option, + metric_name: Option, + metric_description: Option, + metric_unit: Option, + metric_type: Option, + metric_value_number: Option, + metric_value_count: Option, + metric_value_sum: Option, + is_monotonic: Option, + aggregation_temporality: Option, + span_name: Option, + span_kind: Option, + parent_span_id: Option, + start_time_unix_ns: Option, + end_time_unix_ns: Option, + duration_ns: Option, + status_code: Option, + status_message: Option, + service_name: Option, + scope_name: Option, + scope_version: Option, + resource_attributes_json: Option, + scope_attributes_json: Option, + log_attributes_json: Option, + span_attributes_json: Option, + event_name: Option, +} + #[derive(Default)] struct RowBuffer { sequence: Vec, - timestamp_unix_ns: Vec, + timestamp_unix_ns: Vec>, + signal_type: Vec, observed_timestamp_unix_ns: Vec>, trace_id: Vec>, span_id: Vec>, trace_flags: Vec>, severity_number: Vec>, severity_text: Vec>, - body_kind: Vec, + body_kind: Vec>, body_string: Vec>, body_json: Vec>, + metric_name: Vec>, + metric_description: Vec>, + metric_unit: Vec>, + metric_type: Vec>, + metric_value_number: Vec>, + metric_value_count: Vec>, + metric_value_sum: Vec>, + is_monotonic: Vec>, + aggregation_temporality: Vec>, + span_name: Vec>, + span_kind: Vec>, + parent_span_id: Vec>, + start_time_unix_ns: Vec>, + end_time_unix_ns: Vec>, + duration_ns: Vec>, + status_code: Vec>, + status_message: Vec>, service_name: Vec>, scope_name: Vec>, scope_version: Vec>, resource_attributes_json: Vec>, scope_attributes_json: Vec>, log_attributes_json: Vec>, + span_attributes_json: Vec>, event_name: Vec>, } -struct ParquetRow { - sequence: u64, - timestamp_unix_ns: u64, - observed_timestamp_unix_ns: Option, - trace_id: Option, - span_id: Option, - trace_flags: Option, - severity_number: Option, - severity_text: Option, - body_kind: String, - body_string: Option, - body_json: Option, - service_name: Option, - scope_name: Option, - scope_version: Option, - resource_attributes_json: Option, - scope_attributes_json: Option, - log_attributes_json: Option, - event_name: Option, -} - impl RowBuffer { fn push(&mut self, row: ParquetRow) { self.sequence.push(row.sequence); self.timestamp_unix_ns.push(row.timestamp_unix_ns); + self.signal_type.push(row.signal_type); self.observed_timestamp_unix_ns.push(row.observed_timestamp_unix_ns); self.trace_id.push(row.trace_id); self.span_id.push(row.span_id); @@ -333,12 +651,30 @@ impl RowBuffer { self.body_kind.push(row.body_kind); self.body_string.push(row.body_string); self.body_json.push(row.body_json); + self.metric_name.push(row.metric_name); + self.metric_description.push(row.metric_description); + self.metric_unit.push(row.metric_unit); + self.metric_type.push(row.metric_type); + self.metric_value_number.push(row.metric_value_number); + self.metric_value_count.push(row.metric_value_count); + self.metric_value_sum.push(row.metric_value_sum); + self.is_monotonic.push(row.is_monotonic); + self.aggregation_temporality.push(row.aggregation_temporality); + self.span_name.push(row.span_name); + self.span_kind.push(row.span_kind); + self.parent_span_id.push(row.parent_span_id); + self.start_time_unix_ns.push(row.start_time_unix_ns); + self.end_time_unix_ns.push(row.end_time_unix_ns); + self.duration_ns.push(row.duration_ns); + self.status_code.push(row.status_code); + self.status_message.push(row.status_message); self.service_name.push(row.service_name); self.scope_name.push(row.scope_name); self.scope_version.push(row.scope_version); self.resource_attributes_json.push(row.resource_attributes_json); self.scope_attributes_json.push(row.scope_attributes_json); self.log_attributes_json.push(row.log_attributes_json); + self.span_attributes_json.push(row.span_attributes_json); self.event_name.push(row.event_name); } @@ -353,22 +689,41 @@ impl RowBuffer { fn drain_into_batch(&mut self, schema: SchemaRef) -> Result { let arrays: Vec = vec![ Arc::new(u64_array(std::mem::take(&mut self.sequence))), - Arc::new(u64_array(std::mem::take(&mut self.timestamp_unix_ns))), + Arc::new(opt_u64_array(std::mem::take(&mut self.timestamp_unix_ns))), + Arc::new(string_array(std::mem::take(&mut self.signal_type))), Arc::new(opt_u64_array(std::mem::take(&mut self.observed_timestamp_unix_ns))), Arc::new(opt_string_array(std::mem::take(&mut self.trace_id))), Arc::new(opt_string_array(std::mem::take(&mut self.span_id))), Arc::new(opt_u32_array(std::mem::take(&mut self.trace_flags))), Arc::new(opt_i32_array(std::mem::take(&mut self.severity_number))), Arc::new(opt_string_array(std::mem::take(&mut self.severity_text))), - Arc::new(string_array(std::mem::take(&mut self.body_kind))), + Arc::new(opt_string_array(std::mem::take(&mut self.body_kind))), Arc::new(opt_string_array(std::mem::take(&mut self.body_string))), Arc::new(opt_string_array(std::mem::take(&mut self.body_json))), + Arc::new(opt_string_array(std::mem::take(&mut self.metric_name))), + Arc::new(opt_string_array(std::mem::take(&mut self.metric_description))), + Arc::new(opt_string_array(std::mem::take(&mut self.metric_unit))), + Arc::new(opt_string_array(std::mem::take(&mut self.metric_type))), + Arc::new(opt_f64_array(std::mem::take(&mut self.metric_value_number))), + Arc::new(opt_u64_array(std::mem::take(&mut self.metric_value_count))), + Arc::new(opt_f64_array(std::mem::take(&mut self.metric_value_sum))), + Arc::new(opt_bool_array(std::mem::take(&mut self.is_monotonic))), + Arc::new(opt_i32_array(std::mem::take(&mut self.aggregation_temporality))), + Arc::new(opt_string_array(std::mem::take(&mut self.span_name))), + Arc::new(opt_string_array(std::mem::take(&mut self.span_kind))), + Arc::new(opt_string_array(std::mem::take(&mut self.parent_span_id))), + Arc::new(opt_u64_array(std::mem::take(&mut self.start_time_unix_ns))), + Arc::new(opt_u64_array(std::mem::take(&mut self.end_time_unix_ns))), + Arc::new(opt_u64_array(std::mem::take(&mut self.duration_ns))), + Arc::new(opt_i32_array(std::mem::take(&mut self.status_code))), + Arc::new(opt_string_array(std::mem::take(&mut self.status_message))), Arc::new(opt_string_array(std::mem::take(&mut self.service_name))), Arc::new(opt_string_array(std::mem::take(&mut self.scope_name))), Arc::new(opt_string_array(std::mem::take(&mut self.scope_version))), Arc::new(opt_string_array(std::mem::take(&mut self.resource_attributes_json))), Arc::new(opt_string_array(std::mem::take(&mut self.scope_attributes_json))), Arc::new(opt_string_array(std::mem::take(&mut self.log_attributes_json))), + Arc::new(opt_string_array(std::mem::take(&mut self.span_attributes_json))), Arc::new(opt_string_array(std::mem::take(&mut self.event_name))), ]; RecordBatch::try_new(schema, arrays).map_err(|err| format!("failed to build parquet batch: {err}")) @@ -378,33 +733,52 @@ impl RowBuffer { fn schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("sequence", DataType::UInt64, false), - Field::new("timestamp_unix_ns", DataType::UInt64, false), + Field::new("timestamp_unix_ns", DataType::UInt64, true), + Field::new("signal_type", DataType::Utf8, false), Field::new("observed_timestamp_unix_ns", DataType::UInt64, true), Field::new("trace_id", DataType::Utf8, true), Field::new("span_id", DataType::Utf8, true), Field::new("trace_flags", DataType::UInt32, true), Field::new("severity_number", DataType::Int32, true), Field::new("severity_text", DataType::Utf8, true), - Field::new("body_kind", DataType::Utf8, false), - // `body_string` preserves the common string fast-path while `body_json` - // keeps non-string AnyValue bodies in a stable bounded representation. + Field::new("body_kind", DataType::Utf8, true), Field::new("body_string", DataType::Utf8, true), Field::new("body_json", DataType::Utf8, true), + Field::new("metric_name", DataType::Utf8, true), + Field::new("metric_description", DataType::Utf8, true), + Field::new("metric_unit", DataType::Utf8, true), + Field::new("metric_type", DataType::Utf8, true), + Field::new("metric_value_number", DataType::Float64, true), + Field::new("metric_value_count", DataType::UInt64, true), + Field::new("metric_value_sum", DataType::Float64, true), + Field::new("is_monotonic", DataType::Boolean, true), + Field::new("aggregation_temporality", DataType::Int32, true), + Field::new("span_name", DataType::Utf8, true), + Field::new("span_kind", DataType::Utf8, true), + Field::new("parent_span_id", DataType::Utf8, true), + Field::new("start_time_unix_ns", DataType::UInt64, true), + Field::new("end_time_unix_ns", DataType::UInt64, true), + Field::new("duration_ns", DataType::UInt64, true), + Field::new("status_code", DataType::Int32, true), + Field::new("status_message", DataType::Utf8, true), Field::new("service_name", DataType::Utf8, true), Field::new("scope_name", DataType::Utf8, true), Field::new("scope_version", DataType::Utf8, true), - // Resource, scope, and log attributes stay separate JSON text columns to - // avoid unbounded top-level column growth while preserving OTel meaning. Field::new("resource_attributes_json", DataType::Utf8, true), Field::new("scope_attributes_json", DataType::Utf8, true), Field::new("log_attributes_json", DataType::Utf8, true), + Field::new("span_attributes_json", DataType::Utf8, true), Field::new("event_name", DataType::Utf8, true), ])) } fn validate_host(host: &LjxExportHostV1) -> Result<(), String> { if host.struct_size < std::mem::size_of::() as u32 { - return Err(format!("host struct_size {} is smaller than plugin ABI expects {}", host.struct_size, std::mem::size_of::())); + return Err(format!( + "host struct_size {} is smaller than plugin ABI expects {}", + host.struct_size, + std::mem::size_of::() + )); } Ok(()) } @@ -434,7 +808,11 @@ fn abi_bytes<'a>(value: LjxAbiBytes) -> Result<&'a [u8], String> { } fn status_for_message(message: &str) -> i32 { - if message.contains("host callback") || message.contains("flush host output") { LJX_EXPORT_STATUS_IO } else { LJX_EXPORT_STATUS_ERROR } + if message.contains("host callback") || message.contains("flush host output") { + LJX_EXPORT_STATUS_IO + } else { + LJX_EXPORT_STATUS_ERROR + } } fn zero_is_none(value: u64) -> Option { @@ -472,6 +850,25 @@ fn nibble_to_hex(value: u8) -> char { } } +fn format_span_kind(kind: i32) -> String { + match kind { + 1 => "Internal", + 2 => "Server", + 3 => "Client", + 4 => "Producer", + 5 => "Consumer", + _ => return format!("Unknown({kind})"), + } + .to_string() +} + +fn number_data_point_value_f64(value: &opentelemetry_proto::tonic::metrics::v1::number_data_point::Value) -> Option { + match value { + opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsDouble(v) => Some(*v), + opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(v) => Some(*v as f64), + } +} + fn body_kind(value: Option<&AnyValue>) -> String { match value.and_then(|value| value.value.as_ref()) { None => "empty", @@ -526,8 +923,10 @@ fn attrs_to_json(attrs: &[KeyValue]) -> Option { if attrs.is_empty() { return None; } - let mut pairs = - attrs.iter().filter_map(|attr| attr.value.as_ref().and_then(any_value_to_json).map(|value| (attr.key.clone(), value))).collect::>(); + let mut pairs = attrs + .iter() + .filter_map(|attr| attr.value.as_ref().and_then(any_value_to_json).map(|value| (attr.key.clone(), value))) + .collect::>(); if pairs.is_empty() { return None; } @@ -606,6 +1005,30 @@ fn opt_string_array(values: Vec>) -> arrow_array::StringArray { builder.finish() } +fn opt_f64_array(values: Vec>) -> arrow_array::Float64Array { + use arrow_array::builder::Float64Builder; + let mut builder = Float64Builder::new(); + for value in values { + match value { + Some(value) => builder.append_value(value), + None => builder.append_null(), + } + } + builder.finish() +} + +fn opt_bool_array(values: Vec>) -> arrow_array::BooleanArray { + use arrow_array::builder::BooleanBuilder; + let mut builder = BooleanBuilder::new(); + for value in values { + match value { + Some(value) => builder.append_value(value), + None => builder.append_null(), + } + } + builder.finish() +} + fn ctx_mut<'a>(ctx: *mut LjxExporterCtx) -> Result<&'a mut ParquetExporter, i32> { if ctx.is_null() { return Err(LJX_EXPORT_STATUS_BAD_ARG); diff --git a/tests/ljx_export.rs b/tests/ljx_export.rs index 8da65ab..1b0ff5e 100644 --- a/tests/ljx_export.rs +++ b/tests/ljx_export.rs @@ -7,9 +7,14 @@ use std::time::{SystemTime, UNIX_EPOCH}; use arrow_array::Array; use logjet::{Codec, LogjetReader, LogjetWriter, RecordType, WriterConfig}; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::common::v1::any_value::Value; use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; +use opentelemetry_proto::tonic::metrics::v1::metric::Data as MetricData; +use opentelemetry_proto::tonic::metrics::v1::{Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics}; +use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use prost::Message; @@ -39,7 +44,7 @@ fn ljx_exports_cpp_demo_to_parquet_and_preserves_rows() -> io::Result<()> { actual.iter().filter_map(|row| row.service_name.clone()).collect::>(), expected.iter().filter_map(|row| row.service_name.clone()).collect::>() ); - assert!(actual.iter().all(|row| row.body_kind == "string" || row.body_kind == "empty" || row.body_json.is_some())); + assert!(actual.iter().all(|row| row.body_kind == Some("string".to_string()) || row.body_kind == Some("empty".to_string()) || row.body_json.is_some())); Ok(()) } @@ -102,6 +107,127 @@ fn ljx_exports_large_generated_input_to_parquet() -> io::Result<()> { Ok(()) } +#[test] +fn ljx_exports_metrics_to_parquet() -> io::Result<()> { + ensure_export_artifacts_exist()?; + + let dir = TestDir::new("ljx-export-metrics")?; + let input = dir.path().join("metrics.logjet"); + let output = dir.path().join("metrics.parquet"); + write_metrics_fixture(&input)?; + + let export = run_ljx_export(&input, &output)?; + if !export.status.success() { + return Err(io::Error::other(format!("metrics export failed: {}", String::from_utf8_lossy(&export.stderr)))); + } + + let actual = read_parquet_rows(&output)?; + assert_eq!(actual.len(), 3, "expected 3 metric datapoint rows (one per metric type)"); + + // First row: gauge + let gauge = &actual[0]; + assert_eq!(gauge.signal_type, Some("metrics".to_string())); + assert_eq!(gauge.metric_name, Some("cpu_usage".to_string())); + assert_eq!(gauge.metric_type, Some("Gauge".to_string())); + assert!(gauge.metric_value_number.is_some()); + assert_eq!(gauge.service_name, Some("metrics-service".to_string())); + + // Second row: sum + let sum = &actual[1]; + assert_eq!(sum.metric_name, Some("request_count".to_string())); + assert_eq!(sum.metric_type, Some("Sum".to_string())); + assert_eq!(sum.is_monotonic, Some(true)); + assert!(sum.metric_value_number.is_some()); + + // Third row: histogram + let hist = &actual[2]; + assert_eq!(hist.metric_name, Some("latency".to_string())); + assert_eq!(hist.metric_type, Some("Histogram".to_string())); + assert!(hist.metric_value_count.is_some()); + + Ok(()) +} + +#[test] +fn ljx_exports_traces_to_parquet() -> io::Result<()> { + ensure_export_artifacts_exist()?; + + let dir = TestDir::new("ljx-export-traces")?; + let input = dir.path().join("traces.logjet"); + let output = dir.path().join("traces.parquet"); + write_traces_fixture(&input)?; + + let export = run_ljx_export(&input, &output)?; + if !export.status.success() { + return Err(io::Error::other(format!("traces export failed: {}", String::from_utf8_lossy(&export.stderr)))); + } + + let actual = read_parquet_rows(&output)?; + assert_eq!(actual.len(), 2, "expected 2 span rows"); + + let span1 = &actual[0]; + assert_eq!(span1.signal_type, Some("traces".to_string())); + assert_eq!(span1.span_name, Some("GET /api/users".to_string())); + assert_eq!(span1.span_kind, Some("Server".to_string())); + assert!(span1.trace_id.is_some()); + assert!(span1.span_id.is_some()); + assert!(span1.duration_ns.is_some()); + assert_eq!(span1.service_name, Some("trace-service".to_string())); + + let span2 = &actual[1]; + assert_eq!(span2.span_name, Some("query_database".to_string())); + assert_eq!(span2.span_kind, Some("Client".to_string())); + assert!(span2.parent_span_id.is_some()); + + Ok(()) +} + +#[test] +fn ljx_exports_mixed_signals_to_parquet() -> io::Result<()> { + ensure_export_artifacts_exist()?; + + let dir = TestDir::new("ljx-export-mixed")?; + let input = dir.path().join("mixed.logjet"); + let output = dir.path().join("mixed.parquet"); + write_mixed_fixture(&input)?; + + let export = run_ljx_export(&input, &output)?; + if !export.status.success() { + return Err(io::Error::other(format!("mixed export failed: {}", String::from_utf8_lossy(&export.stderr)))); + } + + let actual = read_parquet_rows(&output)?; + assert_eq!(actual.len(), 6, "expected 6 rows: 1 log + 3 metrics + 2 traces"); + + let logs = actual.iter().filter(|r| r.signal_type == Some("logs".to_string())).collect::>(); + let metrics = actual.iter().filter(|r| r.signal_type == Some("metrics".to_string())).collect::>(); + let traces = actual.iter().filter(|r| r.signal_type == Some("traces".to_string())).collect::>(); + + assert_eq!(logs.len(), 1); + assert_eq!(metrics.len(), 3); + assert_eq!(traces.len(), 2); + + // Verify logs row has metrics/traces columns null + let log_row = logs[0]; + assert!(log_row.metric_name.is_none()); + assert!(log_row.span_name.is_none()); + assert!(log_row.body_string.is_some()); + + // Verify metrics row has logs/traces columns null + let metric_row = metrics[0]; + assert!(metric_row.body_string.is_none()); + assert!(metric_row.span_name.is_none()); + assert!(metric_row.metric_name.is_some()); + + // Verify traces row has logs/metrics columns null + let trace_row = traces[0]; + assert!(trace_row.body_string.is_none()); + assert!(trace_row.metric_name.is_none()); + assert!(trace_row.span_name.is_some()); + + Ok(()) +} + fn ensure_export_artifacts_exist() -> io::Result<()> { for path in [ljx_bin(), parquet_plugin_bin()] { if !path.is_file() { @@ -180,6 +306,44 @@ fn write_empty_logjet_fixture(path: &Path) -> io::Result<()> { Ok(()) } +fn write_metrics_fixture(path: &Path) -> io::Result<()> { + let file = File::create(path)?; + let mut writer = LogjetWriter::with_config(file, WriterConfig { codec: Codec::Lz4, ..WriterConfig::default() }); + let payload = encode_metrics_request(Some("metrics-service"))?; + writer.push(RecordType::Metrics, 1, 1_700_000_000_000_000_000, &payload).map_err(io::Error::other)?; + let mut file = writer.into_inner().map_err(io::Error::other)?; + file.flush()?; + Ok(()) +} + +fn write_traces_fixture(path: &Path) -> io::Result<()> { + let file = File::create(path)?; + let mut writer = LogjetWriter::with_config(file, WriterConfig { codec: Codec::Lz4, ..WriterConfig::default() }); + let payload = encode_traces_request(Some("trace-service"))?; + writer.push(RecordType::Traces, 1, 1_700_000_000_000_000_000, &payload).map_err(io::Error::other)?; + let mut file = writer.into_inner().map_err(io::Error::other)?; + file.flush()?; + Ok(()) +} + +fn write_mixed_fixture(path: &Path) -> io::Result<()> { + let file = File::create(path)?; + let mut writer = LogjetWriter::with_config(file, WriterConfig { codec: Codec::Lz4, ..WriterConfig::default() }); + + let logs_payload = encode_logs_request("mixed-log", Some("mixed-service"))?; + writer.push(RecordType::Logs, 1, 1_700_000_000_000_000_000, &logs_payload).map_err(io::Error::other)?; + + let metrics_payload = encode_metrics_request(Some("mixed-service"))?; + writer.push(RecordType::Metrics, 2, 1_700_000_000_000_000_001, &metrics_payload).map_err(io::Error::other)?; + + let traces_payload = encode_traces_request(Some("mixed-service"))?; + writer.push(RecordType::Traces, 3, 1_700_000_000_000_000_002, &traces_payload).map_err(io::Error::other)?; + + let mut file = writer.into_inner().map_err(io::Error::other)?; + file.flush()?; + Ok(()) +} + fn encode_logs_request(message: &str, service_name: Option<&str>) -> io::Result> { let resource_logs = ResourceLogs { resource: Some(opentelemetry_proto::tonic::resource::v1::Resource { @@ -213,6 +377,152 @@ fn encode_logs_request(message: &str, service_name: Option<&str>) -> io::Result< Ok(ExportLogsServiceRequest { resource_logs: vec![resource_logs] }.encode_to_vec()) } +fn encode_metrics_request(service_name: Option<&str>) -> io::Result> { + let resource = opentelemetry_proto::tonic::resource::v1::Resource { + attributes: service_name + .map(|name| { + vec![KeyValue { key: "service.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }] + }) + .unwrap_or_default(), + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }; + + let metrics = vec![ + Metric { + name: "cpu_usage".to_string(), + description: "CPU usage percentage".to_string(), + unit: "%".to_string(), + data: Some(MetricData::Gauge(Gauge { + data_points: vec![NumberDataPoint { + time_unix_nano: 1_700_000_000_000_000_000, + start_time_unix_nano: 0, + value: Some(opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsDouble(42.5)), + attributes: vec![], + exemplars: vec![], + flags: 0, + }], + })), + metadata: vec![], + }, + Metric { + name: "request_count".to_string(), + description: "Total request count".to_string(), + unit: "1".to_string(), + data: Some(MetricData::Sum(opentelemetry_proto::tonic::metrics::v1::Sum { + data_points: vec![NumberDataPoint { + time_unix_nano: 1_700_000_000_000_000_000, + start_time_unix_nano: 1_600_000_000_000_000_000, + value: Some(opentelemetry_proto::tonic::metrics::v1::number_data_point::Value::AsInt(100)), + attributes: vec![], + exemplars: vec![], + flags: 0, + }], + aggregation_temporality: 2, + is_monotonic: true, + })), + metadata: vec![], + }, + Metric { + name: "latency".to_string(), + description: "Request latency".to_string(), + unit: "ms".to_string(), + data: Some(MetricData::Histogram(opentelemetry_proto::tonic::metrics::v1::Histogram { + data_points: vec![opentelemetry_proto::tonic::metrics::v1::HistogramDataPoint { + time_unix_nano: 1_700_000_000_000_000_000, + start_time_unix_nano: 1_600_000_000_000_000_000, + count: 50, + sum: Some(250.0), + bucket_counts: vec![], + explicit_bounds: vec![], + attributes: vec![], + exemplars: vec![], + flags: 0, + max: None, + min: None, + }], + aggregation_temporality: 2, + })), + metadata: vec![], + }, + ]; + + let resource_metrics = ResourceMetrics { + resource: Some(resource), + scope_metrics: vec![ScopeMetrics { + scope: None, + metrics, + schema_url: String::new(), + }], + schema_url: String::new(), + }; + + Ok(ExportMetricsServiceRequest { resource_metrics: vec![resource_metrics] }.encode_to_vec()) +} + +fn encode_traces_request(service_name: Option<&str>) -> io::Result> { + let resource = opentelemetry_proto::tonic::resource::v1::Resource { + attributes: service_name + .map(|name| { + vec![KeyValue { key: "service.name".to_string(), value: Some(AnyValue { value: Some(Value::StringValue(name.to_string())) }) }] + }) + .unwrap_or_default(), + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }; + + let spans = vec![ + Span { + trace_id: vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10], + span_id: vec![0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18], + parent_span_id: vec![], + trace_state: String::new(), + name: "GET /api/users".to_string(), + kind: 2, // Server + start_time_unix_nano: 1_700_000_000_000_000_000, + end_time_unix_nano: 1_700_000_000_000_001_000, + attributes: vec![], + dropped_attributes_count: 0, + events: vec![], + dropped_events_count: 0, + links: vec![], + dropped_links_count: 0, + status: Some(Status { code: 1, message: String::new() }), + flags: 0, + }, + Span { + trace_id: vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10], + span_id: vec![0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28], + parent_span_id: vec![0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18], + trace_state: String::new(), + name: "query_database".to_string(), + kind: 3, // Client + start_time_unix_nano: 1_700_000_000_000_000_100, + end_time_unix_nano: 1_700_000_000_000_000_800, + attributes: vec![], + dropped_attributes_count: 0, + events: vec![], + dropped_events_count: 0, + links: vec![], + dropped_links_count: 0, + status: Some(Status { code: 0, message: String::new() }), + flags: 0, + }, + ]; + + let resource_spans = ResourceSpans { + resource: Some(resource), + scope_spans: vec![ScopeSpans { + scope: None, + spans, + schema_url: String::new(), + }], + schema_url: String::new(), + }; + + Ok(ExportTraceServiceRequest { resource_spans: vec![resource_spans] }.encode_to_vec()) +} + fn read_parquet_rows(path: &Path) -> io::Result> { let file = File::open(path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(io::Error::other)?; @@ -220,43 +530,79 @@ fn read_parquet_rows(path: &Path) -> io::Result> { let mut rows = Vec::new(); for batch in reader { let batch = batch.map_err(io::Error::other)?; - let sequence = batch - .column_by_name("sequence") - .ok_or_else(|| io::Error::other("missing sequence column"))? - .as_any() - .downcast_ref::() - .ok_or_else(|| io::Error::other("sequence column type mismatch"))?; - let body_kind = batch - .column_by_name("body_kind") - .ok_or_else(|| io::Error::other("missing body_kind column"))? - .as_any() - .downcast_ref::() - .ok_or_else(|| io::Error::other("body_kind column type mismatch"))?; - let body_string = batch - .column_by_name("body_string") - .ok_or_else(|| io::Error::other("missing body_string column"))? - .as_any() - .downcast_ref::() - .ok_or_else(|| io::Error::other("body_string column type mismatch"))?; - let body_json = batch - .column_by_name("body_json") - .ok_or_else(|| io::Error::other("missing body_json column"))? - .as_any() - .downcast_ref::() - .ok_or_else(|| io::Error::other("body_json column type mismatch"))?; - let service_name = batch - .column_by_name("service_name") - .ok_or_else(|| io::Error::other("missing service_name column"))? - .as_any() - .downcast_ref::() - .ok_or_else(|| io::Error::other("service_name column type mismatch"))?; + + macro_rules! col { + ($name:expr, $ty:ty) => { + batch + .column_by_name($name) + .ok_or_else(|| io::Error::other(format!("missing {} column", $name)))? + .as_any() + .downcast_ref::<$ty>() + .ok_or_else(|| io::Error::other(format!("{} column type mismatch", $name)))? + }; + } + + macro_rules! opt_string { + ($col:expr, $row:expr) => { + if $col.is_null($row) { None } else { Some($col.value($row).to_string()) } + }; + } + + macro_rules! opt_u64 { + ($col:expr, $row:expr) => { + if $col.is_null($row) { None } else { Some($col.value($row)) } + }; + } + + macro_rules! opt_f64 { + ($col:expr, $row:expr) => { + if $col.is_null($row) { None } else { Some($col.value($row)) } + }; + } + + macro_rules! opt_bool { + ($col:expr, $row:expr) => { + if $col.is_null($row) { None } else { Some($col.value($row)) } + }; + } + + let sequence = col!("sequence", arrow_array::UInt64Array); + let signal_type = col!("signal_type", arrow_array::StringArray); + let body_kind = col!("body_kind", arrow_array::StringArray); + let body_string = col!("body_string", arrow_array::StringArray); + let body_json = col!("body_json", arrow_array::StringArray); + let service_name = col!("service_name", arrow_array::StringArray); + let metric_name = col!("metric_name", arrow_array::StringArray); + let metric_type = col!("metric_type", arrow_array::StringArray); + let metric_value_number = col!("metric_value_number", arrow_array::Float64Array); + let metric_value_count = col!("metric_value_count", arrow_array::UInt64Array); + let is_monotonic = col!("is_monotonic", arrow_array::BooleanArray); + let span_name = col!("span_name", arrow_array::StringArray); + let span_kind = col!("span_kind", arrow_array::StringArray); + let trace_id = col!("trace_id", arrow_array::StringArray); + let span_id = col!("span_id", arrow_array::StringArray); + let parent_span_id = col!("parent_span_id", arrow_array::StringArray); + let duration_ns = col!("duration_ns", arrow_array::UInt64Array); + for row in 0..batch.num_rows() { rows.push(ParquetRow { sequence: sequence.value(row), - body_kind: body_kind.value(row).to_string(), - body_string: (!body_string.is_null(row)).then(|| body_string.value(row).to_string()), - body_json: (!body_json.is_null(row)).then(|| body_json.value(row).to_string()), - service_name: (!service_name.is_null(row)).then(|| service_name.value(row).to_string()), + signal_type: opt_string!(signal_type, row), + body_kind: opt_string!(body_kind, row), + body_string: opt_string!(body_string, row), + body_json: opt_string!(body_json, row), + service_name: opt_string!(service_name, row), + metric_name: opt_string!(metric_name, row), + metric_type: opt_string!(metric_type, row), + metric_value_number: opt_f64!(metric_value_number, row), + metric_value_count: opt_u64!(metric_value_count, row), + is_monotonic: opt_bool!(is_monotonic, row), + span_name: opt_string!(span_name, row), + span_kind: opt_string!(span_kind, row), + trace_id: opt_string!(trace_id, row), + span_id: opt_string!(span_id, row), + parent_span_id: opt_string!(parent_span_id, row), + duration_ns: opt_u64!(duration_ns, row), }); } } @@ -332,10 +678,23 @@ struct ExpectedRow { service_name: Option, } +#[derive(Debug)] struct ParquetRow { sequence: u64, - body_kind: String, + signal_type: Option, + body_kind: Option, body_string: Option, body_json: Option, service_name: Option, + metric_name: Option, + metric_type: Option, + metric_value_number: Option, + metric_value_count: Option, + is_monotonic: Option, + span_name: Option, + span_kind: Option, + trace_id: Option, + span_id: Option, + parent_span_id: Option, + duration_ns: Option, }