From 5ceeb393f7e9b12dddc76c18cd6a6e2ded9edb4b Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 21:02:55 +0200 Subject: [PATCH 1/6] Refactor daemon for async HTTP --- Cargo.lock | 51 ++++- logjetd/Cargo.toml | 8 +- logjetd/src/daemon.rs | 421 +++++++++++++++--------------------------- 3 files changed, 203 insertions(+), 277 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 942f849..a879898 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -217,6 +217,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-lc-rs" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a2f9779ce85b93ab6170dd940ad0169b5766ff848247aff13bb788b832fe3f4" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.8.9" @@ -417,6 +439,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" +[[package]] +name = "cmake" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.5" @@ -679,6 +710,12 @@ dependencies = [ "litrs", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "either" version = "1.15.0" @@ -799,6 +836,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures-channel" version = "0.3.32" @@ -1333,6 +1376,10 @@ dependencies = [ "clap", "colored", "flate2", + "http", + "http-body-util", + "hyper", + "hyper-util", "libc", "libloading", "liblogjet", @@ -1345,8 +1392,8 @@ dependencies = [ "rustls-pemfile", "serde", "serde_yaml", - "tiny_http", "tokio", + "tokio-rustls", "tokio-stream", "tonic", ] @@ -2235,6 +2282,7 @@ version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", @@ -2268,6 +2316,7 @@ version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", diff --git a/logjetd/Cargo.toml b/logjetd/Cargo.toml index 5bfbb42..2ba4ab6 100644 --- a/logjetd/Cargo.toml +++ b/logjetd/Cargo.toml @@ -8,6 +8,10 @@ license.workspace = true clap = { version = "4.5", features = ["derive", "wrap_help"] } colored = "3" flate2 = { version = "1.1", default-features = false, features = ["rust_backend"] } +http = "1.0" +http-body-util = "0.1" +hyper = { version = "1", features = ["server", "http1", "http2"] } +hyper-util = { version = "0.1", features = ["tokio"] } libc = "0.2" libloading = "0.8" liblogjet = { path = "../liblogjet" } @@ -19,9 +23,9 @@ serde = { version = "1", features = ["derive"] } serde_yaml = "0.9" rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } rustls-pemfile = "2" -tiny_http = "0.12" -tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] } tonic = { version = "0.14", features = ["transport", "tls-ring"] } +tokio-rustls = "0.26" [dev-dependencies] rcgen = "0.12" diff --git a/logjetd/src/daemon.rs b/logjetd/src/daemon.rs index 2925dde..9275cca 100644 --- a/logjetd/src/daemon.rs +++ b/logjetd/src/daemon.rs @@ -1,5 +1,5 @@ use std::fs; -use std::io::{self, BufReader, Read, Write}; +use std::io::{self, BufReader, Read}; use std::net::SocketAddr; use std::net::{TcpListener, TcpStream}; use std::path::PathBuf; @@ -8,6 +8,12 @@ use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Method, Request, Response as HyperResponse, StatusCode}; +use hyper_util::rt::TokioIo; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, logs_service_server::{LogsService, LogsServiceServer}, @@ -22,9 +28,10 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ }; use prost::Message; use rustls::{ServerConfig, ServerConnection, StreamOwned}; -use tiny_http::{Method, Response, Server, StatusCode}; +use tokio::net::TcpListener as TokioTcpListener; +use tokio_rustls::TlsAcceptor; use tonic::transport::{Certificate, Identity, ServerTlsConfig}; -use tonic::{Request, Response as GrpcResponse, Status}; +use tonic::{Request as TonicRequest, Response as GrpcResponse, Status}; use crate::config::{Config, IngestLimits, IngestOverloadConfig, IngestProtocol, IngestTlsConfig, SeverityFloor}; use crate::protocol::WireRecord; @@ -264,126 +271,7 @@ fn ingest_loop( } } IngestProtocol::OtlpHttp => { - if ingest_tls.enable { - return otlp_http_tls_loop(bind_addr, ingest_tls, ingest_limits, ingest_policy, spool, next_seq, limiter); - } - let server = Server::http(&bind_addr).map_err(|err| io::Error::other(err.to_string()))?; - eprintln!("ljd ingest listening on http://{bind_addr}/v1/logs /v1/metrics /v1/traces using otlp-http max-batch-bytes={}", ingest_limits.max_batch_bytes); - - for mut request in server.incoming_requests() { - if request.method() != &Method::Post || !matches!(request.url(), "/v1/logs" | "/v1/metrics" | "/v1/traces") { - let response = Response::from_string("not found").with_status_code(StatusCode(404)); - let _ = request.respond(response); - continue; - } - let is_metrics = request.url() == "/v1/metrics"; - let is_traces = request.url() == "/v1/traces"; - - let content_encoding = request.headers().iter().find(|h| h.field.equiv("content-encoding")).map(|h| h.value.to_string()); - - let mut body = Vec::with_capacity(ingest_limits.max_batch_bytes.min(8192)); - request.as_reader().take((ingest_limits.max_batch_bytes + 1) as u64).read_to_end(&mut body)?; - if body.len() > ingest_limits.max_batch_bytes { - ingest_policy.note_oversize()?; - let response = Response::from_string("payload too large").with_status_code(StatusCode(413)); - request.respond(response).map_err(|err| io::Error::other(err.to_string()))?; - continue; - } - let body = match maybe_decompress_body(body, content_encoding.as_deref()) { - Ok(b) => b, - Err(err) => { - let response = Response::from_string(format!("decompression error: {err}")).with_status_code(StatusCode(400)); - request.respond(response).map_err(|resp_err| io::Error::other(resp_err.to_string()))?; - continue; - } - }; - if is_metrics { - match ExportMetricsServiceRequest::decode(body.as_slice()) { - Ok(batch) => { - // Metrics have no severity concept in OTLP, so they always classify as - // BatchPriority::Unknown (lowest priority). This is semantically correct: - // during overload, severity-aware shedding protects high-priority logs - // while metrics are treated as best-effort. If metrics must survive - // overload, increase ingest.max-batches-per-second or use buffer/file mode. - let decision = ingest_policy.decide(BatchPriority::Unknown)?; - if matches!(decision, IngestDecision::RejectRateLimited) { - let response = Response::from_string("rate limit exceeded").with_status_code(StatusCode(429)); - request.respond(response).map_err(|err| io::Error::other(err.to_string()))?; - continue; - } - let record = WireRecord { - record_type: logjet::RecordType::Metrics, - seq: next_seq.fetch_add(1, Ordering::Relaxed), - ts_unix_ns: extract_batch_timestamp_metrics(&batch).unwrap_or_else(unix_time_nanos), - payload: body, - }; - append_batch_record(&spool, record)?; - - let response = Response::empty(200); - request.respond(response).map_err(|err| io::Error::other(err.to_string()))?; - } - Err(err) => { - let response = Response::from_string(format!("decode error: {err}")).with_status_code(StatusCode(400)); - request.respond(response).map_err(|resp_err| io::Error::other(resp_err.to_string()))?; - } - } - } else if is_traces { - match ExportTraceServiceRequest::decode(body.as_slice()) { - Ok(batch) => { - // Traces have no severity concept in OTLP, so they always classify as - // BatchPriority::Unknown (lowest priority). This is semantically correct: - // during overload, severity-aware shedding protects high-priority logs - // while traces are treated as best-effort. If traces must survive - // overload, increase ingest.max-batches-per-second or use buffer/file mode. - let decision = ingest_policy.decide(BatchPriority::Unknown)?; - if matches!(decision, IngestDecision::RejectRateLimited) { - let response = Response::from_string("rate limit exceeded").with_status_code(StatusCode(429)); - request.respond(response).map_err(|err| io::Error::other(err.to_string()))?; - continue; - } - let record = WireRecord { - record_type: logjet::RecordType::Traces, - seq: next_seq.fetch_add(1, Ordering::Relaxed), - ts_unix_ns: extract_batch_timestamp_traces(&batch).unwrap_or_else(unix_time_nanos), - payload: body, - }; - append_batch_record(&spool, record)?; - - let response = Response::empty(200); - request.respond(response).map_err(|err| io::Error::other(err.to_string()))?; - } - Err(err) => { - let response = Response::from_string(format!("decode error: {err}")).with_status_code(StatusCode(400)); - request.respond(response).map_err(|resp_err| io::Error::other(resp_err.to_string()))?; - } - } - } else { - match ExportLogsServiceRequest::decode(body.as_slice()) { - Ok(batch) => { - let decision = ingest_policy.decide(classify_otlp_batch_priority(&batch))?; - if matches!(decision, IngestDecision::RejectRateLimited) { - let response = Response::from_string("rate limit exceeded").with_status_code(StatusCode(429)); - request.respond(response).map_err(|err| io::Error::other(err.to_string()))?; - continue; - } - let record = WireRecord { - record_type: logjet::RecordType::Logs, - seq: next_seq.fetch_add(1, Ordering::Relaxed), - ts_unix_ns: extract_batch_timestamp(&batch).unwrap_or_else(unix_time_nanos), - payload: body, - }; - append_batch_record(&spool, record)?; - - let response = Response::empty(200); - request.respond(response).map_err(|err| io::Error::other(err.to_string()))?; - } - Err(err) => { - let response = Response::from_string(format!("decode error: {err}")).with_status_code(StatusCode(400)); - request.respond(response).map_err(|resp_err| io::Error::other(resp_err.to_string()))?; - } - } - } - } + return otlp_http_async_loop(bind_addr, ingest_tls, ingest_limits, ingest_policy, spool, next_seq, limiter); } IngestProtocol::OtlpGrpc => { let addr: SocketAddr = @@ -421,140 +309,199 @@ fn ingest_loop( Ok(()) } -fn otlp_http_tls_loop( +fn otlp_http_async_loop( bind_addr: String, ingest_tls: IngestTlsConfig, ingest_limits: IngestLimits, ingest_policy: Arc, spool: Arc, next_seq: Arc, limiter: Arc, ) -> io::Result<()> { - let listener = TcpListener::bind(&bind_addr)?; - let tls_server = load_ingest_server_config(&ingest_tls)?; + let max_batch_bytes = ingest_limits.max_batch_bytes; + let max_clients = ingest_limits.max_clients; + let tls_acceptor = if ingest_tls.enable { + let server_config = load_ingest_server_config(&ingest_tls)?; + Some(TlsAcceptor::from(server_config)) + } else { + None + }; + + let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().map_err(|err| io::Error::other(err.to_string()))?; + let scheme = if ingest_tls.enable { "https" } else { "http" }; eprintln!( - "ljd ingest listening on https://{bind_addr}/v1/logs /v1/metrics /v1/traces using otlp-http max-batch-bytes={} max-clients={}", - ingest_limits.max_batch_bytes, ingest_limits.max_clients + "ljd ingest listening on {scheme}://{bind_addr}/v1/logs /v1/metrics /v1/traces using otlp-http max-batch-bytes={max_batch_bytes} max-clients={max_clients}" ); - for stream in listener.incoming() { - let stream = stream?; - let spool = Arc::clone(&spool); - let ingest_policy = Arc::clone(&ingest_policy); - let next_seq = Arc::clone(&next_seq); - let tls_server = tls_server.clone(); - let limiter = Arc::clone(&limiter); - let max_batch_bytes = ingest_limits.max_batch_bytes; - thread::Builder::new().name("ljd-otlp-http-tls-client".to_string()).spawn(move || { - if let Err(err) = handle_otlp_http_tls_client(stream, tls_server, spool, ingest_policy, next_seq, limiter, max_batch_bytes) { - eprintln!("ljd otlp-http tls client error: {err}"); - } - })?; - } + runtime.block_on(async move { + let listener = TokioTcpListener::bind(&bind_addr).await.map_err(|err| io::Error::other(err.to_string()))?; - Ok(()) + loop { + let (stream, _peer) = match listener.accept().await { + Ok(accepted) => accepted, + Err(err) => { + eprintln!("ljd ingest accept error: {err}"); + continue; + } + }; + + let spool = Arc::clone(&spool); + let ingest_policy = Arc::clone(&ingest_policy); + let next_seq = Arc::clone(&next_seq); + let limiter = Arc::clone(&limiter); + let tls_acceptor = tls_acceptor.clone(); + + tokio::spawn(async move { + if let Err(err) = serve_otlp_http_connection(stream, tls_acceptor, spool, ingest_policy, next_seq, limiter, max_batch_bytes).await { + eprintln!("ljd ingest client error: {err}"); + } + }); + } + }) } -fn handle_otlp_http_tls_client( - stream: TcpStream, tls_server: Arc, spool: Arc, ingest_policy: Arc, next_seq: Arc, - limiter: Arc, max_batch_bytes: usize, +async fn serve_otlp_http_connection( + stream: tokio::net::TcpStream, tls_acceptor: Option, spool: Arc, ingest_policy: Arc, + next_seq: Arc, limiter: Arc, max_batch_bytes: usize, ) -> io::Result<()> { let Some(_permit) = limiter.try_acquire() else { - eprintln!("ljd ingest refused TLS client: ingest.max-clients reached"); + eprintln!("ljd ingest refused client: ingest.max-clients reached"); ingest_policy.note_client_cap()?; return Ok(()); }; - let conn = ServerConnection::new(tls_server).map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err.to_string()))?; - let mut transport = StreamOwned::new(conn, stream); - let result = handle_otlp_http_transport(&mut transport, spool, ingest_policy, next_seq, max_batch_bytes); - transport.conn.send_close_notify(); - let _ = transport.flush(); - result + + let svc = service_fn(move |req| { + handle_otlp_http_request( + req, + Arc::clone(&spool), + Arc::clone(&ingest_policy), + Arc::clone(&next_seq), + max_batch_bytes, + ) + }); + + if let Some(acceptor) = tls_acceptor { + let tls_stream = acceptor.accept(stream).await.map_err(|err| io::Error::other(err.to_string()))?; + let _ = http1::Builder::new().serve_connection(TokioIo::new(tls_stream), svc).await; + } else { + let _ = http1::Builder::new().serve_connection(TokioIo::new(stream), svc).await; + } + + Ok(()) } -fn handle_otlp_http_transport( - transport: &mut T, spool: Arc, ingest_policy: Arc, next_seq: Arc, max_batch_bytes: usize, -) -> io::Result<()> { - let request = match read_http_request(transport, max_batch_bytes) { - Ok(request) => request, - Err(err) if err.kind() == io::ErrorKind::InvalidData && err.to_string() == "payload too large" => { +async fn handle_otlp_http_request( + req: Request, spool: Arc, ingest_policy: Arc, next_seq: Arc, max_batch_bytes: usize, +) -> Result>, io::Error> +where + B: hyper::body::Body, + B::Error: std::fmt::Display, +{ + if req.method() != Method::POST || !matches!(req.uri().path(), "/v1/logs" | "/v1/metrics" | "/v1/traces") { + return Ok(HyperResponse::builder().status(StatusCode::NOT_FOUND).body(Full::new(Bytes::from("not found"))).unwrap()); + } + let is_metrics = req.uri().path() == "/v1/metrics"; + let is_traces = req.uri().path() == "/v1/traces"; + + let content_encoding = + req.headers().get("content-encoding").and_then(|v| v.to_str().ok()).map(|s| s.to_string()); + + let (parts, body) = req.into_parts(); + + // Check content-length header first for early rejection. + if let Some(len) = parts.headers.get("content-length").and_then(|v| v.to_str().ok()).and_then(|v| v.parse::().ok()) + && len > max_batch_bytes + { ingest_policy.note_oversize()?; - write_http_response(transport, 413, "payload too large")?; - return Ok(()); + return Ok(HyperResponse::builder().status(StatusCode::PAYLOAD_TOO_LARGE).body(Full::new(Bytes::from("payload too large"))).unwrap()); + } + + let collected = body.collect().await.map_err(|err| io::Error::other(format!("failed to read request body: {err}")))?; + let body_bytes = collected.to_bytes(); + + if body_bytes.len() > max_batch_bytes { + ingest_policy.note_oversize()?; + return Ok(HyperResponse::builder().status(StatusCode::PAYLOAD_TOO_LARGE).body(Full::new(Bytes::from("payload too large"))).unwrap()); + } + + let body_vec = match maybe_decompress_body(body_bytes.to_vec(), content_encoding.as_deref()) { + Ok(b) => b, + Err(err) => { + return Ok(HyperResponse::builder() + .status(StatusCode::BAD_REQUEST) + .body(Full::new(Bytes::from(format!("decompression error: {err}")))) + .unwrap()); } - Err(err) => return Err(err), }; - let is_metrics = request.path == "/v1/metrics"; - let is_traces = request.path == "/v1/traces"; - if request.method != "POST" || !matches!(request.path.as_str(), "/v1/logs" | "/v1/metrics" | "/v1/traces") { - write_http_response(transport, 404, "not found")?; - return Ok(()); - } if is_metrics { - let body = maybe_decompress_body(request.body, request.content_encoding.as_deref())?; - match ExportMetricsServiceRequest::decode(body.as_slice()) { + match ExportMetricsServiceRequest::decode(body_vec.as_slice()) { Ok(batch) => { let decision = ingest_policy.decide(BatchPriority::Unknown)?; if matches!(decision, IngestDecision::RejectRateLimited) { - write_http_response(transport, 429, "rate limit exceeded")?; - return Ok(()); + return Ok(HyperResponse::builder() + .status(StatusCode::TOO_MANY_REQUESTS) + .body(Full::new(Bytes::from("rate limit exceeded"))) + .unwrap()); } let record = WireRecord { record_type: logjet::RecordType::Metrics, seq: next_seq.fetch_add(1, Ordering::Relaxed), ts_unix_ns: extract_batch_timestamp_metrics(&batch).unwrap_or_else(unix_time_nanos), - payload: body, + payload: body_vec, }; append_batch_record(&spool, record)?; - write_http_response(transport, 200, "")?; - } - Err(err) => { - write_http_response(transport, 400, &format!("decode error: {err}"))?; + Ok(HyperResponse::builder().status(StatusCode::OK).body(Full::new(Bytes::new())).unwrap()) } + Err(err) => Ok(HyperResponse::builder() + .status(StatusCode::BAD_REQUEST) + .body(Full::new(Bytes::from(format!("decode error: {err}")))) + .unwrap()), } } else if is_traces { - let body = maybe_decompress_body(request.body, request.content_encoding.as_deref())?; - match ExportTraceServiceRequest::decode(body.as_slice()) { + match ExportTraceServiceRequest::decode(body_vec.as_slice()) { Ok(batch) => { let decision = ingest_policy.decide(BatchPriority::Unknown)?; if matches!(decision, IngestDecision::RejectRateLimited) { - write_http_response(transport, 429, "rate limit exceeded")?; - return Ok(()); + return Ok(HyperResponse::builder() + .status(StatusCode::TOO_MANY_REQUESTS) + .body(Full::new(Bytes::from("rate limit exceeded"))) + .unwrap()); } let record = WireRecord { record_type: logjet::RecordType::Traces, seq: next_seq.fetch_add(1, Ordering::Relaxed), ts_unix_ns: extract_batch_timestamp_traces(&batch).unwrap_or_else(unix_time_nanos), - payload: body, + payload: body_vec, }; append_batch_record(&spool, record)?; - write_http_response(transport, 200, "")?; - } - Err(err) => { - write_http_response(transport, 400, &format!("decode error: {err}"))?; + Ok(HyperResponse::builder().status(StatusCode::OK).body(Full::new(Bytes::new())).unwrap()) } + Err(err) => Ok(HyperResponse::builder() + .status(StatusCode::BAD_REQUEST) + .body(Full::new(Bytes::from(format!("decode error: {err}")))) + .unwrap()), } } else { - let body = maybe_decompress_body(request.body, request.content_encoding.as_deref())?; - match ExportLogsServiceRequest::decode(body.as_slice()) { + match ExportLogsServiceRequest::decode(body_vec.as_slice()) { Ok(batch) => { let decision = ingest_policy.decide(classify_otlp_batch_priority(&batch))?; if matches!(decision, IngestDecision::RejectRateLimited) { - write_http_response(transport, 429, "rate limit exceeded")?; - return Ok(()); + return Ok(HyperResponse::builder() + .status(StatusCode::TOO_MANY_REQUESTS) + .body(Full::new(Bytes::from("rate limit exceeded"))) + .unwrap()); } let record = WireRecord { record_type: logjet::RecordType::Logs, seq: next_seq.fetch_add(1, Ordering::Relaxed), ts_unix_ns: extract_batch_timestamp(&batch).unwrap_or_else(unix_time_nanos), - payload: body, + payload: body_vec, }; append_batch_record(&spool, record)?; - write_http_response(transport, 200, "")?; - } - Err(err) => { - write_http_response(transport, 400, &format!("decode error: {err}"))?; + Ok(HyperResponse::builder().status(StatusCode::OK).body(Full::new(Bytes::new())).unwrap()) } + Err(err) => Ok(HyperResponse::builder() + .status(StatusCode::BAD_REQUEST) + .body(Full::new(Bytes::from(format!("decode error: {err}")))) + .unwrap()), } } - - Ok(()) } fn append_batch_record(spool: &Arc, record: WireRecord) -> io::Result<()> { @@ -565,18 +512,10 @@ fn append_batch_record(spool: &Arc, record: WireRecord) -> io::Resu spool.notify_change() } -/// Appends a record to the spool. Exposed for the plugin ingest path. pub(crate) fn append_to_spool(spool: &Arc, record: WireRecord) -> io::Result<()> { append_batch_record(spool, record) } -struct ParsedHttpRequest { - method: String, - path: String, - body: Vec, - content_encoding: Option, -} - fn maybe_decompress_body(body: Vec, encoding: Option<&str>) -> io::Result> { match encoding { Some("gzip") | Some("x-gzip") => { @@ -590,72 +529,6 @@ fn maybe_decompress_body(body: Vec, encoding: Option<&str>) -> io::Result(transport: &mut T, max_batch_bytes: usize) -> io::Result { - const MAX_HEADER_BYTES: usize = 16 * 1024; - let mut buffer = Vec::new(); - let mut byte = [0u8; 1]; - - loop { - transport.read_exact(&mut byte)?; - buffer.push(byte[0]); - if buffer.len() > MAX_HEADER_BYTES { - return Err(io::Error::new(io::ErrorKind::InvalidData, "http header too large")); - } - if buffer.ends_with(b"\r\n\r\n") { - break; - } - } - - let header_end = buffer.len(); - let header_text = - std::str::from_utf8(&buffer[..header_end - 4]).map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "http header is not valid utf-8"))?; - let mut lines = header_text.lines(); - let request_line = lines.next().ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing http request line"))?; - let mut parts = request_line.split_whitespace(); - let method = parts.next().ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing http method"))?.to_string(); - let path = parts.next().ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing http path"))?.to_string(); - - let mut content_length = None; - let mut content_encoding = None; - for line in lines { - if let Some((name, value)) = line.split_once(':') { - let name_trimmed = name.trim(); - let value_trimmed = value.trim(); - if name_trimmed.eq_ignore_ascii_case("content-length") { - content_length = Some(value_trimmed.parse::().map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid content-length"))?); - } else if name_trimmed.eq_ignore_ascii_case("content-encoding") { - content_encoding = Some(value_trimmed.to_string()); - } - } - } - - let content_length = content_length.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing content-length"))?; - if content_length > max_batch_bytes { - return Err(io::Error::new(io::ErrorKind::InvalidData, "payload too large")); - } - let mut body = Vec::with_capacity(content_length); - transport.take(content_length as u64).read_to_end(&mut body)?; - if body.len() != content_length { - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short http body")); - } - - Ok(ParsedHttpRequest { method, path, body, content_encoding }) -} - -fn write_http_response(transport: &mut T, status: u16, body: &str) -> io::Result<()> { - let status_text = match status { - 200 => "OK", - 400 => "Bad Request", - 413 => "Payload Too Large", - 429 => "Too Many Requests", - 404 => "Not Found", - 503 => "Service Unavailable", - _ => "Error", - }; - write!(transport, "HTTP/1.1 {} {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", status, status_text, body.len(), body)?; - transport.flush() -} - fn build_grpc_server_tls_config(ingest_tls: &IngestTlsConfig) -> io::Result { let cert_file = ingest_tls .cert_file @@ -963,7 +836,7 @@ struct OtlpGrpcTracesService { #[tonic::async_trait] impl TraceService for OtlpGrpcTracesService { - async fn export(&self, request: Request) -> Result, Status> { + async fn export(&self, request: TonicRequest) -> Result, Status> { let batch = request.into_inner(); // Traces have no severity concept in OTLP, so they always classify as // BatchPriority::Unknown (lowest priority). See HTTP ingest comment above @@ -997,7 +870,7 @@ struct OtlpGrpcMetricsService { #[tonic::async_trait] impl MetricsService for OtlpGrpcMetricsService { - async fn export(&self, request: Request) -> Result, Status> { + async fn export(&self, request: TonicRequest) -> Result, Status> { let batch = request.into_inner(); // Metrics have no severity concept in OTLP, so they always classify as // BatchPriority::Unknown (lowest priority). See HTTP ingest comment above @@ -1031,7 +904,7 @@ struct OtlpGrpcLogsService { #[tonic::async_trait] impl LogsService for OtlpGrpcLogsService { - async fn export(&self, request: Request) -> Result, Status> { + async fn export(&self, request: TonicRequest) -> Result, Status> { let batch = request.into_inner(); match self.ingest_policy.decide(classify_otlp_batch_priority(&batch)).map_err(|err| Status::internal(err.to_string()))? { IngestDecision::Accept | IngestDecision::AcceptPriorityBypass => {} From fa4d2a2653851ae557898173a85481ccef96525e Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 21:03:01 +0200 Subject: [PATCH 2/6] Add UT --- logjetd/tests/unit/daemon_utst.rs | 406 +++++++++++++++++++++++------- 1 file changed, 321 insertions(+), 85 deletions(-) diff --git a/logjetd/tests/unit/daemon_utst.rs b/logjetd/tests/unit/daemon_utst.rs index cfb18e8..dabcd46 100644 --- a/logjetd/tests/unit/daemon_utst.rs +++ b/logjetd/tests/unit/daemon_utst.rs @@ -1,8 +1,11 @@ use super::{ BatchPriority, ConnectionLimiter, IngestDecision, SharedIngestPolicy, classify_otlp_batch_priority, extract_batch_timestamp_metrics, - extract_batch_timestamp_traces, maybe_decompress_body, read_http_request, write_http_response, + extract_batch_timestamp_traces, handle_otlp_http_request, maybe_decompress_body, }; use crate::config::{IngestOverloadConfig, SeverityFloor}; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::{Method, Request, StatusCode}; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; @@ -12,25 +15,327 @@ use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as DataPoi use opentelemetry_proto::tonic::metrics::v1::{Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics}; use opentelemetry_proto::tonic::resource::v1::Resource; use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span}; -use std::io::{Cursor, Write}; +use prost::Message; +use std::io::Write; use std::sync::Arc; +use std::sync::atomic::AtomicU64; -#[test] -fn read_http_request_parses_valid_request() { - let bytes = b"POST /v1/logs HTTP/1.1\r\nHost: example\r\nContent-Length: 3\r\n\r\nabc"; - let mut cursor = Cursor::new(bytes.as_slice()); - let request = read_http_request(&mut cursor, 1024).unwrap(); - assert_eq!(request.method, "POST"); - assert_eq!(request.path, "/v1/logs"); - assert_eq!(request.body, b"abc"); +#[tokio::test] +async fn handle_otlp_http_request_logs_accepts_valid_batch() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024 * 1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 0, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let batch = ExportLogsServiceRequest { + resource_logs: vec![ResourceLogs { + resource: Some(Resource { attributes: vec![], dropped_attributes_count: 0, entity_refs: vec![] }), + scope_logs: vec![ScopeLogs { + scope: Some(InstrumentationScope { + name: "test".to_string(), + version: String::new(), + attributes: vec![], + dropped_attributes_count: 0, + }), + log_records: vec![LogRecord { + severity_number: 13, + severity_text: "WARN".to_string(), + body: Some(AnyValue { + value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("warn".to_string())), + }), + ..Default::default() + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let body = batch.encode_to_vec(); + let req = Request::builder() + .method(Method::POST) + .uri("/v1/logs") + .header("content-length", body.len().to_string()) + .body(Full::new(Bytes::from(body))) + .unwrap(); + + let response = handle_otlp_http_request(req, shared_spool, policy, next_seq, 1024 * 1024).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); } -#[test] -fn read_http_request_parses_content_encoding() { - let bytes = b"POST /v1/logs HTTP/1.1\r\nHost: example\r\nContent-Length: 3\r\nContent-Encoding: gzip\r\n\r\nabc"; - let mut cursor = Cursor::new(bytes.as_slice()); - let request = read_http_request(&mut cursor, 1024).unwrap(); - assert_eq!(request.content_encoding, Some("gzip".to_string())); +#[tokio::test] +async fn handle_otlp_http_request_metrics_accepts_valid_batch() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024 * 1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 0, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let batch = ExportMetricsServiceRequest { + resource_metrics: vec![ResourceMetrics { + resource: Some(Resource { attributes: vec![], dropped_attributes_count: 0, entity_refs: vec![] }), + scope_metrics: vec![ScopeMetrics { + scope: Some(InstrumentationScope { + name: "test".to_string(), + version: String::new(), + attributes: vec![], + dropped_attributes_count: 0, + }), + metrics: vec![Metric { + name: "cpu".to_string(), + description: String::new(), + unit: "%".to_string(), + data: Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(Gauge { + data_points: vec![NumberDataPoint { + attributes: vec![], + start_time_unix_nano: 0, + time_unix_nano: 1_700_000_000_000_000_000, + value: Some(DataPointValue::AsDouble(42.0)), + flags: 0, + exemplars: vec![], + }], + })), + metadata: vec![], + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let body = batch.encode_to_vec(); + let req = Request::builder() + .method(Method::POST) + .uri("/v1/metrics") + .header("content-length", body.len().to_string()) + .body(Full::new(Bytes::from(body))) + .unwrap(); + + let response = handle_otlp_http_request(req, shared_spool, policy, next_seq, 1024 * 1024).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +#[tokio::test] +async fn handle_otlp_http_request_traces_accepts_valid_batch() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024 * 1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 0, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let batch = ExportTraceServiceRequest { + resource_spans: vec![ResourceSpans { + resource: Some(Resource { attributes: vec![], dropped_attributes_count: 0, entity_refs: vec![] }), + scope_spans: vec![ScopeSpans { + scope: Some(InstrumentationScope { + name: "test".to_string(), + version: String::new(), + attributes: vec![], + dropped_attributes_count: 0, + }), + spans: vec![Span { + trace_id: vec![1, 2, 3, 4], + span_id: vec![5, 6, 7, 8], + parent_span_id: vec![], + name: "test-span".to_string(), + kind: 1, + start_time_unix_nano: 1_700_000_000_000_000_000, + end_time_unix_nano: 1_700_000_000_000_000_001, + attributes: vec![], + dropped_attributes_count: 0, + events: vec![], + dropped_events_count: 0, + links: vec![], + dropped_links_count: 0, + status: None, + flags: 0, + trace_state: String::new(), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let body = batch.encode_to_vec(); + let req = Request::builder() + .method(Method::POST) + .uri("/v1/traces") + .header("content-length", body.len().to_string()) + .body(Full::new(Bytes::from(body))) + .unwrap(); + + let response = handle_otlp_http_request(req, shared_spool, policy, next_seq, 1024 * 1024).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +#[tokio::test] +async fn handle_otlp_http_request_rejects_non_post() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 0, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let req = Request::builder().method(Method::GET).uri("/v1/logs").body(Full::new(Bytes::new())).unwrap(); + let response = handle_otlp_http_request(req, shared_spool, policy, next_seq, 1024).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn handle_otlp_http_request_unknown_path_returns_404() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 0, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let req = Request::builder() + .method(Method::POST) + .uri("/v1/unknown") + .body(Full::new(Bytes::new())) + .unwrap(); + let response = handle_otlp_http_request(req, shared_spool, policy, next_seq, 1024).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn handle_otlp_http_request_rejects_body_over_limit() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 0, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let body = vec![0u8; 100]; + let req = Request::builder() + .method(Method::POST) + .uri("/v1/logs") + .header("content-length", body.len().to_string()) + .body(Full::new(Bytes::from(body))) + .unwrap(); + + let response = handle_otlp_http_request(req, shared_spool, policy, next_seq, 50).await.unwrap(); + assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE); +} + +#[tokio::test] +async fn handle_otlp_http_request_rejects_invalid_protobuf() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 0, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let body = b"not valid protobuf"; + let req = Request::builder() + .method(Method::POST) + .uri("/v1/logs") + .header("content-length", body.len().to_string()) + .body(Full::new(Bytes::from(body.to_vec()))) + .unwrap(); + + let response = handle_otlp_http_request(req, shared_spool, policy, next_seq, 1024).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test] +async fn handle_otlp_http_request_rate_limits_when_overloaded() { + let spool = crate::spool::Spool::open(crate::config::StorageConfig::Buffer(crate::config::BufferConfig { + limit: crate::config::BufferLimit::Bytes(1024 * 1024), + keep_messages: 0, + })) + .unwrap(); + let shared_spool = Arc::new(super::SharedSpool::new(spool)); + let policy = Arc::new(SharedIngestPolicy::new(IngestOverloadConfig { + max_batches_per_second: 1, + priority_severity_floor: SeverityFloor::Error, + report_every_ms: 0, + })); + let next_seq = Arc::new(AtomicU64::new(1)); + + let batch = ExportMetricsServiceRequest { + resource_metrics: vec![ResourceMetrics { + resource: Some(Resource { attributes: vec![], dropped_attributes_count: 0, entity_refs: vec![] }), + scope_metrics: vec![ScopeMetrics { + scope: Some(InstrumentationScope { + name: "test".to_string(), + version: String::new(), + attributes: vec![], + dropped_attributes_count: 0, + }), + metrics: vec![], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + let body = batch.encode_to_vec(); + + let req1 = Request::builder() + .method(Method::POST) + .uri("/v1/metrics") + .header("content-length", body.len().to_string()) + .body(Full::new(Bytes::from(body.clone()))) + .unwrap(); + let response1 = handle_otlp_http_request(req1, Arc::clone(&shared_spool), Arc::clone(&policy), Arc::clone(&next_seq), 1024 * 1024).await.unwrap(); + assert_eq!(response1.status(), StatusCode::OK); + + let req2 = Request::builder() + .method(Method::POST) + .uri("/v1/metrics") + .header("content-length", body.len().to_string()) + .body(Full::new(Bytes::from(body))) + .unwrap(); + let response2 = handle_otlp_http_request(req2, shared_spool, policy, next_seq, 1024 * 1024).await.unwrap(); + assert_eq!(response2.status(), StatusCode::TOO_MANY_REQUESTS); } #[test] @@ -66,75 +371,6 @@ fn maybe_decompress_body_rejects_invalid_gzip() { assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); } -#[test] -fn read_http_request_rejects_missing_content_length() { - let bytes = b"POST /v1/logs HTTP/1.1\r\nHost: example\r\n\r\nabc"; - let mut cursor = Cursor::new(bytes.as_slice()); - let err = read_http_request(&mut cursor, 1024).err().unwrap(); - assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); -} - -#[test] -fn read_http_request_rejects_short_body() { - let bytes = b"POST /v1/logs HTTP/1.1\r\nHost: example\r\nContent-Length: 5\r\n\r\nabc"; - let mut cursor = Cursor::new(bytes.as_slice()); - let err = read_http_request(&mut cursor, 1024).err().unwrap(); - assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); -} - -#[test] -fn read_http_request_rejects_invalid_request_line() { - let bytes = b"POST\r\nHost: example\r\nContent-Length: 0\r\n\r\n"; - let mut cursor = Cursor::new(bytes.as_slice()); - let err = read_http_request(&mut cursor, 1024).err().unwrap(); - assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); -} - -#[test] -fn read_http_request_rejects_payloads_over_limit() { - let bytes = b"POST /v1/logs HTTP/1.1\r\nHost: example\r\nContent-Length: 6\r\n\r\nabcdef"; - let mut cursor = Cursor::new(bytes.as_slice()); - let err = read_http_request(&mut cursor, 5).err().unwrap(); - assert_eq!(err.kind(), std::io::ErrorKind::InvalidData); - assert_eq!(err.to_string(), "payload too large"); -} - -#[test] -fn write_http_response_writes_status_line() { - let mut bytes = Vec::new(); - write_http_response(&mut bytes, 404, "not found").unwrap(); - let response = String::from_utf8(bytes).unwrap(); - assert!(response.starts_with("HTTP/1.1 404 Not Found\r\n")); - assert!(response.ends_with("not found")); -} - -#[test] -fn write_http_response_supports_payload_too_large_status() { - let mut bytes = Vec::new(); - write_http_response(&mut bytes, 413, "payload too large").unwrap(); - let response = String::from_utf8(bytes).unwrap(); - assert!(response.starts_with("HTTP/1.1 413 Payload Too Large\r\n")); - assert!(response.ends_with("payload too large")); -} - -#[test] -fn write_http_response_supports_service_unavailable_status() { - let mut bytes = Vec::new(); - write_http_response(&mut bytes, 503, "busy").unwrap(); - let response = String::from_utf8(bytes).unwrap(); - assert!(response.starts_with("HTTP/1.1 503 Service Unavailable\r\n")); - assert!(response.ends_with("busy")); -} - -#[test] -fn write_http_response_supports_too_many_requests_status() { - let mut bytes = Vec::new(); - write_http_response(&mut bytes, 429, "rate limit exceeded").unwrap(); - let response = String::from_utf8(bytes).unwrap(); - assert!(response.starts_with("HTTP/1.1 429 Too Many Requests\r\n")); - assert!(response.ends_with("rate limit exceeded")); -} - #[test] fn connection_limiter_rejects_when_max_clients_is_reached() { let limiter = Arc::new(ConnectionLimiter::new(1)); From 5322c25628fbdba0bd02a880c4e964a9002cc9d2 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 21:14:22 +0200 Subject: [PATCH 3/6] Install tls ring --- logjetd/src/main.rs | 2 ++ logjetd/tests/bridge_flows.rs | 4 +++- logjetd/tests/common/mod.rs | 10 +++++++++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/logjetd/src/main.rs b/logjetd/src/main.rs index 2258ae9..ed4a0bb 100644 --- a/logjetd/src/main.rs +++ b/logjetd/src/main.rs @@ -27,6 +27,8 @@ fn main() -> ExitCode { } fn run() -> Result<(), Box> { + rustls::crypto::ring::default_provider().install_default().map_err(|_| "failed to install default Rustls crypto provider")?; + let mut command = cli::build_cli(); let mut matches = command.get_matches_mut(); let cli = Cli::from_arg_matches_mut(&mut matches)?; diff --git a/logjetd/tests/bridge_flows.rs b/logjetd/tests/bridge_flows.rs index 250a725..e8fca39 100644 --- a/logjetd/tests/bridge_flows.rs +++ b/logjetd/tests/bridge_flows.rs @@ -7,7 +7,7 @@ use std::thread; use std::time::Duration; use common::{ - ChildGuard, MockCollector, MockGrpcCollector, ReservedPort, TestDir, connect_replay_client, free_port, ljd_command, post_otlp_http, + ChildGuard, MockCollector, MockGrpcCollector, ReservedPort, TestDir, connect_replay_client, ensure_rustls_provider, free_port, ljd_command, post_otlp_http, read_replay_message, replay_messages, reserve_port, wait_for_tcp, wait_until, write_fake_grpc_tls_files, }; @@ -209,6 +209,7 @@ fn bridge_keep_fans_out_to_http_and_grpc() -> io::Result<()> { #[test] fn bridge_keep_forwards_backlog_over_grpcs() -> io::Result<()> { + ensure_rustls_provider(); let dir = TestDir::new("bridge-keep-grpcs")?; let ingest_port = free_port()?; let replay_port = free_port()?; @@ -256,6 +257,7 @@ fn bridge_keep_forwards_backlog_over_grpcs() -> io::Result<()> { #[test] fn bridge_keep_forwards_backlog_over_grpcs_mtls() -> io::Result<()> { + ensure_rustls_provider(); let dir = TestDir::new("bridge-keep-grpcs-mtls")?; let ingest_port = free_port()?; let replay_port = free_port()?; diff --git a/logjetd/tests/common/mod.rs b/logjetd/tests/common/mod.rs index e7675cf..96384ff 100644 --- a/logjetd/tests/common/mod.rs +++ b/logjetd/tests/common/mod.rs @@ -3,7 +3,7 @@ use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpListener, TcpStream}; use std::path::{Path, PathBuf}; use std::process::{Child, Command, Stdio}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, OnceLock}; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -15,7 +15,15 @@ use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, Key use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}; use opentelemetry_proto::tonic::resource::v1::Resource; use prost::Message; +use rustls; use rcgen::{BasicConstraints, Certificate, CertificateParams, DistinguishedName, DnType, ExtendedKeyUsagePurpose, IsCa, SanType}; + +pub fn ensure_rustls_provider() { + static INIT: OnceLock<()> = OnceLock::new(); + INIT.get_or_init(|| { + rustls::crypto::ring::default_provider().install_default().expect("install rustls ring provider"); + }); +} use tokio::net::TcpListener as TokioTcpListener; use tokio::runtime::Builder; use tokio_stream::wrappers::TcpListenerStream; From a0daf39c9a9a173089277faeadbbf88dc7991be1 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 21:14:35 +0200 Subject: [PATCH 4/6] Fix UT for tls --- logjetd/tests/unit/tls_utst.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/logjetd/tests/unit/tls_utst.rs b/logjetd/tests/unit/tls_utst.rs index b8d52d0..2d67568 100644 --- a/logjetd/tests/unit/tls_utst.rs +++ b/logjetd/tests/unit/tls_utst.rs @@ -1,6 +1,14 @@ use super::{authority_host, load_client_config, load_ingest_server_config, load_server_config, parse_server_name}; use crate::config::{IngestTlsConfig, TlsConfig}; use std::path::PathBuf; +use std::sync::OnceLock; + +fn ensure_rustls_provider() { + static INIT: OnceLock<()> = OnceLock::new(); + INIT.get_or_init(|| { + rustls::crypto::ring::default_provider().install_default().expect("install rustls ring provider"); + }); +} fn demo_cert_path(name: &str) -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("..").join("demo").join("remote-drain-tls").join("certs").join(name) @@ -56,6 +64,7 @@ fn load_server_config_requires_cert_and_key() { #[test] fn load_server_config_requires_ca_when_client_certs_required() { + ensure_rustls_provider(); let tls = TlsConfig { enable: true, ca_file: None, From 5d66678d53d1935175786cdd83ce086e90db38fd Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 21:14:48 +0200 Subject: [PATCH 5/6] Update docs --- doc/configuration.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/doc/configuration.md b/doc/configuration.md index 568c44c..362c651 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -329,8 +329,7 @@ Important: - default is `32` - must be greater than zero -- applies directly to thread-per-client ingest paths and current gRPC concurrency handling -- plain non-TLS `otlp-http` ingest is already serial in the current implementation +- applies to all ingest protocols (wire, otlp-http, otlp-grpc, plugin) ### `ingest.max-batches-per-second` From a9ab9c9568892e44e35ae4afc465efa74fc045b9 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 16 May 2026 21:15:16 +0200 Subject: [PATCH 6/6] Lintfix --- logjetd/tests/common/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/logjetd/tests/common/mod.rs b/logjetd/tests/common/mod.rs index 96384ff..3a06927 100644 --- a/logjetd/tests/common/mod.rs +++ b/logjetd/tests/common/mod.rs @@ -15,7 +15,6 @@ use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, Key use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}; use opentelemetry_proto::tonic::resource::v1::Resource; use prost::Message; -use rustls; use rcgen::{BasicConstraints, Certificate, CertificateParams, DistinguishedName, DnType, ExtendedKeyUsagePurpose, IsCa, SanType}; pub fn ensure_rustls_provider() {