diff --git a/Cargo.toml b/Cargo.toml index 42a8a5b..a27c9ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,8 @@ web-sys = { version = "0.3", features = [ "RequestInit", "Response", "ResponseInit", + "TransformStream", + "WritableStream", ] } console_error_panic_hook = "0.1.7" diff --git a/crates/cf-workers/src/backend.rs b/crates/cf-workers/src/backend.rs index 727fc08..6e2f882 100644 --- a/crates/cf-workers/src/backend.rs +++ b/crates/cf-workers/src/backend.rs @@ -29,6 +29,37 @@ use worker::Fetch; #[derive(Clone)] pub struct WorkerBackend; +/// The byte length to wrap a streamed PUT body in, or `None` to forward the raw +/// stream. Returns `None` for aws-chunked bodies (S3 sizes those from +/// `x-amz-decoded-content-length`) and for bodies with no usable +/// `Content-Length`. +fn fixed_body_length(headers: &HeaderMap) -> Option { + let aws_chunked = headers + .get(http::header::CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()) + .map(|v| v.contains("aws-chunked")) + .unwrap_or(false); + if aws_chunked { + return None; + } + headers + .get(http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) +} + +/// Build a Cloudflare `FixedLengthStream` of the given length (parts can exceed +/// `u32::MAX`, so fall back to the BigInt constructor). +fn new_fixed_length_stream( + len: u64, +) -> Result { + if len <= u32::MAX as u64 { + worker::worker_sys::FixedLengthStream::new(len as u32) + } else { + worker::worker_sys::FixedLengthStream::new_big_int(js_sys::BigInt::from(len)) + } +} + impl ProxyBackend for WorkerBackend { type ResponseBody = web_sys::Response; type Body = JsBody; @@ -53,10 +84,31 @@ impl ProxyBackend for WorkerBackend { init.set_cache(web_sys::RequestCache::NoStore); } - // For PUT: attach the original ReadableStream directly (zero-copy!). + // For PUT: stream the body through (zero-copy). A bare `ReadableStream` + // body makes the Workers runtime send the subrequest with + // `Transfer-Encoding: chunked` and *drop* `Content-Length` — which S3 + // rejects for a non-aws-chunked payload (it can't size the object/part), + // leaving the subrequest hung until the whole body streams through. + // Wrapping the stream in a `FixedLengthStream` makes the runtime emit a + // real `Content-Length`. aws-chunked bodies are sized by S3 from + // `x-amz-decoded-content-length` and keep their chunk framing, so those + // pass through raw. if request.method == http::Method::PUT { if let Some(stream) = js_body.stream() { - init.set_body(stream); + match fixed_body_length(&request.headers) { + Some(len) => { + let fls = new_fixed_length_stream(len).map_err(|e| { + ProxyError::Internal(format!("FixedLengthStream init failed: {e:?}")) + })?; + let transform: &web_sys::TransformStream = fls.as_ref(); + // The outbound fetch consuming `readable` pulls the body + // through the transform; the pipe is driven by that + // backpressure, so it streams rather than buffers. + let _ = stream.pipe_to(&transform.writable()); + init.set_body(&transform.readable()); + } + None => init.set_body(stream), + } } } diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index e3522d5..891e6f6 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -91,6 +91,29 @@ const SIGNED_AWS_CHUNKED_UNSUPPORTED: &str = /// [`ProxyGateway::with_user_agent`] to include your application name. pub const DEFAULT_USER_AGENT: &str = concat!("multistore/", env!("CARGO_PKG_VERSION")); +/// Headers forwarded (and signed) when streaming an `aws-chunked` upload: +/// the de-chunk headers S3 needs to reconstruct the payload. +const AWS_CHUNKED_FORWARD_HEADERS: &[&str] = &[ + "content-type", + "content-encoding", + "x-amz-decoded-content-length", + "x-amz-trailer", +]; + +/// Headers forwarded (and signed) when streaming a *plain* (non-aws-chunked) +/// `UploadPart` with `UNSIGNED-PAYLOAD`. The checksum headers are signed so S3 +/// still validates part integrity even though the payload itself is unsigned. +const PLAIN_PART_FORWARD_HEADERS: &[&str] = &[ + "content-type", + "content-md5", + "x-amz-sdk-checksum-algorithm", + "x-amz-checksum-crc32", + "x-amz-checksum-crc32c", + "x-amz-checksum-crc64nvme", + "x-amz-checksum-sha1", + "x-amz-checksum-sha256", +]; + // Re-export types that were historically defined here for backwards compatibility. pub use crate::route_handler::{ filter_response_headers, ForwardRequest, HandlerAction, PendingRequest, ProxyResult, @@ -296,6 +319,24 @@ where Ok(()) } + /// Whether this operation's body must be buffered before forwarding + /// (multipart control ops + batch delete, which parse the body to authorize + /// or re-sign it). Pure and synchronous — the classification comes straight + /// from the parsed S3 operation with no I/O — so `handle_request` can read + /// the body in the request's own I/O context ahead of any cross-request + /// await. PutObject and UploadPart stream zero-copy and are excluded; this + /// set must stay in sync with the `NeedsBody` arms of `dispatch_operation`. + fn op_needs_buffered_body(&self, req: &RequestInfo<'_>) -> bool { + let host_style = determine_host_style(req.headers, self.virtual_host_domain.as_deref()); + matches!( + request::parse_s3_request(req.method, req.path, req.query, req.headers, host_style), + Ok(S3Operation::CreateMultipartUpload { .. } + | S3Operation::CompleteMultipartUpload { .. } + | S3Operation::AbortMultipartUpload { .. } + | S3Operation::DeleteObjects { .. }) + ) + } + /// Inject a `Server-Timing` header into the response headers if enabled. fn maybe_inject_server_timing( &self, @@ -400,6 +441,33 @@ where }; } + // Buffered-body operations (multipart control ops + batch delete) must + // have their body read in THIS request's I/O context, before + // resolution's cross-request awaits (bucket lookup, STS exchange). On + // Cloudflare Workers the wasm-bindgen futures queue is shared across + // concurrent requests, so a body read deferred past an await can resume + // under a different request's I/O context and fail with "Cannot perform + // I/O on behalf of a different request". PutObject/UploadPart stream + // zero-copy and are excluded by the classifier. + let mut body = Some(body); + let mut prebuffered: Option = None; + if self.op_needs_buffered_body(req) { + match collect_body(body.take().expect("body present")).await { + Ok(bytes) => prebuffered = Some(bytes), + Err(e) => { + tracing::error!(error = %e, "failed to read request body"); + let mut r = error_response( + &ProxyError::Internal("failed to read request body".into()), + req.path, + "", + self.debug_errors, + ); + self.maybe_inject_server_timing(&mut r.headers, total_start, None, None); + return GatewayResponse::Response(r); + } + } + } + // Resolve via proxy pipeline (with metadata for after_dispatch) let dispatch_start = chrono::Utc::now(); let (action, metadata) = self.resolve_request_with_metadata(req).await; @@ -422,7 +490,8 @@ where } HandlerAction::Forward(fwd) => { let backend_start = chrono::Utc::now(); - match self.backend.forward(fwd, body).await { + let fwd_body = body.take().expect("streaming op retains its body"); + match self.backend.forward(fwd, fwd_body).await { Ok(mut resp) => { resp.headers = filter_response_headers(&resp.headers); let s = resp.status; @@ -451,8 +520,12 @@ where } HandlerAction::NeedsBody(pending) => { let backend_start = chrono::Utc::now(); - match collect_body(body).await { - Ok(bytes) => { + // The body was pre-read above: op_needs_buffered_body covers + // every NeedsBody op. A miss means the classifier drifted from + // dispatch_operation's NeedsBody arms — fail closed rather than + // re-read late (which would re-expose the cross-request hazard). + match prebuffered.take() { + Some(bytes) => { let result = self.handle_with_body(pending, bytes).await; let s = result.status; let rb = response_body_bytes(&result.body); @@ -464,11 +537,14 @@ where Some(backend_start), ) } - Err(e) => { - tracing::error!(error = %e, "failed to read request body"); + None => { + tracing::error!( + operation = ?metadata.operation, + "NeedsBody operation was not pre-buffered (classifier drift)" + ); let err_resp = error_response( - &ProxyError::Internal("failed to read request body".into()), - "", + &ProxyError::Internal("request body unavailable".into()), + req.path, &metadata.request_id, self.debug_errors, ); @@ -882,9 +958,11 @@ where .await?; Ok(HandlerAction::Response(result)) } - // UploadPart carries the part body, which modern clients also send - // as aws-chunked — same streaming re-sign / reject handling as - // PutObject. A plain part still buffers via the raw-signed path. + // UploadPart carries the part body. aws-chunked parts re-sign and + // stream; a plain part streams too, via UNSIGNED-PAYLOAD header + // signing — never buffered. Buffering a part would materialize it + // into memory and, on Workers, defer the body read past the + // resolution awaits into another request's I/O context. S3Operation::UploadPart { .. } => { Self::require_s3_backend(bucket_config)?; self.check_upload_size(original_headers)?; @@ -894,7 +972,20 @@ where { return Ok(HandlerAction::Forward(fwd)); } - Ok(HandlerAction::NeedsBody(pending())) + // Plain (non-aws-chunked) part: stream it with UNSIGNED-PAYLOAD, + // forwarding+signing the checksum headers so S3 still validates + // part integrity. Mirrors PutObject's streaming write. + let fwd = self + .build_streaming_forward( + bucket_config, + operation, + UNSIGNED_PAYLOAD, + PLAIN_PART_FORWARD_HEADERS, + original_headers, + request_id, + ) + .await?; + Ok(HandlerAction::Forward(fwd)) } // Multipart control operations carry only a small (XML or empty) // body, which is buffered and re-signed. @@ -985,6 +1076,7 @@ where config, operation, sentinel, + AWS_CHUNKED_FORWARD_HEADERS, original_headers, request_id, ) @@ -1010,42 +1102,38 @@ where } } - /// Build a header-signed streaming PUT for an `aws-chunked` - /// *unsigned-payload* upload (PutObject or UploadPart). + /// Build a header-signed streaming PUT, re-signing only the request seed + /// with the backend credentials and letting the runtime stream the body + /// through untouched. Zero-copy, no buffering. Two callers: /// - /// These can't be presigned (a presigned URL signs `UNSIGNED-PAYLOAD`, which - /// S3 won't de-chunk) and shouldn't be buffered (memory). Instead we re-sign - /// only the request seed with the backend credentials — reusing the client's - /// `STREAMING-…` `x-amz-content-sha256` and the de-chunk headers — and let - /// the runtime stream the chunk framing through untouched for S3 to - /// de-chunk. Zero-copy, no buffering. + /// - aws-chunked uploads (`payload_hash` = the client's `STREAMING-…` + /// sentinel, `forward_header_names` = the de-chunk headers): can't be + /// presigned (a presigned URL signs `UNSIGNED-PAYLOAD`, which S3 won't + /// de-chunk), so S3 reconstructs the payload from the chunk framing. + /// - plain `UploadPart` (`payload_hash` = `UNSIGNED-PAYLOAD`, + /// `forward_header_names` = the checksum headers): streams a raw part + /// instead of materializing it; the signed checksum headers let S3 still + /// validate part integrity. /// - /// Only headers that are stable through the runtime's streaming fetch are - /// signed. `Content-Length` is forwarded but left *unsigned*: the transfer - /// framing is the runtime's to manage, so signing it risks a mismatch — S3 - /// sizes the payload from `x-amz-decoded-content-length` and the chunk - /// framing regardless. + /// Only headers stable through the runtime's streaming fetch are signed. + /// `Content-Length` is forwarded but left *unsigned*: the transfer framing + /// is the runtime's to manage, so signing it risks a mismatch. async fn build_streaming_forward( &self, config: &BucketConfig, operation: &S3Operation, payload_hash: &str, + forward_header_names: &[&'static str], original_headers: &HeaderMap, request_id: &str, ) -> Result { - // Caller (`try_streaming_forward`) has already gated on an S3 backend; - // this path hardcodes S3 SigV4 seed signing (`build_backend_url` + - // `sign_s3_request`). + // Caller has already gated on an S3 backend; this path hardcodes S3 + // SigV4 seed signing (`build_backend_url` + `sign_s3_request`). let url = url::Url::parse(&build_backend_url(config, operation)?) .map_err(|e| ProxyError::Internal(format!("invalid backend URL: {e}")))?; let mut headers = HeaderMap::new(); - for name in &[ - "content-type", - "content-encoding", - "x-amz-decoded-content-length", - "x-amz-trailer", - ] { + for name in forward_header_names { if let Some(v) = original_headers.get(*name) { headers.insert(*name, v.clone()); } @@ -1068,7 +1156,11 @@ where } headers.insert(http::header::USER_AGENT, self.user_agent.parse().unwrap()); - tracing::debug!(path = url.path(), "aws-chunked write via streaming re-sign"); + tracing::debug!( + path = url.path(), + payload_hash, + "streaming write via backend re-sign" + ); Ok(ForwardRequest { method: Method::PUT, url, @@ -1987,6 +2079,84 @@ mod tests { }); } + /// A plain (non-aws-chunked) part streams through with UNSIGNED-PAYLOAD + /// header signing instead of being buffered, carrying the part query and + /// preserving the client's checksum header so S3 still validates integrity. + #[test] + fn upload_part_plain_streams_unsigned_preserving_checksum() { + run(async { + let gw = gateway(); + let mut headers = HeaderMap::new(); + headers.insert("content-length", "7".parse().unwrap()); + headers.insert("x-amz-checksum-crc32", "AAAAAA==".parse().unwrap()); + let action = gw + .resolve_request( + Method::PUT, + "/test-bucket/key.bin", + Some("partNumber=2&uploadId=xyz"), + &headers, + None, + ) + .await; + match action { + HandlerAction::Forward(fwd) => { + let q = fwd.url.query().unwrap_or(""); + assert!( + q.contains("partNumber=2") && q.contains("uploadId=xyz"), + "plain UploadPart must carry partNumber/uploadId, got {q:?}" + ); + // Streamed, not buffered: the seed is signed UNSIGNED-PAYLOAD. + assert_eq!( + fwd.headers.get("x-amz-content-sha256").unwrap(), + "UNSIGNED-PAYLOAD" + ); + // Checksum forwarded (and signed) so S3 validates the part. + assert_eq!(fwd.headers.get("x-amz-checksum-crc32").unwrap(), "AAAAAA=="); + } + other => panic!( + "expected Forward (stream, not buffer), got {:?}", + std::mem::discriminant(&other) + ), + } + }); + } + + /// The eager-collect classifier must match exactly the operations that + /// resolve to `NeedsBody` — multipart control ops and batch delete — and + /// must exclude the zero-copy streaming/read ops. + #[test] + fn op_needs_buffered_body_matches_needsbody_ops() { + let gw = gateway(); + let h = HeaderMap::new(); + let buffered = |m: &Method, path: &'static str, q: Option<&'static str>| { + gw.op_needs_buffered_body(&RequestInfo::new(m, path, q, &h, None)) + }; + + // Multipart control ops + batch delete buffer their (small) body. + assert!(buffered(&Method::POST, "/test-bucket/key", Some("uploads"))); + assert!(buffered( + &Method::POST, + "/test-bucket/key", + Some("uploadId=abc") + )); + assert!(buffered( + &Method::DELETE, + "/test-bucket/key", + Some("uploadId=abc") + )); + assert!(buffered(&Method::POST, "/test-bucket", Some("delete"))); + + // Streamed / read ops never buffer. + assert!(!buffered(&Method::PUT, "/test-bucket/key", None)); + assert!(!buffered( + &Method::PUT, + "/test-bucket/key", + Some("partNumber=1&uploadId=abc") + )); + assert!(!buffered(&Method::GET, "/test-bucket/key", None)); + assert!(!buffered(&Method::GET, "/test-bucket", None)); + } + // -- Middleware test types ----------------------------------------------- struct BlockMiddleware;