From a9e24677525c4e08dcf8aeec99b1ff177c81e293 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 14:30:25 -0700 Subject: [PATCH 01/21] feat(core): batch delete, copy rejection, wider PUT headers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends data-edit support beyond the existing single-object write/delete and multipart paths: - DeleteObjects (`POST /{bucket}?delete`): new `S3Operation::DeleteObjects`, parsed from the `delete` subresource. The keys live in the body, so it goes through the `NeedsBody` path. `api/delete.rs` owns the pure XML parse/build/ merge; `proxy.rs::execute_delete_objects` authorizes each key individually (S3 partial-result semantics — denied keys become per-key `AccessDenied` entries and never reach the backend) and forwards the rest with the required `Content-MD5` (new `md-5` dep). S3-gated like multipart. - Authorization: `authorize` does a coarse bucket-level check for DeleteObjects (it carries no key); new `auth::key_authorized` does the per-key prefix check. `PendingRequest` now carries the resolved identity so the body handler can run it. Anonymous callers still cannot write. - CopyObject/UploadPartCopy: a `PUT` with `x-amz-copy-source` is now rejected with `501 NotImplemented` (new `ProxyError::NotImplemented`) instead of being mis-parsed as an empty PutObject that would overwrite the destination. - PutObject: forward standard HTTP entity headers (Content-Disposition, -Encoding, -Language, Cache-Control, Expires) alongside type/length/md5. `x-amz-*` write headers stay dropped — S3 rejects unsigned `x-amz-*` on presigned URLs, so those need a header-signing path (deferred). Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 1 + Cargo.toml | 1 + crates/core/Cargo.toml | 1 + crates/core/src/api/delete.rs | 253 +++++++++++++++++++ crates/core/src/api/mod.rs | 1 + crates/core/src/api/request.rs | 90 ++++++- crates/core/src/auth/authorize.rs | 120 ++++++++- crates/core/src/auth/mod.rs | 2 +- crates/core/src/backend/multipart.rs | 9 + crates/core/src/error.rs | 8 + crates/core/src/proxy.rs | 360 ++++++++++++++++++++++++++- crates/core/src/route_handler.rs | 6 +- crates/core/src/types.rs | 19 +- 13 files changed, 854 insertions(+), 17 deletions(-) create mode 100644 crates/core/src/api/delete.rs diff --git a/Cargo.lock b/Cargo.lock index a8e8ac5..d5cf6e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1132,6 +1132,7 @@ dependencies = [ "hmac", "http", "matchit 0.8.4", + "md-5", "object_store", "percent-encoding", "quick-xml 0.37.5", diff --git a/Cargo.toml b/Cargo.toml index 859b6a2..a7910cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ matchit = "0.8" # Crypto hmac = "0.12" sha2 = { version = "0.10", features = ["oid"] } +md-5 = "0.10" rsa = "0.9" aes-gcm = "0.10" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 6cca716..5eea88e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -25,6 +25,7 @@ url.workspace = true percent-encoding.workspace = true hmac.workspace = true sha2.workspace = true +md-5.workspace = true quick-xml.workspace = true tracing.workspace = true object_store.workspace = true diff --git a/crates/core/src/api/delete.rs b/crates/core/src/api/delete.rs new file mode 100644 index 0000000..d704583 --- /dev/null +++ b/crates/core/src/api/delete.rs @@ -0,0 +1,253 @@ +//! Batch delete (`DeleteObjects`) request/response handling. +//! +//! S3's batch delete (`POST /{bucket}?delete`) carries the keys to delete in an +//! XML request body and returns a per-key result. This module owns the pure +//! parsing and serialization: reading the inbound `` body, building the +//! body forwarded to the backend, and merging the backend's `` +//! with the proxy's own per-key authorization decisions. +//! +//! Per-key authorization (via [`auth::key_authorized`](crate::auth::key_authorized)) +//! and backend forwarding live in the gateway; everything here is runtime- and +//! I/O-free so it can be exercised directly in unit tests. + +use crate::error::ProxyError; +use serde::{Deserialize, Serialize}; + +/// Parsed inbound `` request body. +#[derive(Debug, Deserialize)] +#[serde(rename = "Delete")] +pub struct DeleteRequest { + /// When true, successful deletions are omitted from the response — only + /// errors are reported. + #[serde(default, rename = "Quiet")] + pub quiet: bool, + /// The objects to delete. + #[serde(default, rename = "Object")] + pub objects: Vec, +} + +/// A single `` entry in a batch-delete request. +#[derive(Debug, Deserialize)] +pub struct DeleteObjectEntry { + /// The (client-facing) object key to delete. + #[serde(rename = "Key")] + pub key: String, +} + +impl DeleteRequest { + /// Parse a batch-delete request body. + /// + /// Returns an error if the body is not well-formed XML or names no objects + /// (S3 rejects an empty delete with `MalformedXML`). + pub fn parse(body: &[u8]) -> Result { + let req: DeleteRequest = quick_xml::de::from_reader(body) + .map_err(|e| ProxyError::InvalidRequest(format!("malformed delete body: {e}")))?; + if req.objects.is_empty() { + return Err(ProxyError::InvalidRequest( + "delete request names no objects".into(), + )); + } + Ok(req) + } + + /// The client-facing keys named in the request, in order. + pub fn keys(&self) -> impl Iterator { + self.objects.iter().map(|o| o.key.as_str()) + } +} + +/// Build the `` XML body forwarded to the backend. +/// +/// `backend_keys` are the keys already mapped into the backend's key space +/// (i.e. with any `backend_prefix` applied). `Quiet` is always `false` so the +/// backend reports each deletion explicitly, letting the proxy map results back +/// to client keys before applying the client's own quiet preference. +pub fn build_backend_delete_body(backend_keys: &[String]) -> String { + #[derive(Serialize)] + #[serde(rename = "Delete")] + struct Body<'a> { + #[serde(rename = "Quiet")] + quiet: bool, + #[serde(rename = "Object")] + objects: Vec>, + } + #[derive(Serialize)] + struct Obj<'a> { + #[serde(rename = "Key")] + key: &'a str, + } + let body = Body { + quiet: false, + objects: backend_keys.iter().map(|k| Obj { key: k }).collect(), + }; + format!( + "\n{}", + quick_xml::se::to_string(&body).unwrap_or_default() + ) +} + +/// A per-key error in a ``. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DeleteError { + /// The object key the error applies to. + #[serde(rename = "Key")] + pub key: String, + /// S3 error code (e.g. `AccessDenied`). + #[serde(rename = "Code")] + pub code: String, + /// Human-readable message. + #[serde(rename = "Message")] + pub message: String, +} + +/// The backend's keys that were deleted and any per-key errors it reported. +#[derive(Debug, Default)] +pub struct BackendOutcome { + /// Backend keys reported as deleted. + pub deleted: Vec, + /// Per-key errors reported by the backend (backend key space). + pub errors: Vec, +} + +/// Parse a backend `` response. +/// +/// Tolerates the extra elements S3 includes (`VersionId`, `DeleteMarker`, …) and +/// returns only what the proxy needs to rebuild the client response. +pub fn parse_backend_result(xml: &[u8]) -> Result { + #[derive(Deserialize)] + #[serde(rename = "DeleteResult")] + struct Result { + #[serde(default, rename = "Deleted")] + deleted: Vec, + #[serde(default, rename = "Error")] + errors: Vec, + } + #[derive(Deserialize)] + struct Deleted { + #[serde(rename = "Key")] + key: String, + } + let parsed: Result = quick_xml::de::from_reader(xml) + .map_err(|e| ProxyError::BackendError(format!("malformed delete result: {e}")))?; + Ok(BackendOutcome { + deleted: parsed.deleted.into_iter().map(|d| d.key).collect(), + errors: parsed.errors, + }) +} + +/// Serialize a client-facing ``. +/// +/// `deleted` and `errors` are in client key space. In `quiet` mode the +/// `` entries are omitted (S3 semantics); errors are always reported. +pub fn build_delete_result(deleted: &[String], errors: &[DeleteError], quiet: bool) -> String { + #[derive(Serialize)] + #[serde(rename = "DeleteResult")] + struct Result<'a> { + #[serde(rename = "Deleted")] + deleted: Vec>, + #[serde(rename = "Error")] + errors: &'a [DeleteError], + } + #[derive(Serialize)] + struct Deleted<'a> { + #[serde(rename = "Key")] + key: &'a str, + } + let deleted = if quiet { + Vec::new() + } else { + deleted.iter().map(|k| Deleted { key: k }).collect() + }; + let result = Result { deleted, errors }; + format!( + "\n{}", + quick_xml::se::to_string(&result).unwrap_or_default() + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + const SAMPLE: &[u8] = br#" + + a.txt + nested/b.txt + "#; + + #[test] + fn parses_keys_and_quiet_default_false() { + let req = DeleteRequest::parse(SAMPLE).unwrap(); + assert!(!req.quiet); + let keys: Vec<_> = req.keys().collect(); + assert_eq!(keys, vec!["a.txt", "nested/b.txt"]); + } + + #[test] + fn parses_quiet_flag() { + let body = br#"truek"#; + let req = DeleteRequest::parse(body).unwrap(); + assert!(req.quiet); + } + + #[test] + fn empty_delete_is_rejected() { + let body = br#""#; + assert!(DeleteRequest::parse(body).is_err()); + } + + #[test] + fn malformed_body_is_rejected() { + assert!(DeleteRequest::parse(b"not xml").is_err()); + } + + #[test] + fn backend_body_lists_each_key_non_quiet() { + let body = build_backend_delete_body(&["p/a.txt".into(), "p/b.txt".into()]); + assert!(body.contains("false")); + assert!(body.contains("p/a.txt")); + assert!(body.contains("p/b.txt")); + } + + #[test] + fn parses_backend_result_with_extra_elements() { + let xml = br#" + + p/a.txttruev1 + p/c.txtInternalErroroops + "#; + let out = parse_backend_result(xml).unwrap(); + assert_eq!(out.deleted, vec!["p/a.txt"]); + assert_eq!(out.errors.len(), 1); + assert_eq!(out.errors[0].key, "p/c.txt"); + assert_eq!(out.errors[0].code, "InternalError"); + } + + #[test] + fn build_result_omits_deleted_when_quiet() { + let deleted = vec!["a.txt".to_string()]; + let errors = vec![DeleteError { + key: "secret/x".into(), + code: "AccessDenied".into(), + message: "denied".into(), + }]; + let verbose = build_delete_result(&deleted, &errors, false); + assert!(verbose.contains("a.txt")); + assert!(verbose.contains("secret/x")); + assert!(verbose.contains("AccessDenied")); + + let quiet = build_delete_result(&deleted, &errors, true); + assert!(!quiet.contains("")); + // Errors are always reported, even in quiet mode. + assert!(quiet.contains("AccessDenied")); + } + + #[test] + fn keys_are_xml_escaped() { + // A key with XML-significant characters must be escaped in both the + // backend body and the result. + let body = build_backend_delete_body(&["a&b.txt".into()]); + assert!(body.contains("a&b<c>.txt")); + assert!(!body.contains("a&b.txt")); + } +} diff --git a/crates/core/src/api/mod.rs b/crates/core/src/api/mod.rs index 7e1a287..2462d9a 100644 --- a/crates/core/src/api/mod.rs +++ b/crates/core/src/api/mod.rs @@ -1,5 +1,6 @@ //! S3 API parsing and response serialization. +pub mod delete; pub mod list; pub mod list_rewrite; pub mod request; diff --git a/crates/core/src/api/request.rs b/crates/core/src/api/request.rs index d26dd0a..df41a38 100644 --- a/crates/core/src/api/request.rs +++ b/crates/core/src/api/request.rs @@ -12,9 +12,21 @@ pub fn parse_s3_request( method: &Method, uri_path: &str, query: Option<&str>, - _headers: &http::HeaderMap, + headers: &http::HeaderMap, host_style: HostStyle, ) -> Result { + // Server-side copy (CopyObject / UploadPartCopy) arrives as a PUT carrying + // `x-amz-copy-source`. It is not supported: forwarding such a request via a + // presigned URL would drop the copy-source header and silently overwrite the + // destination with an empty body. Reject it explicitly so the failure is + // unambiguous rather than corrupting data. See + // .plans/2026-06-23-data-edit-operations-design.md for the deferred design. + if *method == Method::PUT && headers.contains_key("x-amz-copy-source") { + return Err(ProxyError::NotImplemented( + "server-side copy (x-amz-copy-source) is not supported".into(), + )); + } + // GET / with path-style → ListBuckets (no bucket in path) if matches!(host_style, HostStyle::Path) && uri_path.trim_start_matches('/').is_empty() { if *method == Method::GET { @@ -54,6 +66,7 @@ pub fn build_s3_operation( .map(|(_, v)| v.clone()); let has_uploads = query_params.iter().any(|(k, _)| k == "uploads"); + let has_delete = query_params.iter().any(|(k, _)| k == "delete"); match *method { Method::GET => { @@ -85,6 +98,14 @@ pub fn build_s3_operation( part_number, }) } else { + // `x-amz-copy-source` (CopyObject / UploadPartCopy) is rejected + // upstream in `parse_s3_request`. Callers that invoke + // `build_s3_operation` directly (custom resolvers) are + // responsible for their own copy-source handling. + // ponytail: deferred — trailer checksums (`x-amz-checksum-*`) + // sent on writes are dropped, not forwarded; they need the + // header-signing forward path. See + // .plans/2026-06-23-data-edit-operations-design.md. Ok(S3Operation::PutObject { bucket, key }) } } @@ -97,6 +118,11 @@ pub fn build_s3_operation( key, upload_id, }) + } else if has_delete { + // Batch delete: `POST /{bucket}?delete` with an XML body listing + // keys. The keys (and per-key authorization) are handled once the + // body is materialized. + Ok(S3Operation::DeleteObjects { bucket }) } else { Err(ProxyError::InvalidRequest( "unsupported POST operation".into(), @@ -111,6 +137,10 @@ pub fn build_s3_operation( upload_id, }) } else if !key.is_empty() { + // ponytail: deferred — versioned delete (`?versionId=`) and MFA + // delete are not handled; the version is ignored and the current + // object is deleted. Upgrade path: thread version-id through the + // forward. See .plans/2026-06-23-data-edit-operations-design.md. Ok(S3Operation::DeleteObject { bucket, key }) } else { Err(ProxyError::InvalidRequest( @@ -154,3 +184,61 @@ fn parse_query_params(query: Option<&str>) -> Vec<(String, String)> { }) .unwrap_or_default() } + +#[cfg(test)] +mod tests { + use super::*; + + fn parse( + method: Method, + path: &str, + query: Option<&str>, + headers: &http::HeaderMap, + ) -> Result { + parse_s3_request(&method, path, query, headers, HostStyle::Path) + } + + #[test] + fn batch_delete_parses_as_delete_objects() { + let op = parse( + Method::POST, + "/my-bucket", + Some("delete"), + &http::HeaderMap::new(), + ) + .unwrap(); + assert!( + matches!(op, S3Operation::DeleteObjects { ref bucket } if bucket == "my-bucket"), + "POST ?delete should parse as DeleteObjects, got {op:?}" + ); + } + + #[test] + fn post_without_known_subresource_is_rejected() { + let err = parse( + Method::POST, + "/my-bucket/key", + None, + &http::HeaderMap::new(), + ) + .unwrap_err(); + assert!(matches!(err, ProxyError::InvalidRequest(_))); + } + + #[test] + fn copy_source_put_is_rejected_not_implemented() { + let mut headers = http::HeaderMap::new(); + headers.insert("x-amz-copy-source", "/src-bucket/src-key".parse().unwrap()); + let err = parse(Method::PUT, "/dst-bucket/dst-key", None, &headers).unwrap_err(); + assert!( + matches!(err, ProxyError::NotImplemented(_)), + "copy-source PUT must be rejected as NotImplemented, got {err:?}" + ); + } + + #[test] + fn plain_put_still_parses_as_put_object() { + let op = parse(Method::PUT, "/b/k.txt", None, &http::HeaderMap::new()).unwrap(); + assert!(matches!(op, S3Operation::PutObject { .. })); + } +} diff --git a/crates/core/src/auth/authorize.rs b/crates/core/src/auth/authorize.rs index 07134bd..99e3ec2 100644 --- a/crates/core/src/auth/authorize.rs +++ b/crates/core/src/auth/authorize.rs @@ -60,6 +60,12 @@ pub fn authorize( _ => operation.key().to_string(), }; + // Batch delete carries no key here — the keys live in the request body. This + // is only a coarse check that *some* scope grants DeleteObject on the bucket; + // each key is authorized individually (against its prefix) once the body is + // parsed. See [`key_authorized`]. + let ignore_prefix = matches!(operation, S3Operation::DeleteObjects { .. }); + // Check if any scope grants access let authorized = scopes.iter().any(|scope| { if scope.bucket != bucket { @@ -69,8 +75,8 @@ pub fn authorize( return false; } // Check prefix restrictions - if scope.prefixes.is_empty() { - return true; // Full bucket access + if ignore_prefix || scope.prefixes.is_empty() { + return true; // Full bucket access (or deferred per-key check) } scope .prefixes @@ -92,6 +98,31 @@ pub fn authorize( } } +/// Check whether `identity` may perform `action` on a single `key` in `bucket`. +/// +/// Used for per-key authorization of batch operations such as +/// [`DeleteObjects`](crate::types::S3Operation::DeleteObjects), where the coarse +/// [`authorize`] check only verified that *some* scope grants the action on the +/// bucket. Anonymous identities are never authorized here — this is only used +/// for write actions, which anonymous callers can never perform. +pub fn key_authorized( + identity: &ResolvedIdentity, + bucket: &str, + action: Action, + key: &str, +) -> bool { + let scopes = match identity { + ResolvedIdentity::Anonymous => return false, + ResolvedIdentity::Authenticated(id) => &id.allowed_scopes, + }; + scopes.iter().any(|scope| { + scope.bucket == bucket + && scope.actions.contains(&action) + && (scope.prefixes.is_empty() + || scope.prefixes.iter().any(|p| key_matches_prefix(key, p))) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -121,4 +152,89 @@ mod tests { assert!(!key_matches_prefix("other/file.txt", "data/")); assert!(!key_matches_prefix("other/file.txt", "data")); } + + use crate::types::{AccessScope, AuthenticatedIdentity, BucketConfig}; + + fn identity_with(scope: AccessScope) -> ResolvedIdentity { + ResolvedIdentity::Authenticated(AuthenticatedIdentity { + principal_name: "tester".into(), + allowed_scopes: vec![scope], + }) + } + + fn bucket(name: &str, anonymous: bool) -> BucketConfig { + BucketConfig { + name: name.into(), + backend_type: "s3".into(), + backend_prefix: None, + anonymous_access: anonymous, + allowed_roles: vec![], + backend_options: Default::default(), + } + } + + #[test] + fn key_authorized_enforces_prefix_per_key() { + let id = identity_with(AccessScope { + bucket: "b".into(), + prefixes: vec!["data/".into()], + actions: vec![Action::DeleteObject], + }); + assert!(key_authorized(&id, "b", Action::DeleteObject, "data/x.txt")); + assert!(!key_authorized( + &id, + "b", + Action::DeleteObject, + "other/x.txt" + )); + // Wrong bucket / wrong action are denied. + assert!(!key_authorized( + &id, + "other", + Action::DeleteObject, + "data/x.txt" + )); + assert!(!key_authorized(&id, "b", Action::PutObject, "data/x.txt")); + } + + #[test] + fn key_authorized_denies_anonymous() { + assert!(!key_authorized( + &ResolvedIdentity::Anonymous, + "b", + Action::DeleteObject, + "anything" + )); + } + + #[test] + fn delete_objects_coarse_authz_ignores_prefix() { + // A prefix-scoped caller passes the coarse batch-delete check even though + // the operation carries no key; per-key enforcement happens later. + let id = identity_with(AccessScope { + bucket: "b".into(), + prefixes: vec!["data/".into()], + actions: vec![Action::DeleteObject], + }); + let op = S3Operation::DeleteObjects { bucket: "b".into() }; + assert!(authorize(&id, &op, &bucket("b", false)).is_ok()); + } + + #[test] + fn delete_objects_denied_without_delete_action() { + let id = identity_with(AccessScope { + bucket: "b".into(), + prefixes: vec![], + actions: vec![Action::GetObject], + }); + let op = S3Operation::DeleteObjects { bucket: "b".into() }; + assert!(authorize(&id, &op, &bucket("b", false)).is_err()); + } + + #[test] + fn delete_objects_denied_for_anonymous() { + let op = S3Operation::DeleteObjects { bucket: "b".into() }; + // Even on an anonymous-readable bucket, batch delete is a write. + assert!(authorize(&ResolvedIdentity::Anonymous, &op, &bucket("b", true)).is_err()); + } } diff --git a/crates/core/src/auth/mod.rs b/crates/core/src/auth/mod.rs index 496c0e0..6561acd 100644 --- a/crates/core/src/auth/mod.rs +++ b/crates/core/src/auth/mod.rs @@ -9,7 +9,7 @@ mod authorize; pub mod identity; pub mod sigv4; -pub use authorize::authorize; +pub use authorize::{authorize, key_authorized}; pub use identity::resolve_identity; pub use sigv4::{parse_sigv4_auth, verify_sigv4_signature, SigV4Auth}; diff --git a/crates/core/src/backend/multipart.rs b/crates/core/src/backend/multipart.rs index 11a13c2..1ddd869 100644 --- a/crates/core/src/backend/multipart.rs +++ b/crates/core/src/backend/multipart.rs @@ -22,6 +22,15 @@ pub fn build_backend_url( let bucket = config.option("bucket_name").unwrap_or(""); let bucket_is_empty = bucket.is_empty(); + // Batch delete targets the bucket, not a key: `{base}[/{bucket}]?delete`. + if matches!(operation, S3Operation::DeleteObjects { .. }) { + return Ok(if bucket_is_empty { + format!("{base}?delete") + } else { + format!("{base}/{bucket}?delete") + }); + } + let mut key = String::new(); if let Some(prefix) = &config.backend_prefix { key.push_str(prefix.trim_end_matches('/')); diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 1c5bbcf..ecddae4 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -25,6 +25,12 @@ pub enum ProxyError { #[error("invalid request: {0}")] InvalidRequest(String), + /// The requested S3 operation is recognized but not supported by the proxy + /// (e.g. server-side copy via `x-amz-copy-source`). Maps to S3's + /// `501 NotImplemented`. + #[error("not implemented: {0}")] + NotImplemented(String), + /// The request contains no authentication credentials. #[error("missing authentication")] MissingAuth, @@ -82,6 +88,7 @@ impl ProxyError { Self::AccessDenied => "AccessDenied", Self::SignatureDoesNotMatch => "SignatureDoesNotMatch", Self::InvalidRequest(_) => "InvalidRequest", + Self::NotImplemented(_) => "NotImplemented", Self::MissingAuth => "AccessDenied", Self::ExpiredCredentials => "ExpiredToken", Self::InvalidOidcToken(_) => "InvalidIdentityToken", @@ -102,6 +109,7 @@ impl ProxyError { Self::AccessDenied | Self::MissingAuth | Self::ExpiredCredentials => 403, Self::SignatureDoesNotMatch => 403, Self::InvalidRequest(_) => 400, + Self::NotImplemented(_) => 501, Self::InvalidOidcToken(_) => 400, Self::RoleNotFound(_) => 403, Self::PreconditionFailed => 412, diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index a7da8c0..02270ef 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -65,7 +65,7 @@ use crate::middleware::{ use crate::registry::{BucketRegistry, CredentialRegistry}; use crate::route_handler::{ProxyResponseBody, RequestInfo}; use crate::router::Router; -use crate::types::{BucketConfig, ResolvedIdentity, S3Operation}; +use crate::types::{Action, BackendType, BucketConfig, ResolvedIdentity, S3Operation}; use bytes::Bytes; use http::{HeaderMap, Method}; use object_store::list::PaginatedListOptions; @@ -650,18 +650,23 @@ where ) } - /// Phase 2: Complete a multipart operation with the request body. + /// Phase 2: Complete a body-bearing operation with the materialized body. /// - /// Called by the runtime after materializing the body for a `NeedsBody` action. - /// Middleware is not re-run here — it already executed during phase 1 - /// when the `NeedsBody` action was produced. + /// Called by the runtime after materializing the body for a `NeedsBody` + /// action — multipart operations and batch delete. Middleware is not re-run + /// here — it already executed during phase 1 when the `NeedsBody` action was + /// produced. pub async fn handle_with_body(&self, pending: PendingRequest, body: Bytes) -> ProxyResult { - match self.execute_multipart(&pending, body).await { + let result = match &pending.operation { + S3Operation::DeleteObjects { .. } => self.execute_delete_objects(&pending, body).await, + _ => self.execute_multipart(&pending, body).await, + }; + match result { Ok(result) => { tracing::info!( request_id = %pending.request_id, status = result.status, - "multipart request completed" + "body request completed" ); result } @@ -671,7 +676,7 @@ where error = %err, status = err.status_code(), s3_code = %err.s3_error_code(), - "multipart request failed" + "body request failed" ); error_response( &err, @@ -765,7 +770,24 @@ where bucket_config, key, original_headers, - &["content-type", "content-length", "content-md5"], + // Standard HTTP entity headers are safe to forward to a + // presigned URL: S3 applies them even though they are not + // part of the (host-only) presigned signature. `x-amz-*` + // write headers (metadata, SSE, tagging, storage-class, + // checksums) are deliberately NOT forwarded here — S3 + // rejects unsigned `x-amz-*` headers on presigned + // requests, so they need the header-signing path. See + // .plans/2026-06-23-data-edit-operations-design.md. + &[ + "content-type", + "content-length", + "content-md5", + "content-disposition", + "content-encoding", + "content-language", + "cache-control", + "expires", + ], request_id, ) .await?; @@ -813,6 +835,24 @@ where bucket_config: bucket_config.clone(), original_headers: original_headers.clone(), request_id: request_id.to_string(), + identity: ctx.identity.clone(), + })) + } + // Batch delete needs the body to read the key list and authorize + // each key individually. + S3Operation::DeleteObjects { .. } => { + if bucket_config.parsed_backend_type() != Some(BackendType::S3) { + return Err(ProxyError::NotImplemented(format!( + "batch delete not supported for '{}' backends", + bucket_config.backend_type + ))); + } + Ok(HandlerAction::NeedsBody(PendingRequest { + operation: operation.clone(), + bucket_config: bucket_config.clone(), + original_headers: original_headers.clone(), + request_id: request_id.to_string(), + identity: ctx.identity.clone(), })) } _ => Err(ProxyError::Internal("unexpected operation".into())), @@ -1028,6 +1068,113 @@ where body: ProxyResponseBody::from_bytes(raw_resp.body), }) } + + /// Execute a batch delete (`DeleteObjects`) via raw signed HTTP. + /// + /// Each key in the request body is authorized individually against the + /// caller's scopes (the earlier [`authorize`](crate::auth::authorize) check + /// only verified the caller may delete *something* in the bucket). Keys the + /// caller is not allowed to delete are reported as per-key `AccessDenied` + /// errors (S3's partial-result semantics) rather than failing the whole + /// request; the remaining keys are forwarded to the backend. + async fn execute_delete_objects( + &self, + pending: &PendingRequest, + body: Bytes, + ) -> Result { + use crate::api::delete; + + let config = &pending.bucket_config; + let bucket = pending.operation.bucket().unwrap_or_default(); + + let request = delete::DeleteRequest::parse(&body)?; + let quiet = request.quiet; + + // Partition keys by per-key authorization. + let mut allowed_client: Vec = Vec::new(); + let mut allowed_backend: Vec = Vec::new(); + let mut errors: Vec = Vec::new(); + for key in request.keys() { + if crate::auth::key_authorized(&pending.identity, bucket, Action::DeleteObject, key) { + allowed_client.push(key.to_string()); + allowed_backend.push(apply_backend_prefix(config, key)); + } else { + errors.push(delete::DeleteError { + key: key.to_string(), + code: "AccessDenied".into(), + message: "Access Denied".into(), + }); + } + } + + let mut deleted_client: Vec = Vec::new(); + + if !allowed_backend.is_empty() { + let backend_body = Bytes::from(delete::build_backend_delete_body(&allowed_backend)); + let backend_url = build_backend_url(config, &pending.operation)?; + + let mut headers = HeaderMap::new(); + headers.insert("content-type", "application/xml".parse().unwrap()); + // S3 requires a Content-MD5 (or trailing checksum) on DeleteObjects. + headers.insert( + "content-md5", + content_md5(&backend_body) + .parse() + .map_err(|_| ProxyError::Internal("invalid content-md5 header".into()))?, + ); + headers.insert(http::header::USER_AGENT, self.user_agent.parse().unwrap()); + + let payload_hash = hash_payload(&backend_body); + sign_s3_request( + &Method::POST, + &backend_url, + &mut headers, + config, + &payload_hash, + )?; + + let raw_resp = self + .backend + .send_raw(Method::POST, backend_url, headers, backend_body) + .await?; + + tracing::debug!(status = raw_resp.status, "batch delete backend response"); + + if raw_resp.status >= 300 { + return Err(ProxyError::BackendError(format!( + "backend rejected batch delete with status {}", + raw_resp.status + ))); + } + + match delete::parse_backend_result(&raw_resp.body) { + Ok(outcome) => { + for k in outcome.deleted { + deleted_client.push(strip_backend_prefix(config, &k)); + } + for mut e in outcome.errors { + e.key = strip_backend_prefix(config, &e.key); + errors.push(e); + } + } + Err(e) => { + // The backend returned 2xx but an unparseable body. Treat the + // forwarded keys as deleted rather than failing the request. + tracing::warn!(error = %e, "could not parse backend delete result; assuming success"); + deleted_client.extend(allowed_client.iter().cloned()); + } + } + } + + let xml = delete::build_delete_result(&deleted_client, &errors, quiet); + let mut resp_headers = HeaderMap::new(); + resp_headers.insert("content-type", "application/xml".parse().unwrap()); + Ok(ProxyResult { + status: 200, + headers: resp_headers, + body: ProxyResponseBody::from_bytes(Bytes::from(xml)), + }) + } } impl Dispatch for ProxyGateway @@ -1087,6 +1234,46 @@ fn build_object_path(config: &BucketConfig, key: &str) -> object_store::path::Pa } } +/// Map a client-visible key into the backend key space by prepending +/// `backend_prefix` (the string counterpart of [`build_object_path`]). +fn apply_backend_prefix(config: &BucketConfig, key: &str) -> String { + match &config.backend_prefix { + Some(prefix) => { + let p = prefix.trim_end_matches('/'); + if p.is_empty() { + key.to_string() + } else { + format!("{p}/{key}") + } + } + None => key.to_string(), + } +} + +/// Strip `backend_prefix` from a backend key to recover the client-visible key. +fn strip_backend_prefix(config: &BucketConfig, key: &str) -> String { + match &config.backend_prefix { + Some(prefix) => { + let p = prefix.trim_end_matches('/'); + if p.is_empty() { + key.to_string() + } else { + key.strip_prefix(&format!("{p}/")) + .unwrap_or(key) + .to_string() + } + } + None => key.to_string(), + } +} + +/// Compute the base64-encoded MD5 of `body` for the `Content-MD5` header. +fn content_md5(body: &[u8]) -> String { + use base64::Engine; + use md5::{Digest, Md5}; + base64::engine::general_purpose::STANDARD.encode(Md5::digest(body)) +} + #[cfg(test)] mod tests { use super::*; @@ -1581,4 +1768,159 @@ mod tests { ); }); } + + // -- Batch delete (DeleteObjects) ----------------------------------------- + + /// Backend that captures the forwarded delete body and returns a canned + /// `DeleteResult` marking `allowed/a.txt` deleted. + #[derive(Clone)] + struct DeleteMockBackend { + captured: Arc>>, + } + + impl ProxyBackend for DeleteMockBackend { + type ResponseBody = (); + type Body = (); + + async fn forward( + &self, + _request: ForwardRequest, + _body: (), + ) -> Result, ProxyError> { + unimplemented!() + } + + fn create_paginated_store( + &self, + _config: &BucketConfig, + ) -> Result, ProxyError> { + unimplemented!() + } + + fn create_signer(&self, config: &BucketConfig) -> Result, ProxyError> { + crate::backend::build_signer(config) + } + + async fn send_raw( + &self, + _method: http::Method, + _url: String, + _headers: HeaderMap, + body: Bytes, + ) -> Result { + *self.captured.lock().unwrap() = Some(body); + Ok(RawResponse { + status: 200, + headers: HeaderMap::new(), + body: Bytes::from_static( + b"allowed/a.txt", + ), + }) + } + } + + #[test] + fn batch_delete_filters_unauthorized_keys_per_key() { + use crate::types::{AccessScope, AuthenticatedIdentity}; + run(async { + let captured = Arc::new(std::sync::Mutex::new(None)); + let backend = DeleteMockBackend { + captured: captured.clone(), + }; + let gw = ProxyGateway::new(backend, MockRegistry, MockCreds, None); + + let identity = ResolvedIdentity::Authenticated(AuthenticatedIdentity { + principal_name: "tester".into(), + allowed_scopes: vec![AccessScope { + bucket: "test-bucket".into(), + prefixes: vec!["allowed/".into()], + actions: vec![Action::DeleteObject], + }], + }); + + let pending = PendingRequest { + operation: S3Operation::DeleteObjects { + bucket: "test-bucket".into(), + }, + bucket_config: test_bucket_config("test-bucket"), + original_headers: HeaderMap::new(), + request_id: "rid".into(), + identity, + }; + + let body = Bytes::from_static( + br#"allowed/a.txtdenied/b.txt"#, + ); + + let result = gw.handle_with_body(pending, body).await; + assert_eq!(result.status, 200); + + let xml = match result.body { + ProxyResponseBody::Bytes(b) => String::from_utf8(b.to_vec()).unwrap(), + ProxyResponseBody::Empty => panic!("expected a body"), + }; + // Authorized key deleted; unauthorized key reported as AccessDenied. + assert!( + xml.contains("allowed/a.txt"), + "{xml}" + ); + assert!(xml.contains("denied/b.txt"), "{xml}"); + assert!(xml.contains("AccessDenied"), "{xml}"); + + // The denied key must never be forwarded to the backend. + let sent = captured + .lock() + .unwrap() + .clone() + .expect("backend was called"); + let sent = String::from_utf8(sent.to_vec()).unwrap(); + assert!(sent.contains("allowed/a.txt"), "forwarded body: {sent}"); + assert!( + !sent.contains("denied/b.txt"), + "denied key leaked to backend: {sent}" + ); + }); + } + + #[test] + fn batch_delete_all_denied_skips_backend() { + use crate::types::{AccessScope, AuthenticatedIdentity}; + run(async { + let captured = Arc::new(std::sync::Mutex::new(None)); + let backend = DeleteMockBackend { + captured: captured.clone(), + }; + let gw = ProxyGateway::new(backend, MockRegistry, MockCreds, None); + + // Scope grants only a different prefix → every requested key is denied. + let identity = ResolvedIdentity::Authenticated(AuthenticatedIdentity { + principal_name: "tester".into(), + allowed_scopes: vec![AccessScope { + bucket: "test-bucket".into(), + prefixes: vec!["other/".into()], + actions: vec![Action::DeleteObject], + }], + }); + + let pending = PendingRequest { + operation: S3Operation::DeleteObjects { + bucket: "test-bucket".into(), + }, + bucket_config: test_bucket_config("test-bucket"), + original_headers: HeaderMap::new(), + request_id: "rid".into(), + identity, + }; + + let body = + Bytes::from_static(br#"secret/a.txt"#); + let result = gw.handle_with_body(pending, body).await; + assert_eq!(result.status, 200); + // Backend must not be contacted when nothing is authorized. + assert!( + captured.lock().unwrap().is_none(), + "backend should be skipped" + ); + }); + } } diff --git a/crates/core/src/route_handler.rs b/crates/core/src/route_handler.rs index d18d5b9..a7e9e62 100644 --- a/crates/core/src/route_handler.rs +++ b/crates/core/src/route_handler.rs @@ -97,12 +97,16 @@ impl ProxyResult { } } -/// Opaque state for a multipart operation that needs the request body. +/// Opaque state for an operation that needs the request body before it can be +/// completed (multipart operations and batch delete). pub struct PendingRequest { pub(crate) operation: crate::types::S3Operation, pub(crate) bucket_config: crate::types::BucketConfig, pub(crate) original_headers: HeaderMap, pub(crate) request_id: String, + /// The resolved caller identity, needed for per-key authorization of batch + /// operations (e.g. `DeleteObjects`). Unused by multipart operations. + pub(crate) identity: crate::types::ResolvedIdentity, } /// Response headers that must NOT be forwarded to clients. diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 64a33e6..b70f307 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -346,6 +346,12 @@ pub enum S3Operation { bucket: String, key: String, }, + /// Batch delete (`POST /{bucket}?delete`). The keys to delete live in the + /// request body, so this operation carries only the bucket — the body is + /// parsed and each key authorized individually once it arrives. + DeleteObjects { + bucket: String, + }, ListBucket { bucket: String, /// Raw query string from the incoming request, forwarded to the backend. @@ -370,7 +376,8 @@ impl S3Operation { http::Method::DELETE } S3Operation::CreateMultipartUpload { .. } - | S3Operation::CompleteMultipartUpload { .. } => http::Method::POST, + | S3Operation::CompleteMultipartUpload { .. } + | S3Operation::DeleteObjects { .. } => http::Method::POST, } } @@ -386,6 +393,9 @@ impl S3Operation { S3Operation::CompleteMultipartUpload { .. } => Action::CompleteMultipartUpload, S3Operation::AbortMultipartUpload { .. } => Action::AbortMultipartUpload, S3Operation::DeleteObject { .. } => Action::DeleteObject, + // Batch delete authorizes as DeleteObject; each key in the body is + // checked individually against the caller's scopes. + S3Operation::DeleteObjects { .. } => Action::DeleteObject, S3Operation::ListBuckets => Action::ListBucket, } } @@ -401,7 +411,8 @@ impl S3Operation { | S3Operation::UploadPart { bucket, .. } | S3Operation::CompleteMultipartUpload { bucket, .. } | S3Operation::AbortMultipartUpload { bucket, .. } - | S3Operation::DeleteObject { bucket, .. } => Some(bucket), + | S3Operation::DeleteObject { bucket, .. } + | S3Operation::DeleteObjects { bucket } => Some(bucket), S3Operation::ListBuckets => None, } } @@ -417,7 +428,9 @@ impl S3Operation { | S3Operation::CompleteMultipartUpload { key, .. } | S3Operation::AbortMultipartUpload { key, .. } | S3Operation::DeleteObject { key, .. } => key, - S3Operation::ListBucket { .. } | S3Operation::ListBuckets => "", + S3Operation::ListBucket { .. } + | S3Operation::ListBuckets + | S3Operation::DeleteObjects { .. } => "", } } } From 343645d3ec70cfacd9e7e82338c07502cd6d9373 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 14:30:31 -0700 Subject: [PATCH 02/21] docs: document batch delete, copy rejection, and write-header limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - reference/operations.md: add DeleteObjects, document per-key batch-delete authorization, write request-header forwarding, and the new limitations (server-side copy unsupported, x-amz-* write headers dropped, versioned delete ignored). - integration: add a batch-delete roundtrip test (put 3 → delete_objects → assert all gone). Co-Authored-By: Claude Opus 4.8 --- docs/reference/operations.md | 16 ++++++++++++++-- tests/integration/test_integration.py | 20 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/docs/reference/operations.md b/docs/reference/operations.md index 24f8abd..fe3f755 100644 --- a/docs/reference/operations.md +++ b/docs/reference/operations.md @@ -8,6 +8,7 @@ | HeadObject | `HEAD /{bucket}/{key}` | Forward | Get file metadata | | PutObject | `PUT /{bucket}/{key}` | Forward | Upload a file | | DeleteObject | `DELETE /{bucket}/{key}` | Forward | Delete a file | +| DeleteObjects | `POST /{bucket}?delete` | NeedsBody | Batch-delete up to 1000 keys (`aws s3 rm --recursive`, `delete_objects`) | | ListBucket | `GET /{bucket}` | Response | List objects in a bucket (ListObjectsV1 and V2) | | ListBuckets | `GET /` | Response | List all virtual buckets | | CreateMultipartUpload | `POST /{bucket}/{key}?uploads` | NeedsBody | Initiate a multipart upload | @@ -19,7 +20,15 @@ - **Forward** — A presigned URL is generated and returned to the runtime, which executes it with its native HTTP client. Bodies stream directly between client and backend without buffering. - **Response** — The handler builds a complete response (XML for LIST, error responses) and returns it. No presigned URL involved. -- **NeedsBody** — The runtime collects the request body, then the handler signs and sends the request via raw HTTP (`backend.send_raw()`). Multipart only. +- **NeedsBody** — The runtime collects the request body, then the handler signs and sends the request via raw HTTP (`backend.send_raw()`). Used by multipart and batch delete. + +### Batch delete authorization + +`DeleteObjects` carries its keys in the request body, so authorization happens in two stages: the bucket-level check confirms the caller may delete *something* in the bucket, then **each key in the body is authorized individually** against the caller's allowed prefixes. Keys the caller is not permitted to delete are returned as per-key `AccessDenied` entries in the `DeleteResult` (S3's partial-result semantics) and are never forwarded to the backend; authorized keys are deleted regardless. Anonymous callers cannot batch-delete. + +### Writes and request headers + +`PutObject` forwards the request body plus standard HTTP entity headers (`Content-Type`, `Content-Disposition`, `Content-Encoding`, `Content-Language`, `Cache-Control`, `Expires`, `Content-MD5`) to a presigned URL. `x-amz-*` headers (user metadata `x-amz-meta-*`, storage class, tagging, ACLs, SSE, and checksum headers such as `x-amz-checksum-*`) are **not** forwarded: S3 rejects unsigned `x-amz-*` headers on presigned requests, and the proxy presigns over `host` only. Supporting those headers requires a header-signing forward path — see the design note in `.plans/`. ## STS Operations @@ -41,5 +50,8 @@ Handled by OIDC discovery closures (registered on the `Router` via `OidcRouterEx ## Limitations > [!WARNING] -> - **Multipart is S3 only** — Multipart operations use raw HTTP with `S3RequestSigner` and are gated to `backend_type = "s3"`. Non-S3 backends should use single PUT requests. +> - **Multipart and batch delete are S3 only** — Both use raw HTTP with `S3RequestSigner` and are gated to `backend_type = "s3"`. Non-S3 backends should use single PUT/DELETE requests. > - **DeleteObject does not return confirmation** — The proxy forwards the DELETE and returns the backend's response status. +> - **Server-side copy is not supported** — A `PUT` carrying `x-amz-copy-source` (CopyObject / UploadPartCopy) is rejected with `501 NotImplemented` rather than silently overwriting the destination. +> - **`x-amz-*` write headers are dropped** — Object metadata, storage class, tagging, ACLs, SSE, and checksum headers on writes are not forwarded (see "Writes and request headers" above). +> - **Versioned/MFA delete is not handled** — A `?versionId=` on a delete is ignored; the current object version is deleted. diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 01071b3..0c5b891 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -170,6 +170,26 @@ def test_head_object(self): # Cleanup client.delete_object(Bucket="private-uploads", Key=key) + def test_batch_delete(self): + client = static_client() + keys = [f"test-batch-{uuid.uuid4()}.txt" for _ in range(3)] + for key in keys: + client.put_object(Bucket="private-uploads", Key=key, Body=b"batch") + + resp = client.delete_objects( + Bucket="private-uploads", + Delete={"Objects": [{"Key": k} for k in keys]}, + ) + deleted = {d["Key"] for d in resp.get("Deleted", [])} + assert deleted == set(keys), resp + assert not resp.get("Errors"), resp + + # All keys are gone. + for key in keys: + with pytest.raises(ClientError) as exc_info: + client.get_object(Bucket="private-uploads", Key=key) + assert exc_info.value.response["Error"]["Code"] in ("NoSuchKey", "404") + # --------------------------------------------------------------------------- # Static credential reads From e93094015e11b65a2060d9602e27826392d94565 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 15:19:34 -0700 Subject: [PATCH 03/21] test(integration): exercise multipart uploads end-to-end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Multipart had zero end-to-end coverage — and the integration static credential and github-actions role weren't even granted the multipart actions, so the path could never have been tested. - Grant create/upload_part/complete/abort_multipart_upload on private-uploads for both the static credential and the github-actions role. - Add TestMultipartUploads: a high-level transfer-manager round-trip (forces Create + 2x UploadPart + Complete), an explicit low-level round-trip, and an Abort test. Each verifies the object bytes (or that an aborted upload cannot be completed). Co-Authored-By: Claude Opus 4.8 --- examples/cf-workers/wrangler.integration.toml | 4 +- tests/integration/test_integration.py | 95 +++++++++++++++++++ 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/examples/cf-workers/wrangler.integration.toml b/examples/cf-workers/wrangler.integration.toml index 00511e8..b73a551 100644 --- a/examples/cf-workers/wrangler.integration.toml +++ b/examples/cf-workers/wrangler.integration.toml @@ -60,7 +60,7 @@ bucket = "public-data" prefixes = [] [[vars.PROXY_CONFIG.credentials.allowed_scopes]] -actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket"] +actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket", "create_multipart_upload", "upload_part", "complete_multipart_upload", "abort_multipart_upload"] bucket = "private-uploads" prefixes = [] @@ -81,7 +81,7 @@ bucket = "public-data" prefixes = [] [[vars.PROXY_CONFIG.roles.allowed_scopes]] -actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket"] +actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket", "create_multipart_upload", "upload_part", "complete_multipart_upload", "abort_multipart_upload"] bucket = "private-uploads" prefixes = [] diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 0c5b891..ea370c7 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -12,13 +12,18 @@ import os import uuid import xml.etree.ElementTree as ET +from io import BytesIO import boto3 import pytest import requests +from boto3.s3.transfer import TransferConfig from botocore.config import Config from botocore.exceptions import ClientError +# S3 requires every multipart part except the last to be at least 5 MiB. +MIB = 1024 * 1024 + PROXY_URL = os.environ.get("PROXY_URL", "http://localhost:8787") @@ -191,6 +196,96 @@ def test_batch_delete(self): assert exc_info.value.response["Error"]["Code"] in ("NoSuchKey", "404") +# --------------------------------------------------------------------------- +# Multipart uploads +# --------------------------------------------------------------------------- + +class TestMultipartUploads: + """Exercise the full multipart upload path (Create/UploadPart/Complete/Abort).""" + + def test_multipart_roundtrip_high_level(self): + """boto3's transfer manager: forces Create + 2x UploadPart + Complete.""" + client = static_client() + key = f"test-multipart-{uuid.uuid4()}.bin" + # 6 MiB → two parts (5 MiB + 1 MiB) at a 5 MiB chunk size. + body = b"multipart-payload-block!" * (6 * MIB // 24 + 1) + body = body[: 6 * MIB] + config = TransferConfig( + multipart_threshold=5 * MIB, + multipart_chunksize=5 * MIB, + max_concurrency=1, + use_threads=False, + ) + + client.upload_fileobj(BytesIO(body), "private-uploads", key, Config=config) + resp = client.get_object(Bucket="private-uploads", Key=key) + assert resp["Body"].read() == body + + client.delete_object(Bucket="private-uploads", Key=key) + + def test_multipart_low_level_explicit(self): + """Drive Create/UploadPart/Complete directly and verify the round-trip.""" + client = static_client() + key = f"test-mpu-explicit-{uuid.uuid4()}.bin" + part1 = b"A" * (5 * MIB) + part2 = b"B" * (2 * MIB) + + create = client.create_multipart_upload(Bucket="private-uploads", Key=key) + upload_id = create["UploadId"] + assert upload_id + + parts = [] + for num, chunk in enumerate([part1, part2], start=1): + up = client.upload_part( + Bucket="private-uploads", + Key=key, + PartNumber=num, + UploadId=upload_id, + Body=chunk, + ) + parts.append({"PartNumber": num, "ETag": up["ETag"]}) + + client.complete_multipart_upload( + Bucket="private-uploads", + Key=key, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + + resp = client.get_object(Bucket="private-uploads", Key=key) + assert resp["Body"].read() == part1 + part2 + + client.delete_object(Bucket="private-uploads", Key=key) + + def test_multipart_abort(self): + """AbortMultipartUpload tears down an in-progress upload.""" + client = static_client() + key = f"test-mpu-abort-{uuid.uuid4()}.bin" + + create = client.create_multipart_upload(Bucket="private-uploads", Key=key) + upload_id = create["UploadId"] + + client.upload_part( + Bucket="private-uploads", + Key=key, + PartNumber=1, + UploadId=upload_id, + Body=b"C" * (5 * MIB), + ) + client.abort_multipart_upload( + Bucket="private-uploads", Key=key, UploadId=upload_id + ) + + # Completing an aborted upload must fail. + with pytest.raises(ClientError): + client.complete_multipart_upload( + Bucket="private-uploads", + Key=key, + UploadId=upload_id, + MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": "x"}]}, + ) + + # --------------------------------------------------------------------------- # Static credential reads # --------------------------------------------------------------------------- From d323d4246ed18a9abb3d52c95a907400ca1c2188 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 16:38:45 -0700 Subject: [PATCH 04/21] fix(auth): canonicalize value-less query params in SigV4 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `canonicalize_query_string` sorted the raw `&`-split parts but never normalized a value-less flag parameter to `key=value`. SigV4 requires `?uploads` to canonicalize as `uploads=` (trailing `=`); clients and backends both sign it that way. The proxy reconstructed `uploads`, so every operation whose query is a value-less flag failed signature verification. This silently broke CreateMultipartUpload (`?uploads`) and DeleteObjects (`?delete`) on both sides of the proxy — they were never exercised by the test suite, so it went unnoticed. The integration tests added alongside this change surface it as `SignatureDoesNotMatch`. - `auth/sigv4.rs`: append `=` to value-less params (and drop empty segments); unit tests for flag, valued, mixed, and empty-segment cases. - `backend/request_signer.rs`: route the outbound canonical query through the same helper so the proxy's own signature to the backend matches too. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/auth/sigv4.rs | 67 ++++++++++++++++++++++- crates/core/src/backend/request_signer.rs | 8 ++- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/crates/core/src/auth/sigv4.rs b/crates/core/src/auth/sigv4.rs index 12da1e7..a670225 100644 --- a/crates/core/src/auth/sigv4.rs +++ b/crates/core/src/auth/sigv4.rs @@ -154,12 +154,29 @@ pub fn verify_sigv4_signature( Ok(matched) } -/// Sort query string parameters for SigV4 canonical request construction. +/// Build the SigV4 canonical query string: every parameter as `key=value`, +/// sorted. +/// +/// A value-less flag parameter (e.g. `?uploads`, `?delete`) is canonicalized +/// with an empty value and a trailing `=` per the SigV4 spec. Clients (and +/// backends) sign it that way, so the proxy must reconstruct it identically on +/// both the inbound-verification and outbound-signing sides or the signature +/// will not match. Empty segments (from a stray `&`) are dropped. pub(crate) fn canonicalize_query_string(query: &str) -> String { if query.is_empty() { return String::new(); } - let mut parts: Vec<&str> = query.split('&').collect(); + let mut parts: Vec = query + .split('&') + .filter(|p| !p.is_empty()) + .map(|p| { + if p.contains('=') { + p.to_string() + } else { + format!("{p}=") + } + }) + .collect(); parts.sort_unstable(); parts.join("&") } @@ -180,3 +197,49 @@ pub(crate) fn constant_time_eq(a: &[u8], b: &[u8]) -> bool { .fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0 } + +#[cfg(test)] +mod tests { + use super::canonicalize_query_string; + + #[test] + fn empty_query_is_empty() { + assert_eq!(canonicalize_query_string(""), ""); + } + + #[test] + fn value_less_flag_gets_trailing_equals() { + // The bug that broke multipart and batch delete: `?uploads` / `?delete` + // must canonicalize to `uploads=` / `delete=`. + assert_eq!(canonicalize_query_string("uploads"), "uploads="); + assert_eq!(canonicalize_query_string("delete"), "delete="); + } + + #[test] + fn valued_params_are_sorted_and_unchanged() { + assert_eq!( + canonicalize_query_string("list-type=2&prefix=foo"), + "list-type=2&prefix=foo" + ); + // Sorting is by the full encoded parameter. + assert_eq!( + canonicalize_query_string("partNumber=1&uploadId=abc"), + "partNumber=1&uploadId=abc" + ); + assert_eq!(canonicalize_query_string("b=2&a=1"), "a=1&b=2"); + } + + #[test] + fn mixed_flag_and_valued_params() { + // Real shape of a versioned delete-style request. + assert_eq!( + canonicalize_query_string("versionId=v1&delete"), + "delete=&versionId=v1" + ); + } + + #[test] + fn stray_empty_segments_are_dropped() { + assert_eq!(canonicalize_query_string("delete&"), "delete="); + } +} diff --git a/crates/core/src/backend/request_signer.rs b/crates/core/src/backend/request_signer.rs index ad023fb..bf693d3 100644 --- a/crates/core/src/backend/request_signer.rs +++ b/crates/core/src/backend/request_signer.rs @@ -5,7 +5,7 @@ //! (CreateMultipartUpload, UploadPart, CompleteMultipartUpload, //! AbortMultipartUpload) that go through [`backend::ProxyBackend::send_raw`](crate::backend::ProxyBackend::send_raw). -use crate::auth::sigv4::hmac_sha256; +use crate::auth::sigv4::{canonicalize_query_string, hmac_sha256}; use crate::error::ProxyError; use http::HeaderMap; @@ -75,9 +75,11 @@ impl S3RequestSigner { }; headers.insert("host", host_header.parse().unwrap()); - // Canonical request + // Canonical request. The query must be in SigV4 canonical form — every + // parameter as `key=value` (value-less flags like `?uploads`/`?delete` + // get a trailing `=`), sorted — or the backend rejects the signature. let canonical_uri = url.path(); - let canonical_querystring = url.query().unwrap_or(""); + let canonical_querystring = canonicalize_query_string(url.query().unwrap_or("")); let mut signed_header_names: Vec<&str> = headers.keys().map(|k| k.as_str()).collect(); signed_header_names.sort(); From 42c17346bebf06025da3bc2d6cb034ee85848790 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 16:38:45 -0700 Subject: [PATCH 05/21] test(integration): strengthen PUT coverage - test_put_larger_body_single_request: a 2 MiB single PUT round-trips intact (non-trivial, streamed, still below the multipart threshold). - test_put_preserves_content_headers: Content-Type / Content-Disposition / Cache-Control set on PUT survive the round-trip, exercising the widened PUT forward allowlist. `x-amz-meta-*` is intentionally not asserted (deferred). Co-Authored-By: Claude Opus 4.8 --- tests/integration/test_integration.py | 41 +++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index ea370c7..bc08438 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -140,6 +140,47 @@ def test_put_then_get_roundtrip(self): # Cleanup client.delete_object(Bucket="private-uploads", Key=key) + def test_put_larger_body_single_request(self): + """A non-trivial single PUT (streamed, not buffered) round-trips intact.""" + client = static_client() + key = f"test-large-{uuid.uuid4()}.bin" + # 2 MiB: well above the trivial happy-path size, still a single PUT + # (put_object never switches to multipart). + body = bytes((i % 251 for i in range(2 * MIB))) + + client.put_object(Bucket="private-uploads", Key=key, Body=body) + resp = client.get_object(Bucket="private-uploads", Key=key) + data = resp["Body"].read() + assert len(data) == len(body) + assert data == body + + client.delete_object(Bucket="private-uploads", Key=key) + + def test_put_preserves_content_headers(self): + """Standard entity headers set on PUT survive the round-trip. + + Exercises the widened PUT forward allowlist (Content-Type was always + forwarded; Content-Disposition / Cache-Control are new). Note: + `x-amz-meta-*` user metadata is intentionally NOT forwarded (it requires + the deferred header-signing path), so it is not asserted. + """ + client = static_client() + key = f"test-headers-{uuid.uuid4()}.txt" + client.put_object( + Bucket="private-uploads", + Key=key, + Body=b"payload with content metadata", + ContentType="application/json", + ContentDisposition='attachment; filename="report.json"', + CacheControl="max-age=3600", + ) + resp = client.get_object(Bucket="private-uploads", Key=key) + assert resp["ContentType"] == "application/json" + assert resp["ContentDisposition"] == 'attachment; filename="report.json"' + assert resp["CacheControl"] == "max-age=3600" + + client.delete_object(Bucket="private-uploads", Key=key) + def test_list_after_write(self): client = static_client() key = f"test-list-{uuid.uuid4()}.txt" From f2b781a4f2e8df005d9a2a61dad4ef0694505e10 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 16:50:43 -0700 Subject: [PATCH 06/21] fix(cf-workers): don't consume body before converting send_raw response MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `send_raw` read the response body via `worker_resp.bytes().await` and *then* converted `worker::Response` into `web_sys::Response` to read the headers. That conversion panics ("unreachable") once the body has been consumed, so every `send_raw` response aborted the worker with a bare 500 — breaking all multipart operations and batch delete on the edge runtime. It went unnoticed because no test exercised the raw-HTTP path on Workers. Convert to `web_sys::Response` and read the headers first (as `forward()` already does), then read the body via `arrayBuffer()`. Verified end-to-end against `wrangler dev` + MinIO: multipart round-trip, abort, and batch delete all pass. Co-Authored-By: Claude Opus 4.8 --- crates/cf-workers/src/backend.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/crates/cf-workers/src/backend.rs b/crates/cf-workers/src/backend.rs index 3c6e11b..8ba5f42 100644 --- a/crates/cf-workers/src/backend.rs +++ b/crates/cf-workers/src/backend.rs @@ -146,22 +146,31 @@ impl ProxyBackend for WorkerBackend { // Fetch via worker let worker_req: worker::Request = ws_request.into(); - let mut worker_resp = Fetch::Request(worker_req) + let worker_resp = Fetch::Request(worker_req) .send() .await .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?; let status = worker_resp.status_code(); - // Read response body as bytes (multipart responses are small) - let resp_bytes = worker_resp - .bytes() - .await - .map_err(|e| ProxyError::Internal(format!("failed to read response: {}", e)))?; - + // Convert to `web_sys::Response` and read the headers BEFORE consuming + // the body. The `worker::Response → web_sys::Response` conversion panics + // once the body has been read, so reading bytes first (and converting + // after) blew up `send_raw` on every multipart/batch-delete response. + // `forward()` relies on the same before-body ordering. let ws_response: web_sys::Response = worker_resp.into(); let resp_headers = headermap_from_js(&ws_response.headers()); + // Read the (small) response body via `arrayBuffer()`. + let buf = wasm_bindgen_futures::JsFuture::from( + ws_response + .array_buffer() + .map_err(|e| ProxyError::Internal(format!("arrayBuffer() failed: {:?}", e)))?, + ) + .await + .map_err(|e| ProxyError::Internal(format!("failed to read response: {:?}", e)))?; + let resp_bytes = js_sys::Uint8Array::new(&buf).to_vec(); + Ok(RawResponse { status, headers: resp_headers, From eb49944eb6d74b08f8efe7ab94698ecfb271fcb0 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 16:57:10 -0700 Subject: [PATCH 07/21] chore: add `make test-integration` for one-command local runs Running the integration suite locally previously meant six manual steps (docker compose, worker-build, wrangler dev, wait, pytest, cleanup). Wrap them in a single command that mirrors CI: MinIO via docker compose behind the Workers runtime (wrangler dev), with wrangler torn down on exit. - scripts/integration-test.sh: orchestrates the stack; polls a seeded public object for MinIO readiness (avoids `--wait`, which fails on the one-shot seeder), auto-installs worker-build if missing, and traps cleanup. - Makefile: `make test-integration` (extra pytest args via ARGS). Co-Authored-By: Claude Opus 4.8 --- Makefile | 7 +++- scripts/integration-test.sh | 78 +++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100755 scripts/integration-test.sh diff --git a/Makefile b/Makefile index 42dcf55..9c39ed4 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: check test run-server run-workers ci docs +.PHONY: check test test-integration run-server run-workers ci docs build: build-workers @@ -24,6 +24,11 @@ clippy-fix: test: cargo test +# Run the integration suite locally: MinIO (docker compose) + the Workers +# runtime (wrangler dev), mirroring CI. Pass extra pytest args via ARGS. +test-integration: + ./scripts/integration-test.sh $(ARGS) + run-server: cargo run -p multistore-server -- $(ARGS) diff --git a/scripts/integration-test.sh b/scripts/integration-test.sh new file mode 100755 index 0000000..859ead6 --- /dev/null +++ b/scripts/integration-test.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +# +# Run the integration test suite locally — the same setup CI uses: +# MinIO (via docker compose) behind the Cloudflare Workers runtime (wrangler dev). +# +# Usage: +# ./scripts/integration-test.sh [extra pytest args] +# make test-integration +# +# Prerequisites: docker, node/npx, uv (uvx), and the wasm32 Rust target +# (`rustup target add wasm32-unknown-unknown`). worker-build is installed +# automatically if missing. +set -euo pipefail + +cd "$(dirname "$0")/.." + +PORT="${PROXY_PORT:-8787}" +PROXY_URL="http://localhost:${PORT}" +WORKER_DIR="examples/cf-workers" + +cleanup() { + # Kill wrangler (and the workerd children it spawns). + pkill -f "wrangler dev --config wrangler.integration.toml" 2>/dev/null || true +} +trap cleanup EXIT + +echo "==> Starting MinIO + seeding buckets (docker compose)" +# Not `--wait`: the one-shot seeder (minio-init) exits, which `--wait` treats as +# a failure. Instead poll for a seeded public object — that confirms MinIO is up +# *and* the buckets are seeded. +docker compose up -d +for i in $(seq 1 30); do + if curl -so /dev/null "http://localhost:9000/public-data/hello.txt"; then + echo " MinIO ready and seeded" + break + fi + if [ "$i" -eq 30 ]; then + echo "ERROR: MinIO did not become ready within 30s" >&2 + exit 1 + fi + sleep 1 +done + +if ! command -v worker-build >/dev/null 2>&1; then + echo "==> Installing worker-build" + cargo install worker-build --version '^0.7' +fi + +echo "==> Building worker (WASM)" +( cd "$WORKER_DIR" && worker-build --release ) + +# wrangler dev needs SESSION_TOKEN_KEY; generate a throwaway one if absent. +if [ ! -f "$WORKER_DIR/.dev.vars" ]; then + echo "==> Writing throwaway $WORKER_DIR/.dev.vars" + echo "SESSION_TOKEN_KEY=$(openssl rand -base64 32)" > "$WORKER_DIR/.dev.vars" +fi + +echo "==> Starting wrangler dev on :${PORT}" +( cd "$WORKER_DIR" && npx wrangler dev --config wrangler.integration.toml --port "$PORT" ) \ + > /tmp/multistore-wrangler.log 2>&1 & + +echo "==> Waiting for the proxy to accept requests" +for i in $(seq 1 60); do + if curl -so /dev/null "${PROXY_URL}/"; then + echo " ready" + break + fi + if [ "$i" -eq 60 ]; then + echo "ERROR: proxy did not start within 120s; wrangler log:" >&2 + tail -20 /tmp/multistore-wrangler.log >&2 + exit 1 + fi + sleep 2 +done + +echo "==> Running integration tests" +PROXY_URL="$PROXY_URL" uvx --with pytest,boto3,requests \ + pytest tests/integration/ -v "$@" From 1ba0ec87460bec22da7c4c89607d3392f4b6ee62 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 17:03:14 -0700 Subject: [PATCH 08/21] docs: note Cloudflare Workers upload size limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Document that the Workers runtime is bound by Cloudflare's request-body size limit (100 MB Free/Pro … 500 MB Enterprise), enforced at the edge with a 413. Guidance: use multipart for large objects (only part size must stay under the limit) and configure clients to chunk below it. Cross-references the streaming UploadPart follow-up (#89) and notes the native/Lambda runtimes differ. Co-Authored-By: Claude Opus 4.8 --- docs/reference/operations.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/reference/operations.md b/docs/reference/operations.md index fe3f755..299ebf6 100644 --- a/docs/reference/operations.md +++ b/docs/reference/operations.md @@ -55,3 +55,14 @@ Handled by OIDC discovery closures (registered on the `Router` via `OidcRouterEx > - **Server-side copy is not supported** — A `PUT` carrying `x-amz-copy-source` (CopyObject / UploadPartCopy) is rejected with `501 NotImplemented` rather than silently overwriting the destination. > - **`x-amz-*` write headers are dropped** — Object metadata, storage class, tagging, ACLs, SSE, and checksum headers on writes are not forwarded (see "Writes and request headers" above). > - **Versioned/MFA delete is not handled** — A `?versionId=` on a delete is ignored; the current object version is deleted. + +### Upload size on the Cloudflare Workers runtime + +The Workers runtime is bounded by Cloudflare's [request-body size limit](https://developers.cloudflare.com/workers/platform/limits/#request-and-response-limits) — 100 MB on Free/Pro, 200 MB on Business, 500 MB (default) on Enterprise. This is a **hard platform limit enforced at Cloudflare's edge**: a request body larger than the plan limit is rejected with `413` before the proxy can act on it, and the proxy cannot raise it. + +Consequences and guidance: + +- **A single `PutObject` cannot exceed the plan body limit.** Upload larger objects as a **multipart upload**: each `UploadPart` is a separate request, so only the *part* size must stay under the limit. With S3's 10,000-part maximum, 100 MB parts allow objects up to ~1 TB even on Free/Pro. +- **Configure clients to chunk below the limit.** e.g. boto3 `TransferConfig(multipart_threshold=…, multipart_chunksize=…)` with a chunk size under the plan limit; aws-cli's `s3.multipart_chunksize`. +- Until [streaming `UploadPart`](https://github.com/developmentseed/multistore/issues/89) lands, parts on Workers are additionally capped by the 128 MB worker memory limit (parts are buffered in WASM). Keep `multipart_chunksize` comfortably below 100 MB. +- The native server and Lambda runtimes have their own, generally higher, limits — this constraint is specific to Workers. From c99f39b24d87f5eb7beba5134bbd99595e5baff4 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 17:19:39 -0700 Subject: [PATCH 09/21] feat: reject oversized uploads with EntityTooLarge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cloudflare Workers rejects request bodies over the plan limit with an opaque edge 413 before the proxy can shape the response. Give clients an actionable S3 error instead: when configured, reject a `PutObject`/`UploadPart` whose declared `Content-Length` exceeds a maximum with `EntityTooLarge` (HTTP 400). - core: add `ProxyError::EntityTooLarge` (→ S3 `EntityTooLarge`, 400) and `ProxyGateway::with_max_request_body_size`. `check_upload_size` runs in the PutObject and UploadPart dispatch arms; no limit (the default) preserves current behavior, and requests without a `Content-Length` fall through to the runtime's own limit. Unit tests cover over/under/no-limit for both ops. - cf-workers example: read the limit from the `MAX_UPLOAD_BYTES` env var. - integration: set `MAX_UPLOAD_BYTES=10 MiB` in the test config (above the part sizes the multipart tests use) and assert a 12 MiB PUT is rejected with `EntityTooLarge`. - docs: document the knob under the Workers upload-size limitations. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/error.rs | 8 ++ crates/core/src/proxy.rs | 134 ++++++++++++++++++ docs/reference/operations.md | 2 + examples/cf-workers/src/lib.rs | 11 ++ examples/cf-workers/wrangler.integration.toml | 4 + tests/integration/test_integration.py | 12 ++ 6 files changed, 171 insertions(+) diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index ecddae4..e6772ab 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -31,6 +31,12 @@ pub enum ProxyError { #[error("not implemented: {0}")] NotImplemented(String), + /// The upload body exceeds the proxy's configured maximum size. Returned + /// as S3's `EntityTooLarge` so clients get an actionable error instead of a + /// runtime-specific rejection (e.g. Cloudflare's edge `413`). + #[error("entity too large")] + EntityTooLarge, + /// The request contains no authentication credentials. #[error("missing authentication")] MissingAuth, @@ -89,6 +95,7 @@ impl ProxyError { Self::SignatureDoesNotMatch => "SignatureDoesNotMatch", Self::InvalidRequest(_) => "InvalidRequest", Self::NotImplemented(_) => "NotImplemented", + Self::EntityTooLarge => "EntityTooLarge", Self::MissingAuth => "AccessDenied", Self::ExpiredCredentials => "ExpiredToken", Self::InvalidOidcToken(_) => "InvalidIdentityToken", @@ -110,6 +117,7 @@ impl ProxyError { Self::SignatureDoesNotMatch => 403, Self::InvalidRequest(_) => 400, Self::NotImplemented(_) => 501, + Self::EntityTooLarge => 400, Self::InvalidOidcToken(_) => 400, Self::RoleNotFound(_) => 403, Self::PreconditionFailed => 412, diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index 02270ef..45e45bc 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -144,6 +144,12 @@ pub struct ProxyGateway { /// When true, responses include a `Server-Timing` header with gateway /// processing metrics. Enabled by default. server_timing: bool, + /// Maximum accepted upload body size in bytes, if set. When a `PutObject` + /// or `UploadPart` declares a `Content-Length` larger than this, the proxy + /// rejects it with `EntityTooLarge` instead of forwarding it. Useful for + /// surfacing a clean S3 error ahead of a runtime body-size limit (e.g. + /// Cloudflare Workers' edge `413`). `None` means no proxy-enforced limit. + max_request_body_size: Option, } impl ProxyGateway @@ -176,6 +182,7 @@ where debug_errors: false, user_agent: DEFAULT_USER_AGENT.to_string(), server_timing: true, + max_request_body_size: None, } } @@ -246,6 +253,44 @@ where self } + /// Set the maximum accepted upload body size, in bytes. + /// + /// When set, a `PutObject` or `UploadPart` whose `Content-Length` exceeds + /// this is rejected up front with S3's `EntityTooLarge` (HTTP 400) rather + /// than forwarded. Use this on runtimes with a hard request-body limit — + /// e.g. Cloudflare Workers, where the edge otherwise rejects oversized + /// bodies with an opaque `413` — to give clients an actionable S3 error. + /// + /// The check relies on a declared `Content-Length`; requests without one + /// (e.g. unknown-length streaming) fall through to the runtime's own limit. + /// `None` (the default) disables the proxy-enforced limit. + pub fn with_max_request_body_size(mut self, max_bytes: Option) -> Self { + self.max_request_body_size = max_bytes; + self + } + + /// Reject an upload whose declared `Content-Length` exceeds the configured + /// maximum. No-op when no limit is set or no `Content-Length` is present. + fn check_upload_size(&self, headers: &HeaderMap) -> Result<(), ProxyError> { + if let Some(max) = self.max_request_body_size { + let declared = headers + .get(http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()); + if let Some(len) = declared { + if len > max { + tracing::warn!( + content_length = len, + max = max, + "rejecting upload exceeding configured max body size" + ); + return Err(ProxyError::EntityTooLarge); + } + } + } + Ok(()) + } + /// Inject a `Server-Timing` header into the response headers if enabled. fn maybe_inject_server_timing( &self, @@ -764,6 +809,7 @@ where Ok(HandlerAction::Forward(fwd)) } S3Operation::PutObject { key, .. } => { + self.check_upload_size(original_headers)?; let fwd = self .build_forward( Method::PUT, @@ -830,6 +876,10 @@ where bucket_config.backend_type ))); } + // UploadPart carries the part body; reject oversized parts up + // front. (Create/Complete/Abort bodies are small and won't trip + // a sane limit.) + self.check_upload_size(original_headers)?; Ok(HandlerAction::NeedsBody(PendingRequest { operation: operation.clone(), bucket_config: bucket_config.clone(), @@ -1570,6 +1620,90 @@ mod tests { }); } + // -- Max upload size (EntityTooLarge) ------------------------------------ + + #[test] + fn put_over_max_body_size_is_rejected() { + run(async { + let gw = gateway().with_max_request_body_size(Some(1024)); + let mut headers = HeaderMap::new(); + headers.insert("content-length", "2048".parse().unwrap()); + let action = gw + .resolve_request(Method::PUT, "/test-bucket/big.bin", None, &headers, None) + .await; + match action { + HandlerAction::Response(r) => assert_eq!( + r.status, 400, + "oversized PUT should be rejected with EntityTooLarge (400)" + ), + other => panic!( + "expected Response, got {:?}", + std::mem::discriminant(&other) + ), + } + }); + } + + #[test] + fn put_under_max_body_size_forwards() { + run(async { + let gw = gateway().with_max_request_body_size(Some(1_000_000)); + let mut headers = HeaderMap::new(); + headers.insert("content-length", "1024".parse().unwrap()); + let action = gw + .resolve_request(Method::PUT, "/test-bucket/ok.bin", None, &headers, None) + .await; + assert!( + matches!(action, HandlerAction::Forward(_)), + "PUT within the limit should forward" + ); + }); + } + + #[test] + fn put_with_no_limit_forwards_large_body() { + run(async { + let gw = gateway(); // default: no proxy-enforced limit + let mut headers = HeaderMap::new(); + headers.insert("content-length", "999999999".parse().unwrap()); + let action = gw + .resolve_request(Method::PUT, "/test-bucket/huge.bin", None, &headers, None) + .await; + assert!( + matches!(action, HandlerAction::Forward(_)), + "with no limit configured, large PUT should still forward" + ); + }); + } + + #[test] + fn upload_part_over_max_body_size_is_rejected() { + run(async { + let gw = gateway().with_max_request_body_size(Some(1024)); + let mut headers = HeaderMap::new(); + headers.insert("content-length", "5000".parse().unwrap()); + let action = gw + .resolve_request( + Method::PUT, + "/test-bucket/key.bin", + Some("partNumber=1&uploadId=abc"), + &headers, + None, + ) + .await; + match action { + HandlerAction::Response(r) => assert_eq!( + r.status, 400, + "oversized UploadPart should be rejected with EntityTooLarge (400)" + ), + other => panic!( + "expected Response, got {:?}", + std::mem::discriminant(&other) + ), + } + }); + } + // -- Middleware test types ----------------------------------------------- struct BlockMiddleware; diff --git a/docs/reference/operations.md b/docs/reference/operations.md index 299ebf6..a36aaff 100644 --- a/docs/reference/operations.md +++ b/docs/reference/operations.md @@ -66,3 +66,5 @@ Consequences and guidance: - **Configure clients to chunk below the limit.** e.g. boto3 `TransferConfig(multipart_threshold=…, multipart_chunksize=…)` with a chunk size under the plan limit; aws-cli's `s3.multipart_chunksize`. - Until [streaming `UploadPart`](https://github.com/developmentseed/multistore/issues/89) lands, parts on Workers are additionally capped by the 128 MB worker memory limit (parts are buffered in WASM). Keep `multipart_chunksize` comfortably below 100 MB. - The native server and Lambda runtimes have their own, generally higher, limits — this constraint is specific to Workers. + +**Surfacing a clean error.** Configure the gateway with [`with_max_request_body_size`](https://docs.rs/multistore/latest/multistore/proxy/struct.ProxyGateway.html) so a `PutObject`/`UploadPart` whose `Content-Length` exceeds the limit is rejected up front with S3's `EntityTooLarge` (HTTP 400) — an actionable error instead of Cloudflare's opaque `413`. The Workers example reads this from the `MAX_UPLOAD_BYTES` environment variable; set it to your plan's request-body limit (e.g. `104857600` for 100 MB). The check requires a declared `Content-Length`; unknown-length streaming requests fall through to the platform limit. diff --git a/examples/cf-workers/src/lib.rs b/examples/cf-workers/src/lib.rs index c35e20a..46b6390 100644 --- a/examples/cf-workers/src/lib.rs +++ b/examples/cf-workers/src/lib.rs @@ -93,6 +93,17 @@ async fn fetch(req: web_sys::Request, env: Env, _ctx: Context) -> Result().ok()) + { + gateway = gateway.with_max_request_body_size(Some(max)); + } + // Parse request metadata and extract the body stream (zero-copy). let (parts, js_body) = RequestParts::from_web_sys(&req).map_err(|e| worker::Error::RustError(e))?; diff --git a/examples/cf-workers/wrangler.integration.toml b/examples/cf-workers/wrangler.integration.toml index b73a551..4abfed2 100644 --- a/examples/cf-workers/wrangler.integration.toml +++ b/examples/cf-workers/wrangler.integration.toml @@ -14,6 +14,10 @@ command = "cargo install worker-build --version '^0.7' && worker-build --release [vars] VIRTUAL_HOST_DOMAIN = "s3.local" +# Reject uploads over 10 MiB with EntityTooLarge (kept above the part sizes the +# multipart tests use). Production should set this to the plan's request-body +# limit; see docs/reference/operations.md. +MAX_UPLOAD_BYTES = "10485760" [vars.PROXY_CONFIG] diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index bc08438..5964d47 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -236,6 +236,18 @@ def test_batch_delete(self): client.get_object(Bucket="private-uploads", Key=key) assert exc_info.value.response["Error"]["Code"] in ("NoSuchKey", "404") + def test_oversized_put_rejected_entity_too_large(self): + """A PUT exceeding MAX_UPLOAD_BYTES (10 MiB in the test config) is + rejected with EntityTooLarge rather than forwarded to the backend.""" + client = static_client() + key = f"test-toolarge-{uuid.uuid4()}.bin" + body = b"z" * (12 * MIB) # over the 10 MiB limit + with pytest.raises(ClientError) as exc_info: + client.put_object(Bucket="private-uploads", Key=key, Body=body) + err = exc_info.value.response["Error"] + assert err["Code"] == "EntityTooLarge", err + assert exc_info.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 + # --------------------------------------------------------------------------- # Multipart uploads From 7d32fe82bddf4430a317bc38a602e26a127bb562 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:04:02 -0700 Subject: [PATCH 10/21] refactor: tidy data-edit code per over-engineering review Small simplifications, no behavior change: - `build_object_path` was a duplicate of `apply_backend_prefix` with a Path return type; collapse it to `Path::from(apply_backend_prefix(...))`. - `with_max_request_body_size` took `Option` but every caller passed `Some(_)`; take `u64` and wrap internally. - Drop the unused `Default` derive on `BackendOutcome` (only ever built with an explicit literal). Co-Authored-By: Claude Opus 4.8 --- crates/core/src/api/delete.rs | 2 +- crates/core/src/proxy.rs | 30 ++++++++---------------------- examples/cf-workers/src/lib.rs | 2 +- 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/crates/core/src/api/delete.rs b/crates/core/src/api/delete.rs index d704583..f000e20 100644 --- a/crates/core/src/api/delete.rs +++ b/crates/core/src/api/delete.rs @@ -101,7 +101,7 @@ pub struct DeleteError { } /// The backend's keys that were deleted and any per-key errors it reported. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct BackendOutcome { /// Backend keys reported as deleted. pub deleted: Vec, diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index 45e45bc..f2969e8 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -263,9 +263,9 @@ where /// /// The check relies on a declared `Content-Length`; requests without one /// (e.g. unknown-length streaming) fall through to the runtime's own limit. - /// `None` (the default) disables the proxy-enforced limit. - pub fn with_max_request_body_size(mut self, max_bytes: Option) -> Self { - self.max_request_body_size = max_bytes; + /// Leaving this unset (the default) disables the proxy-enforced limit. + pub fn with_max_request_body_size(mut self, max_bytes: u64) -> Self { + self.max_request_body_size = Some(max_bytes); self } @@ -1267,25 +1267,11 @@ fn error_response(err: &ProxyError, resource: &str, request_id: &str, debug: boo /// Build an object_store Path from a bucket config and client-visible key. fn build_object_path(config: &BucketConfig, key: &str) -> object_store::path::Path { - match &config.backend_prefix { - Some(prefix) => { - let p = prefix.trim_end_matches('/'); - if p.is_empty() { - object_store::path::Path::from(key) - } else { - let mut full_key = String::with_capacity(p.len() + 1 + key.len()); - full_key.push_str(p); - full_key.push('/'); - full_key.push_str(key); - object_store::path::Path::from(full_key) - } - } - None => object_store::path::Path::from(key), - } + object_store::path::Path::from(apply_backend_prefix(config, key)) } /// Map a client-visible key into the backend key space by prepending -/// `backend_prefix` (the string counterpart of [`build_object_path`]). +/// `backend_prefix`. fn apply_backend_prefix(config: &BucketConfig, key: &str) -> String { match &config.backend_prefix { Some(prefix) => { @@ -1625,7 +1611,7 @@ mod tests { #[test] fn put_over_max_body_size_is_rejected() { run(async { - let gw = gateway().with_max_request_body_size(Some(1024)); + let gw = gateway().with_max_request_body_size(1024); let mut headers = HeaderMap::new(); headers.insert("content-length", "2048".parse().unwrap()); let action = gw @@ -1647,7 +1633,7 @@ mod tests { #[test] fn put_under_max_body_size_forwards() { run(async { - let gw = gateway().with_max_request_body_size(Some(1_000_000)); + let gw = gateway().with_max_request_body_size(1_000_000); let mut headers = HeaderMap::new(); headers.insert("content-length", "1024".parse().unwrap()); let action = gw @@ -1679,7 +1665,7 @@ mod tests { #[test] fn upload_part_over_max_body_size_is_rejected() { run(async { - let gw = gateway().with_max_request_body_size(Some(1024)); + let gw = gateway().with_max_request_body_size(1024); let mut headers = HeaderMap::new(); headers.insert("content-length", "5000".parse().unwrap()); let action = gw diff --git a/examples/cf-workers/src/lib.rs b/examples/cf-workers/src/lib.rs index 46b6390..3a4c28c 100644 --- a/examples/cf-workers/src/lib.rs +++ b/examples/cf-workers/src/lib.rs @@ -101,7 +101,7 @@ async fn fetch(req: web_sys::Request, env: Env, _ctx: Context) -> Result().ok()) { - gateway = gateway.with_max_request_body_size(Some(max)); + gateway = gateway.with_max_request_body_size(max); } // Parse request metadata and extract the body stream (zero-copy). From 79982a517f6d5c1628ab85a510cb5045c528ebef Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:22:26 -0700 Subject: [PATCH 11/21] fix(s3): bound and validate batch-delete requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review found DeleteObjects was unbounded: `check_upload_size` was wired into PutObject/UploadPart but not the batch-delete arm, and the key list had no cap — an authenticated caller could submit a platform-limit-sized body with unbounded keys (memory + backend fan-out). Also aligns error codes with S3. - Add `ProxyError::MalformedXml` (→ S3 `MalformedXML`, 400). - `DeleteRequest::parse` rejects empty bodies AND >1000 keys with MalformedXML (was `InvalidRequest`; the doc comment already claimed MalformedXML). Add `MAX_KEYS` and tests for the cap and the error code. - Call `check_upload_size` in the DeleteObjects dispatch arm so the configured max body size bounds the buffered delete body too. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/api/delete.rs | 52 +++++++++++++++++++++++++++++------ crates/core/src/error.rs | 8 ++++++ crates/core/src/proxy.rs | 3 ++ 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/crates/core/src/api/delete.rs b/crates/core/src/api/delete.rs index f000e20..e27aabf 100644 --- a/crates/core/src/api/delete.rs +++ b/crates/core/src/api/delete.rs @@ -35,18 +35,28 @@ pub struct DeleteObjectEntry { } impl DeleteRequest { + /// The maximum number of objects S3 accepts in a single batch delete. + pub const MAX_KEYS: usize = 1000; + /// Parse a batch-delete request body. /// - /// Returns an error if the body is not well-formed XML or names no objects - /// (S3 rejects an empty delete with `MalformedXML`). + /// Mirrors S3's `MalformedXML` rejections: the body must be well-formed XML, + /// name at least one object, and name no more than [`MAX_KEYS`](Self::MAX_KEYS). pub fn parse(body: &[u8]) -> Result { let req: DeleteRequest = quick_xml::de::from_reader(body) - .map_err(|e| ProxyError::InvalidRequest(format!("malformed delete body: {e}")))?; + .map_err(|e| ProxyError::MalformedXml(format!("malformed delete body: {e}")))?; if req.objects.is_empty() { - return Err(ProxyError::InvalidRequest( + return Err(ProxyError::MalformedXml( "delete request names no objects".into(), )); } + if req.objects.len() > Self::MAX_KEYS { + return Err(ProxyError::MalformedXml(format!( + "delete request names {} objects, exceeding the {}-key limit", + req.objects.len(), + Self::MAX_KEYS + ))); + } Ok(req) } @@ -191,14 +201,40 @@ mod tests { } #[test] - fn empty_delete_is_rejected() { + fn empty_delete_is_rejected_as_malformed_xml() { let body = br#""#; - assert!(DeleteRequest::parse(body).is_err()); + assert!(matches!( + DeleteRequest::parse(body), + Err(ProxyError::MalformedXml(_)) + )); } #[test] - fn malformed_body_is_rejected() { - assert!(DeleteRequest::parse(b"not xml").is_err()); + fn malformed_body_is_rejected_as_malformed_xml() { + assert!(matches!( + DeleteRequest::parse(b"not xml"), + Err(ProxyError::MalformedXml(_)) + )); + } + + #[test] + fn over_key_limit_is_rejected() { + let mut body = String::from(""); + for i in 0..=DeleteRequest::MAX_KEYS { + body.push_str(&format!("k{i}")); + } + body.push_str(""); + assert!(matches!( + DeleteRequest::parse(body.as_bytes()), + Err(ProxyError::MalformedXml(_)) + )); + // Exactly MAX_KEYS is allowed. + let mut ok = String::from(""); + for i in 0..DeleteRequest::MAX_KEYS { + ok.push_str(&format!("k{i}")); + } + ok.push_str(""); + assert!(DeleteRequest::parse(ok.as_bytes()).is_ok()); } #[test] diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index e6772ab..c28dd2a 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -25,6 +25,12 @@ pub enum ProxyError { #[error("invalid request: {0}")] InvalidRequest(String), + /// The XML in the request body was not well-formed or did not validate + /// against the expected schema (e.g. a batch-delete body that is malformed, + /// empty, or exceeds the 1000-key limit). Maps to S3's `400 MalformedXML`. + #[error("malformed XML: {0}")] + MalformedXml(String), + /// The requested S3 operation is recognized but not supported by the proxy /// (e.g. server-side copy via `x-amz-copy-source`). Maps to S3's /// `501 NotImplemented`. @@ -94,6 +100,7 @@ impl ProxyError { Self::AccessDenied => "AccessDenied", Self::SignatureDoesNotMatch => "SignatureDoesNotMatch", Self::InvalidRequest(_) => "InvalidRequest", + Self::MalformedXml(_) => "MalformedXML", Self::NotImplemented(_) => "NotImplemented", Self::EntityTooLarge => "EntityTooLarge", Self::MissingAuth => "AccessDenied", @@ -116,6 +123,7 @@ impl ProxyError { Self::AccessDenied | Self::MissingAuth | Self::ExpiredCredentials => 403, Self::SignatureDoesNotMatch => 403, Self::InvalidRequest(_) => 400, + Self::MalformedXml(_) => 400, Self::NotImplemented(_) => 501, Self::EntityTooLarge => 400, Self::InvalidOidcToken(_) => 400, diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index f2969e8..dc5bd54 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -897,6 +897,9 @@ where bucket_config.backend_type ))); } + // The body is buffered whole; bound it like other uploads. The + // key count is additionally capped when the body is parsed. + self.check_upload_size(original_headers)?; Ok(HandlerAction::NeedsBody(PendingRequest { operation: operation.clone(), bucket_config: bucket_config.clone(), From 5a7f7dc61849783129dd4ab91b4fe4f6e57b0222 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:23:15 -0700 Subject: [PATCH 12/21] fix(s3): add bucket namespace to DeleteResult XML S3's DeleteObjects response carries `xmlns="http://s3.amazonaws.com/doc/2006-03-01/"` on the `` root (as the proxy already does for ListBucketResult). Strict XML/XPath-based S3 clients can mis-parse a namespace-less root. Add the `@xmlns` attribute and assert it in the build test. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/api/delete.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/core/src/api/delete.rs b/crates/core/src/api/delete.rs index e27aabf..9ed5143 100644 --- a/crates/core/src/api/delete.rs +++ b/crates/core/src/api/delete.rs @@ -153,6 +153,8 @@ pub fn build_delete_result(deleted: &[String], errors: &[DeleteError], quiet: bo #[derive(Serialize)] #[serde(rename = "DeleteResult")] struct Result<'a> { + #[serde(rename = "@xmlns")] + xmlns: &'static str, #[serde(rename = "Deleted")] deleted: Vec>, #[serde(rename = "Error")] @@ -168,7 +170,11 @@ pub fn build_delete_result(deleted: &[String], errors: &[DeleteError], quiet: bo } else { deleted.iter().map(|k| Deleted { key: k }).collect() }; - let result = Result { deleted, errors }; + let result = Result { + xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + deleted, + errors, + }; format!( "\n{}", quick_xml::se::to_string(&result).unwrap_or_default() @@ -268,6 +274,10 @@ mod tests { message: "denied".into(), }]; let verbose = build_delete_result(&deleted, &errors, false); + // S3's DeleteResult carries the bucket namespace on the root element. + assert!( + verbose.contains("") + ); assert!(verbose.contains("a.txt")); assert!(verbose.contains("secret/x")); assert!(verbose.contains("AccessDenied")); From 72c5a1044d8586f011e6d905a4ee12a4dcee882e Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:24:53 -0700 Subject: [PATCH 13/21] docs: fix drift surfaced by review - errors.md: add the MalformedXml, NotImplemented, and EntityTooLarge rows. - request-lifecycle.md + architecture/index.md: NeedsBody and the raw-signed (non-presigned) path now cover batch delete, not just multipart; broaden the S3-only warning and the presigned-URL claim accordingly. - deployment/cloudflare-workers.md: document the MAX_UPLOAD_BYTES env var. - CONTRIBUTING.md: list `cargo test` and `make test-integration`. Co-Authored-By: Claude Opus 4.8 --- CONTRIBUTING.md | 6 ++++++ docs/architecture/index.md | 2 +- docs/architecture/request-lifecycle.md | 14 +++++++------- docs/deployment/cloudflare-workers.md | 1 + docs/reference/errors.md | 3 +++ 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 35805f6..fe971b9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -16,6 +16,12 @@ cargo check # Check (WASM) cargo check -p multistore-cf-workers --target wasm32-unknown-unknown + +# Unit + doc tests +cargo test + +# Integration tests (MinIO via docker compose behind wrangler dev; mirrors CI) +make test-integration ``` ## Release Process diff --git a/docs/architecture/index.md b/docs/architecture/index.md index c1db90d..a18d61f 100644 --- a/docs/architecture/index.md +++ b/docs/architecture/index.md @@ -36,7 +36,7 @@ flowchart LR **Two-phase dispatch** — The `ProxyGateway` separates request resolution from execution. `resolve_request()` determines what to do; the runtime executes it. This keeps streaming logic in runtime-specific code where it belongs. -**Presigned URLs for streaming** — GET, HEAD, PUT, and DELETE operations use presigned URLs. The runtime forwards the request directly to the backend — no buffering, no double-handling of bodies. +**Presigned URLs for streaming** — GET, HEAD, PUT, and single-object DELETE use presigned URLs. The runtime forwards the request directly to the backend — no buffering, no double-handling of bodies. Body-bearing operations that can't be presigned (multipart and batch delete) instead use raw SigV4-signed HTTP, buffering the (small) body. **Pluggable traits** — Four trait boundaries enable customization: - `Router` / `RouteHandler` — Path-based pre-dispatch request interception (STS, OIDC discovery, custom endpoints) diff --git a/docs/architecture/request-lifecycle.md b/docs/architecture/request-lifecycle.md index a9efe38..2a99aa2 100644 --- a/docs/architecture/request-lifecycle.md +++ b/docs/architecture/request-lifecycle.md @@ -49,9 +49,9 @@ sequenceDiagram Note over Middleware: after_dispatch(CompletedRequest) Gateway-->>Runtime: GatewayResponse::Response Runtime-->>Client: Return response body - else NeedsBody (multipart) + else NeedsBody (multipart, batch delete) Gateway->>Gateway: collect_body(body) → bytes - Gateway->>Backend: Signed multipart request + Gateway->>Backend: Signed multipart / batch-delete request Backend-->>Gateway: Response Note over Middleware: after_dispatch(CompletedRequest) Gateway-->>Runtime: GatewayResponse::Response @@ -118,7 +118,7 @@ The `ProxyGateway` owns S3 request parsing, identity resolution, and bucket auth - Virtual-hosted: `GET /key` with `Host: bucket.s3.example.com` → same operation 2. **Resolve identity** via the `CredentialRegistry` — verifies SigV4 signatures against stored or sealed credentials 3. **Resolve bucket** via the `BucketRegistry` — looks up the bucket config and authorizes the caller -4. **Dispatch** the operation based on type (forward, list, or multipart) +4. **Dispatch** the operation based on type (forward, list, or body-bearing — multipart and batch delete) Custom `BucketRegistry` implementations can provide entirely different authorization logic, namespace mapping, or dynamic bucket configuration. @@ -145,18 +145,18 @@ LIST supports backend-side pagination. V2 uses `continuation-token` and `start-a ### `NeedsBody(PendingRequest)` (internal) -Used for: **CreateMultipartUpload, UploadPart, CompleteMultipartUpload, AbortMultipartUpload** +Used for: **CreateMultipartUpload, UploadPart, CompleteMultipartUpload, AbortMultipartUpload, and DeleteObjects (batch delete)** -Multipart operations need the request body (e.g., the XML body for `CompleteMultipartUpload`). When using `handle_request`, this is resolved internally — the gateway calls the `collect_body` closure provided by the runtime and returns the result as `GatewayResponse::Response`. Runtimes never see this variant. +These operations need the request body — the XML body for `CompleteMultipartUpload`, or the key list for `DeleteObjects` (each key is authorized individually once the body arrives). When using `handle_request`, this is resolved internally — the gateway calls the `collect_body` closure provided by the runtime and returns the result as `GatewayResponse::Response`. Runtimes never see this variant. For lower-level control, `ProxyGateway::handle` returns the raw three-variant `HandlerAction`, and runtimes call `handle_with_body()` themselves. > [!WARNING] -> Multipart uploads are only supported for `backend_type = "s3"`. Non-S3 backends should use single PUT requests (object_store handles chunking internally). +> Multipart uploads and batch delete are only supported for `backend_type = "s3"`. Non-S3 backends should use single PUT/DELETE requests (object_store handles chunking internally). ## Outbound User-Agent -All outbound requests to backend object stores include a `User-Agent` header identifying multistore as the caller. This applies to both presigned URL forwards (GET, HEAD, PUT, DELETE) and raw signed requests (multipart operations). +All outbound requests to backend object stores include a `User-Agent` header identifying multistore as the caller. This applies to both presigned URL forwards (GET, HEAD, PUT, single-object DELETE) and raw signed requests (multipart operations and batch delete). The default value is `multistore/{version}`, where `{version}` is derived from the crate version (`CARGO_PKG_VERSION`) — e.g. `multistore/0.4.0`. Override it via the gateway builder to include your application name: diff --git a/docs/deployment/cloudflare-workers.md b/docs/deployment/cloudflare-workers.md index 646a806..24102b6 100644 --- a/docs/deployment/cloudflare-workers.md +++ b/docs/deployment/cloudflare-workers.md @@ -101,6 +101,7 @@ HTTP 500 in production (`tests/smoke/test_federation.py` fails). |----------|----------|-------------| | `PROXY_CONFIG` | Yes | JSON config (buckets, roles, credentials) | | `VIRTUAL_HOST_DOMAIN` | No | Domain for virtual-hosted requests | +| `MAX_UPLOAD_BYTES` | No | Reject uploads whose `Content-Length` exceeds this with `EntityTooLarge` (400) instead of Cloudflare's edge `413`. Set to the plan's request-body limit, e.g. `104857600` (100 MB). See [Supported Operations](../reference/operations.md). | | `SESSION_TOKEN_KEY` | For STS | Base64-encoded 32-byte AES-256-GCM key | | `OIDC_PROVIDER_KEY` | For OIDC backend auth | PEM-encoded RSA private key | | `OIDC_PROVIDER_ISSUER` | For OIDC backend auth | Public URL for JWKS discovery | diff --git a/docs/reference/errors.md b/docs/reference/errors.md index 79bc703..927c63b 100644 --- a/docs/reference/errors.md +++ b/docs/reference/errors.md @@ -23,6 +23,9 @@ The proxy returns S3-compatible error responses in XML format: | InvalidOidcToken | 400 | `InvalidIdentityToken` | JWT validation failed (bad signature, untrusted issuer, etc.) | | RoleNotFound | 403 | `AccessDenied` | Requested role doesn't exist in config | | InvalidRequest | 400 | `InvalidRequest` | Malformed S3 request | +| MalformedXml | 400 | `MalformedXML` | Request body XML is malformed, empty, or exceeds limits (e.g. a batch delete naming no objects or more than 1000 keys) | +| NotImplemented | 501 | `NotImplemented` | Recognized but unsupported operation (e.g. server-side copy via `x-amz-copy-source`) | +| EntityTooLarge | 400 | `EntityTooLarge` | Upload `Content-Length` exceeds the configured maximum body size | | BackendError | 503 | `ServiceUnavailable` | Backend object store is unreachable or returned an error | | PreconditionFailed | 412 | `PreconditionFailed` | Conditional request failed (If-Match, etc.) | | NotModified | 304 | `NotModified` | Conditional request — content not changed | From ca5a8ac1a12633ea679984e018867bca5e9ea5b4 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:26:18 -0700 Subject: [PATCH 14/21] refactor: cleanups from over-engineering review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No behavior change except the batch-delete fallback (now stricter): - Rename the `Result`-named local serde structs in `delete.rs` to `DeleteResultXml` so they no longer shadow `std::result::Result` in functions that return `Result`. - `execute_delete_objects`: on a 2xx-but-unparseable backend `DeleteResult`, return `BackendError` instead of fabricating success for the forwarded keys. This also removes the now-dead `allowed_client` Vec (one fewer per-key clone). - `key_matches_prefix`: replace the per-call `format!("{prefix}/")` with a byte boundary check — it runs per key × scope × prefix on batch authorization. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/api/delete.rs | 8 ++++---- crates/core/src/auth/authorize.rs | 8 ++++++-- crates/core/src/proxy.rs | 13 +++++++------ 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/crates/core/src/api/delete.rs b/crates/core/src/api/delete.rs index 9ed5143..93e4ce3 100644 --- a/crates/core/src/api/delete.rs +++ b/crates/core/src/api/delete.rs @@ -126,7 +126,7 @@ pub struct BackendOutcome { pub fn parse_backend_result(xml: &[u8]) -> Result { #[derive(Deserialize)] #[serde(rename = "DeleteResult")] - struct Result { + struct DeleteResultXml { #[serde(default, rename = "Deleted")] deleted: Vec, #[serde(default, rename = "Error")] @@ -137,7 +137,7 @@ pub fn parse_backend_result(xml: &[u8]) -> Result { #[serde(rename = "Key")] key: String, } - let parsed: Result = quick_xml::de::from_reader(xml) + let parsed: DeleteResultXml = quick_xml::de::from_reader(xml) .map_err(|e| ProxyError::BackendError(format!("malformed delete result: {e}")))?; Ok(BackendOutcome { deleted: parsed.deleted.into_iter().map(|d| d.key).collect(), @@ -152,7 +152,7 @@ pub fn parse_backend_result(xml: &[u8]) -> Result { pub fn build_delete_result(deleted: &[String], errors: &[DeleteError], quiet: bool) -> String { #[derive(Serialize)] #[serde(rename = "DeleteResult")] - struct Result<'a> { + struct DeleteResultXml<'a> { #[serde(rename = "@xmlns")] xmlns: &'static str, #[serde(rename = "Deleted")] @@ -170,7 +170,7 @@ pub fn build_delete_result(deleted: &[String], errors: &[DeleteError], quiet: bo } else { deleted.iter().map(|k| Deleted { key: k }).collect() }; - let result = Result { + let result = DeleteResultXml { xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", deleted, errors, diff --git a/crates/core/src/auth/authorize.rs b/crates/core/src/auth/authorize.rs index 99e3ec2..97021e0 100644 --- a/crates/core/src/auth/authorize.rs +++ b/crates/core/src/auth/authorize.rs @@ -13,8 +13,12 @@ fn key_matches_prefix(key: &str, prefix: &str) -> bool { if prefix.ends_with('/') || prefix.is_empty() { return key.starts_with(prefix); } - // Prefix does not end with '/' — require an exact match or a '/' boundary - key == prefix || key.starts_with(&format!("{}/", prefix)) + // Prefix does not end with '/' — require an exact match or a '/' boundary. + // Done without allocating: this runs per key × scope × prefix on batch ops. + key == prefix + || (key.len() > prefix.len() + && key.starts_with(prefix) + && key.as_bytes()[prefix.len()] == b'/') } /// Check if a resolved identity is authorized to perform an operation. diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index dc5bd54..9f851c4 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -1144,12 +1144,10 @@ where let quiet = request.quiet; // Partition keys by per-key authorization. - let mut allowed_client: Vec = Vec::new(); let mut allowed_backend: Vec = Vec::new(); let mut errors: Vec = Vec::new(); for key in request.keys() { if crate::auth::key_authorized(&pending.identity, bucket, Action::DeleteObject, key) { - allowed_client.push(key.to_string()); allowed_backend.push(apply_backend_prefix(config, key)); } else { errors.push(delete::DeleteError { @@ -1211,10 +1209,13 @@ where } } Err(e) => { - // The backend returned 2xx but an unparseable body. Treat the - // forwarded keys as deleted rather than failing the request. - tracing::warn!(error = %e, "could not parse backend delete result; assuming success"); - deleted_client.extend(allowed_client.iter().cloned()); + // A 2xx with an unparseable DeleteResult is a backend + // contract violation. Surface it rather than fabricating + // success for keys whose actual fate is unknown. + tracing::error!(error = %e, "backend returned an unparseable delete result"); + return Err(ProxyError::BackendError( + "backend returned an unparseable delete result".into(), + )); } } } From d94c9079dceaab94ec49b3f7c209c4f2932e1836 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:31:48 -0700 Subject: [PATCH 15/21] test(integration): batch-delete per-key authz + copy rejection End-to-end coverage for the two behaviors that were only unit-tested: - Add a prefix-restricted static credential (`AKTEST000000000002`, scoped to the `allowed/` prefix on private-uploads). - test_batch_delete_partial_authorization: a batch delete of one in-scope and one out-of-scope key deletes the allowed one, reports the other as AccessDenied, and verifies the out-of-scope key is NOT deleted (the security property) while the in-scope one is gone. - test_copy_object_rejected: copy_object (x-amz-copy-source) returns 501 NotImplemented and does not create the destination. Co-Authored-By: Claude Opus 4.8 --- examples/cf-workers/wrangler.integration.toml | 13 +++++ tests/integration/test_integration.py | 54 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/examples/cf-workers/wrangler.integration.toml b/examples/cf-workers/wrangler.integration.toml index 4abfed2..b17d2d0 100644 --- a/examples/cf-workers/wrangler.integration.toml +++ b/examples/cf-workers/wrangler.integration.toml @@ -68,6 +68,19 @@ actions = ["get_object", "head_object", "put_object", "delete_object", "list_buc bucket = "private-uploads" prefixes = [] +# Prefix-restricted credential — used to test per-key batch-delete authorization. +[[vars.PROXY_CONFIG.credentials]] +access_key_id = "AKTEST000000000002" +created_at = "2024-01-01T00:00:00Z" +enabled = true +principal_name = "integration-prefix-restricted-user" +secret_access_key = "testSecretKey00000000000000000002" + +[[vars.PROXY_CONFIG.credentials.allowed_scopes]] +actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket"] +bucket = "private-uploads" +prefixes = ["allowed/"] + # --- Roles --- [[vars.PROXY_CONFIG.roles]] diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 5964d47..d24b06c 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -248,6 +248,60 @@ def test_oversized_put_rejected_entity_too_large(self): assert err["Code"] == "EntityTooLarge", err assert exc_info.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 + def test_batch_delete_partial_authorization(self): + """Per-key authz: a batch delete with one in-scope and one out-of-scope + key deletes the allowed one, reports the other as AccessDenied, and does + NOT delete the out-of-scope key.""" + full = static_client() # full access to private-uploads + restricted = static_client( # scoped to the "allowed/" prefix only + access_key="AKTEST000000000002", + secret_key="testSecretKey00000000000000000002", + ) + suffix = uuid.uuid4() + allowed_key = f"allowed/{suffix}.txt" + denied_key = f"denied/{suffix}.txt" + full.put_object(Bucket="private-uploads", Key=allowed_key, Body=b"a") + full.put_object(Bucket="private-uploads", Key=denied_key, Body=b"b") + + resp = restricted.delete_objects( + Bucket="private-uploads", + Delete={"Objects": [{"Key": allowed_key}, {"Key": denied_key}]}, + ) + deleted = {d["Key"] for d in resp.get("Deleted", [])} + errors = {e["Key"]: e["Code"] for e in resp.get("Errors", [])} + assert deleted == {allowed_key}, resp + assert errors == {denied_key: "AccessDenied"}, resp + + # Security property: the out-of-scope key must still exist; the in-scope + # key must be gone. + full.head_object(Bucket="private-uploads", Key=denied_key) # raises if deleted + with pytest.raises(ClientError) as exc: + full.head_object(Bucket="private-uploads", Key=allowed_key) + assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404 + + full.delete_object(Bucket="private-uploads", Key=denied_key) # cleanup + + def test_copy_object_rejected(self): + """Server-side copy (x-amz-copy-source) is rejected with 501 + NotImplemented and does not create the destination.""" + client = static_client() + src = f"copy-src-{uuid.uuid4()}.txt" + dst = f"copy-dst-{uuid.uuid4()}.txt" + client.put_object(Bucket="private-uploads", Key=src, Body=b"source") + with pytest.raises(ClientError) as exc: + client.copy_object( + Bucket="private-uploads", + Key=dst, + CopySource={"Bucket": "private-uploads", "Key": src}, + ) + assert exc.value.response["Error"]["Code"] == "NotImplemented", exc.value.response + assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 501 + # The destination must not have been created. + with pytest.raises(ClientError): + client.head_object(Bucket="private-uploads", Key=dst) + + client.delete_object(Bucket="private-uploads", Key=src) # cleanup + # --------------------------------------------------------------------------- # Multipart uploads From 762393ea4b608263bdd49010ec3f6fef66e91112 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:41:16 -0700 Subject: [PATCH 16/21] chore: auto-stop MinIO in make test-integration when we started it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `make test-integration` started MinIO but left it running, so a one-off run leaked containers. Now it tears MinIO down on exit — but only if the script started it. If MinIO is already up (a dev iterating with `docker compose up -d`), it's left warm so repeated runs stay fast and we never stop a stack we didn't start. wrangler dev is always stopped on exit. Co-Authored-By: Claude Opus 4.8 --- scripts/integration-test.sh | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/scripts/integration-test.sh b/scripts/integration-test.sh index 859ead6..f68edf1 100755 --- a/scripts/integration-test.sh +++ b/scripts/integration-test.sh @@ -7,6 +7,11 @@ # ./scripts/integration-test.sh [extra pytest args] # make test-integration # +# MinIO lifecycle: if MinIO isn't running, this starts it and stops it again on +# exit (clean one-off run). If it's already up — e.g. you ran `docker compose +# up -d` to iterate — it's left running so repeated runs stay fast. wrangler dev +# is always stopped on exit. +# # Prerequisites: docker, node/npx, uv (uvx), and the wasm32 Rust target # (`rustup target add wasm32-unknown-unknown`). worker-build is installed # automatically if missing. @@ -18,12 +23,26 @@ PORT="${PROXY_PORT:-8787}" PROXY_URL="http://localhost:${PORT}" WORKER_DIR="examples/cf-workers" +# Tear down MinIO on exit only if we started it. If it was already running +# (e.g. a dev who ran `docker compose up -d` to iterate), leave it warm so +# repeated runs stay fast and we don't stop a stack we didn't start. +STARTED_MINIO=false cleanup() { # Kill wrangler (and the workerd children it spawns). pkill -f "wrangler dev --config wrangler.integration.toml" 2>/dev/null || true + if [ "$STARTED_MINIO" = true ]; then + echo "==> Stopping MinIO (started by this run)" + docker compose down >/dev/null 2>&1 || true + fi } trap cleanup EXIT +if curl -sf -o /dev/null http://localhost:9000/minio/health/live 2>/dev/null; then + echo "==> MinIO already running — leaving it up after the run" +else + STARTED_MINIO=true +fi + echo "==> Starting MinIO + seeding buckets (docker compose)" # Not `--wait`: the one-shot seeder (minio-init) exits, which `--wait` treats as # a failure. Instead poll for a seeded public object — that confirms MinIO is up From 766eda007f9ede28e12176e37a8e4fc16b0069df Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 18:53:25 -0700 Subject: [PATCH 17/21] refactor: address round-2 review polish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No behavior change (covered by existing unit + integration tests): - Unify the S3-backend gate: rename `supports_s3_multipart` → `is_s3_backend` and use it in both the multipart and batch-delete dispatch arms (the latter hand-rolled the same `parsed_backend_type() == S3` check). - `canonicalize_query_string`: use `Cow` so the common `key=value` params aren't copied on the per-request signing/verification path (only value-less flags allocate). - `strip_backend_prefix`: strip `{prefix}/` via `strip_prefix` chaining instead of a per-key `format!` allocation. - Docs: the `multipart`/`request_signer` module docs and the cf-workers architecture comment now note batch delete also uses the raw-signed path; `with_max_request_body_size` docs note it covers DeleteObjects too. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/auth/sigv4.rs | 9 ++++-- crates/core/src/backend/multipart.rs | 11 ++++--- crates/core/src/backend/request_signer.rs | 8 ++++-- crates/core/src/proxy.rs | 35 ++++++++++++----------- crates/core/src/types.rs | 6 ++-- docs/reference/operations.md | 2 +- examples/cf-workers/src/lib.rs | 2 +- 7 files changed, 43 insertions(+), 30 deletions(-) diff --git a/crates/core/src/auth/sigv4.rs b/crates/core/src/auth/sigv4.rs index a670225..2a97de2 100644 --- a/crates/core/src/auth/sigv4.rs +++ b/crates/core/src/auth/sigv4.rs @@ -166,14 +166,17 @@ pub(crate) fn canonicalize_query_string(query: &str) -> String { if query.is_empty() { return String::new(); } - let mut parts: Vec = query + // Borrow params that are already `key=value`; only the value-less flags + // (`?delete`, `?uploads`) need an owned `key=`. This keeps the common path + // allocation-free on the per-request signing/verification hot path. + let mut parts: Vec> = query .split('&') .filter(|p| !p.is_empty()) .map(|p| { if p.contains('=') { - p.to_string() + std::borrow::Cow::Borrowed(p) } else { - format!("{p}=") + std::borrow::Cow::Owned(format!("{p}=")) } }) .collect(); diff --git a/crates/core/src/backend/multipart.rs b/crates/core/src/backend/multipart.rs index 1ddd869..f57be9f 100644 --- a/crates/core/src/backend/multipart.rs +++ b/crates/core/src/backend/multipart.rs @@ -1,8 +1,11 @@ -//! Multipart URL building and request signing for S3-compatible backends. +//! Backend URL building and request signing for raw signed (non-presigned) +//! S3 operations. //! -//! These helpers are used by [`Gateway::execute_multipart`](crate::proxy::Gateway) -//! for CreateMultipartUpload, UploadPart, CompleteMultipartUpload, and -//! AbortMultipartUpload operations. +//! These helpers build the backend URL and sign the request for the operations +//! that go through [`ProxyBackend::send_raw`](crate::backend::ProxyBackend::send_raw): +//! the multipart operations (CreateMultipartUpload, UploadPart, +//! CompleteMultipartUpload, AbortMultipartUpload) and batch delete +//! (DeleteObjects). use crate::backend::request_signer::S3RequestSigner; use crate::error::ProxyError; diff --git a/crates/core/src/backend/request_signer.rs b/crates/core/src/backend/request_signer.rs index bf693d3..c53d611 100644 --- a/crates/core/src/backend/request_signer.rs +++ b/crates/core/src/backend/request_signer.rs @@ -1,9 +1,11 @@ //! Outbound SigV4 request signing. //! //! [`S3RequestSigner`] signs raw HTTP requests destined for S3-compatible -//! backends using AWS Signature Version 4. Used for multipart operations -//! (CreateMultipartUpload, UploadPart, CompleteMultipartUpload, -//! AbortMultipartUpload) that go through [`backend::ProxyBackend::send_raw`](crate::backend::ProxyBackend::send_raw). +//! backends using AWS Signature Version 4. Used for the operations that go +//! through [`backend::ProxyBackend::send_raw`](crate::backend::ProxyBackend::send_raw) +//! rather than presigned URLs: multipart operations (CreateMultipartUpload, +//! UploadPart, CompleteMultipartUpload, AbortMultipartUpload) and batch delete +//! (DeleteObjects). use crate::auth::sigv4::{canonicalize_query_string, hmac_sha256}; use crate::error::ProxyError; diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index 9f851c4..5c1dfb7 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -65,7 +65,7 @@ use crate::middleware::{ use crate::registry::{BucketRegistry, CredentialRegistry}; use crate::route_handler::{ProxyResponseBody, RequestInfo}; use crate::router::Router; -use crate::types::{Action, BackendType, BucketConfig, ResolvedIdentity, S3Operation}; +use crate::types::{Action, BucketConfig, ResolvedIdentity, S3Operation}; use bytes::Bytes; use http::{HeaderMap, Method}; use object_store::list::PaginatedListOptions; @@ -144,11 +144,12 @@ pub struct ProxyGateway { /// When true, responses include a `Server-Timing` header with gateway /// processing metrics. Enabled by default. server_timing: bool, - /// Maximum accepted upload body size in bytes, if set. When a `PutObject` - /// or `UploadPart` declares a `Content-Length` larger than this, the proxy - /// rejects it with `EntityTooLarge` instead of forwarding it. Useful for - /// surfacing a clean S3 error ahead of a runtime body-size limit (e.g. - /// Cloudflare Workers' edge `413`). `None` means no proxy-enforced limit. + /// Maximum accepted upload body size in bytes, if set. When a body-bearing + /// write (`PutObject`, `UploadPart`, or `DeleteObjects`) declares a + /// `Content-Length` larger than this, the proxy rejects it with + /// `EntityTooLarge` instead of forwarding it. Useful for surfacing a clean + /// S3 error ahead of a runtime body-size limit (e.g. Cloudflare Workers' + /// edge `413`). `None` means no proxy-enforced limit. max_request_body_size: Option, } @@ -255,9 +256,10 @@ where /// Set the maximum accepted upload body size, in bytes. /// - /// When set, a `PutObject` or `UploadPart` whose `Content-Length` exceeds - /// this is rejected up front with S3's `EntityTooLarge` (HTTP 400) rather - /// than forwarded. Use this on runtimes with a hard request-body limit — + /// When set, a body-bearing write (`PutObject`, `UploadPart`, or + /// `DeleteObjects`) whose `Content-Length` exceeds this is rejected up front + /// with S3's `EntityTooLarge` (HTTP 400) rather than forwarded. Use this on + /// runtimes with a hard request-body limit — /// e.g. Cloudflare Workers, where the edge otherwise rejects oversized /// bodies with an opaque `413` — to give clients an actionable S3 error. /// @@ -870,7 +872,7 @@ where | S3Operation::UploadPart { .. } | S3Operation::CompleteMultipartUpload { .. } | S3Operation::AbortMultipartUpload { .. } => { - if !bucket_config.supports_s3_multipart() { + if !bucket_config.is_s3_backend() { return Err(ProxyError::InvalidRequest(format!( "multipart operations not supported for '{}' backends", bucket_config.backend_type @@ -891,7 +893,7 @@ where // Batch delete needs the body to read the key list and authorize // each key individually. S3Operation::DeleteObjects { .. } => { - if bucket_config.parsed_backend_type() != Some(BackendType::S3) { + if !bucket_config.is_s3_backend() { return Err(ProxyError::NotImplemented(format!( "batch delete not supported for '{}' backends", bucket_config.backend_type @@ -1296,12 +1298,13 @@ fn strip_backend_prefix(config: &BucketConfig, key: &str) -> String { Some(prefix) => { let p = prefix.trim_end_matches('/'); if p.is_empty() { - key.to_string() - } else { - key.strip_prefix(&format!("{p}/")) - .unwrap_or(key) - .to_string() + return key.to_string(); } + // Strip `{p}/` without allocating a pattern string (runs per key). + key.strip_prefix(p) + .and_then(|rest| rest.strip_prefix('/')) + .unwrap_or(key) + .to_string() } None => key.to_string(), } diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index b70f307..fded064 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -99,8 +99,10 @@ impl BucketConfig { } } - /// Whether this backend supports S3-style multipart uploads via raw HTTP. - pub fn supports_s3_multipart(&self) -> bool { + /// Whether this is an S3 backend. Operations that go through raw signed + /// HTTP rather than presigned URLs — multipart uploads and batch delete — + /// are gated on this. + pub fn is_s3_backend(&self) -> bool { matches!(self.parsed_backend_type(), Some(BackendType::S3)) } diff --git a/docs/reference/operations.md b/docs/reference/operations.md index a36aaff..6c56e51 100644 --- a/docs/reference/operations.md +++ b/docs/reference/operations.md @@ -67,4 +67,4 @@ Consequences and guidance: - Until [streaming `UploadPart`](https://github.com/developmentseed/multistore/issues/89) lands, parts on Workers are additionally capped by the 128 MB worker memory limit (parts are buffered in WASM). Keep `multipart_chunksize` comfortably below 100 MB. - The native server and Lambda runtimes have their own, generally higher, limits — this constraint is specific to Workers. -**Surfacing a clean error.** Configure the gateway with [`with_max_request_body_size`](https://docs.rs/multistore/latest/multistore/proxy/struct.ProxyGateway.html) so a `PutObject`/`UploadPart` whose `Content-Length` exceeds the limit is rejected up front with S3's `EntityTooLarge` (HTTP 400) — an actionable error instead of Cloudflare's opaque `413`. The Workers example reads this from the `MAX_UPLOAD_BYTES` environment variable; set it to your plan's request-body limit (e.g. `104857600` for 100 MB). The check requires a declared `Content-Length`; unknown-length streaming requests fall through to the platform limit. +**Surfacing a clean error.** Configure the gateway with [`with_max_request_body_size`](https://docs.rs/multistore/latest/multistore/proxy/struct.ProxyGateway.html) so a body-bearing write (`PutObject`, `UploadPart`, or `DeleteObjects`) whose `Content-Length` exceeds the limit is rejected up front with S3's `EntityTooLarge` (HTTP 400) — an actionable error instead of Cloudflare's opaque `413`. The Workers example reads this from the `MAX_UPLOAD_BYTES` environment variable; set it to your plan's request-body limit (e.g. `104857600` for 100 MB). The check requires a declared `Content-Length`; unknown-length streaming requests fall through to the platform limit. diff --git a/examples/cf-workers/src/lib.rs b/examples/cf-workers/src/lib.rs index 3a4c28c..6514450 100644 --- a/examples/cf-workers/src/lib.rs +++ b/examples/cf-workers/src/lib.rs @@ -10,7 +10,7 @@ //! -> resolve request (ProxyGateway with static config registries) //! -> Forward: web_sys::fetch with ReadableStream passthrough (zero-copy) //! -> Response: LIST XML via object_store, errors, synthetic responses -//! -> NeedsBody: multipart operations via raw signed HTTP +//! -> NeedsBody: multipart operations and batch delete via raw signed HTTP //! ``` //! //! # Configuration From 9a714f426d471d1bf36f3154489a986999d65f54 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 19:04:19 -0700 Subject: [PATCH 18/21] refactor: narrow api::delete surface to pub(crate) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The batch-delete wire-format helpers were all `pub`, but nothing outside the core crate uses them — they're internal plumbing an integrator would never call (integrators build custom resolvers/backends, not S3 XML). Narrow them to `pub(crate)`, matching the sibling `api::list` module, which scopes its equivalent `build_list_xml`/`build_list_prefix` helpers `pub(crate)`. No behavior change. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/api/delete.rs | 40 +++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/crates/core/src/api/delete.rs b/crates/core/src/api/delete.rs index 93e4ce3..88dcfe7 100644 --- a/crates/core/src/api/delete.rs +++ b/crates/core/src/api/delete.rs @@ -16,33 +16,33 @@ use serde::{Deserialize, Serialize}; /// Parsed inbound `` request body. #[derive(Debug, Deserialize)] #[serde(rename = "Delete")] -pub struct DeleteRequest { +pub(crate) struct DeleteRequest { /// When true, successful deletions are omitted from the response — only /// errors are reported. #[serde(default, rename = "Quiet")] - pub quiet: bool, + pub(crate) quiet: bool, /// The objects to delete. #[serde(default, rename = "Object")] - pub objects: Vec, + pub(crate) objects: Vec, } /// A single `` entry in a batch-delete request. #[derive(Debug, Deserialize)] -pub struct DeleteObjectEntry { +pub(crate) struct DeleteObjectEntry { /// The (client-facing) object key to delete. #[serde(rename = "Key")] - pub key: String, + pub(crate) key: String, } impl DeleteRequest { /// The maximum number of objects S3 accepts in a single batch delete. - pub const MAX_KEYS: usize = 1000; + pub(crate) const MAX_KEYS: usize = 1000; /// Parse a batch-delete request body. /// /// Mirrors S3's `MalformedXML` rejections: the body must be well-formed XML, /// name at least one object, and name no more than [`MAX_KEYS`](Self::MAX_KEYS). - pub fn parse(body: &[u8]) -> Result { + pub(crate) fn parse(body: &[u8]) -> Result { let req: DeleteRequest = quick_xml::de::from_reader(body) .map_err(|e| ProxyError::MalformedXml(format!("malformed delete body: {e}")))?; if req.objects.is_empty() { @@ -61,7 +61,7 @@ impl DeleteRequest { } /// The client-facing keys named in the request, in order. - pub fn keys(&self) -> impl Iterator { + pub(crate) fn keys(&self) -> impl Iterator { self.objects.iter().map(|o| o.key.as_str()) } } @@ -72,7 +72,7 @@ impl DeleteRequest { /// (i.e. with any `backend_prefix` applied). `Quiet` is always `false` so the /// backend reports each deletion explicitly, letting the proxy map results back /// to client keys before applying the client's own quiet preference. -pub fn build_backend_delete_body(backend_keys: &[String]) -> String { +pub(crate) fn build_backend_delete_body(backend_keys: &[String]) -> String { #[derive(Serialize)] #[serde(rename = "Delete")] struct Body<'a> { @@ -98,32 +98,32 @@ pub fn build_backend_delete_body(backend_keys: &[String]) -> String { /// A per-key error in a ``. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct DeleteError { +pub(crate) struct DeleteError { /// The object key the error applies to. #[serde(rename = "Key")] - pub key: String, + pub(crate) key: String, /// S3 error code (e.g. `AccessDenied`). #[serde(rename = "Code")] - pub code: String, + pub(crate) code: String, /// Human-readable message. #[serde(rename = "Message")] - pub message: String, + pub(crate) message: String, } /// The backend's keys that were deleted and any per-key errors it reported. #[derive(Debug)] -pub struct BackendOutcome { +pub(crate) struct BackendOutcome { /// Backend keys reported as deleted. - pub deleted: Vec, + pub(crate) deleted: Vec, /// Per-key errors reported by the backend (backend key space). - pub errors: Vec, + pub(crate) errors: Vec, } /// Parse a backend `` response. /// /// Tolerates the extra elements S3 includes (`VersionId`, `DeleteMarker`, …) and /// returns only what the proxy needs to rebuild the client response. -pub fn parse_backend_result(xml: &[u8]) -> Result { +pub(crate) fn parse_backend_result(xml: &[u8]) -> Result { #[derive(Deserialize)] #[serde(rename = "DeleteResult")] struct DeleteResultXml { @@ -149,7 +149,11 @@ pub fn parse_backend_result(xml: &[u8]) -> Result { /// /// `deleted` and `errors` are in client key space. In `quiet` mode the /// `` entries are omitted (S3 semantics); errors are always reported. -pub fn build_delete_result(deleted: &[String], errors: &[DeleteError], quiet: bool) -> String { +pub(crate) fn build_delete_result( + deleted: &[String], + errors: &[DeleteError], + quiet: bool, +) -> String { #[derive(Serialize)] #[serde(rename = "DeleteResult")] struct DeleteResultXml<'a> { From 74352546cde3cee36641ddfe9625bbe3181dad91 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 19:17:14 -0700 Subject: [PATCH 19/21] refactor: share content_length helper `check_upload_size` (added in this PR) re-implemented the `content-length` header parse that `handle_request` already had as a nested fn. Extract one module-level `content_length(&HeaderMap) -> Option` and use it in both; `response_body_bytes` stays nested (genuinely single-use). No behavior change. Co-Authored-By: Claude Opus 4.8 --- crates/core/src/proxy.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index 5c1dfb7..346764b 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -275,11 +275,7 @@ where /// maximum. No-op when no limit is set or no `Content-Length` is present. fn check_upload_size(&self, headers: &HeaderMap) -> Result<(), ProxyError> { if let Some(max) = self.max_request_body_size { - let declared = headers - .get(http::header::CONTENT_LENGTH) - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()); - if let Some(len) = declared { + if let Some(len) = content_length(headers) { if len > max { tracing::warn!( content_length = len, @@ -409,14 +405,7 @@ where } } - fn content_length_from_headers(headers: &HeaderMap) -> Option { - headers - .get("content-length") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) - } - - let request_bytes = content_length_from_headers(req.headers); + let request_bytes = content_length(req.headers); let (mut response, status, resp_bytes, was_forwarded, backend_start) = match action { HandlerAction::Response(r) => { @@ -1276,6 +1265,14 @@ fn build_object_path(config: &BucketConfig, key: &str) -> object_store::path::Pa object_store::path::Path::from(apply_backend_prefix(config, key)) } +/// Parse the declared `Content-Length` header as a byte count, if present and valid. +fn content_length(headers: &HeaderMap) -> Option { + headers + .get(http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) +} + /// Map a client-visible key into the backend key space by prepending /// `backend_prefix`. fn apply_backend_prefix(config: &BucketConfig, key: &str) -> String { From 4a9c3538210304199ea888baf52340078fc29ffa Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 21:23:22 -0700 Subject: [PATCH 20/21] feat(registry): delegate per-key batch-delete authz to BucketRegistry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit execute_delete_objects hard-coded per-key authorization against the caller's STS scopes via auth::key_authorized, while the coarse path (get_bucket) lets the registry own authorization. A registry that authorizes through an external API — resolving permission at get_bucket time, with empty STS scopes — had no way to allow batch-delete keys; every key was denied even for authorized callers. Add BucketRegistry::authorize_key with a default that preserves the existing scope check, and call it from the gateway instead of the free function. API-driven registries can override it to honor the bucket-level decision they already made in get_bucket. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 20 ++++++++++---------- crates/core/src/proxy.rs | 6 +++++- crates/core/src/registry/bucket.rs | 23 ++++++++++++++++++++++- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d5cf6e5..da5f3d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1121,7 +1121,7 @@ dependencies = [ [[package]] name = "multistore" -version = "0.5.0" +version = "0.5.1" dependencies = [ "async-trait", "base64", @@ -1147,7 +1147,7 @@ dependencies = [ [[package]] name = "multistore-cf-workers" -version = "0.5.0" +version = "0.5.1" dependencies = [ "async-trait", "bytes", @@ -1170,7 +1170,7 @@ dependencies = [ [[package]] name = "multistore-cf-workers-example" -version = "0.5.0" +version = "0.5.1" dependencies = [ "bytes", "console_error_panic_hook", @@ -1194,7 +1194,7 @@ dependencies = [ [[package]] name = "multistore-lambda" -version = "0.5.0" +version = "0.5.1" dependencies = [ "bytes", "http", @@ -1214,7 +1214,7 @@ dependencies = [ [[package]] name = "multistore-metering" -version = "0.5.0" +version = "0.5.1" dependencies = [ "bytes", "futures", @@ -1226,7 +1226,7 @@ dependencies = [ [[package]] name = "multistore-oidc-provider" -version = "0.5.0" +version = "0.5.1" dependencies = [ "base64", "chrono", @@ -1246,7 +1246,7 @@ dependencies = [ [[package]] name = "multistore-path-mapping" -version = "0.5.0" +version = "0.5.1" dependencies = [ "multistore", "percent-encoding", @@ -1255,7 +1255,7 @@ dependencies = [ [[package]] name = "multistore-server" -version = "0.5.0" +version = "0.5.1" dependencies = [ "axum", "bytes", @@ -1277,7 +1277,7 @@ dependencies = [ [[package]] name = "multistore-static-config" -version = "0.5.0" +version = "0.5.1" dependencies = [ "chrono", "multistore", @@ -1289,7 +1289,7 @@ dependencies = [ [[package]] name = "multistore-sts" -version = "0.5.0" +version = "0.5.1" dependencies = [ "aes-gcm", "base64", diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index 346764b..ff341af 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -1138,7 +1138,11 @@ where let mut allowed_backend: Vec = Vec::new(); let mut errors: Vec = Vec::new(); for key in request.keys() { - if crate::auth::key_authorized(&pending.identity, bucket, Action::DeleteObject, key) { + if self + .bucket_registry + .authorize_key(bucket, &pending.identity, Action::DeleteObject, key) + .await + { allowed_backend.push(apply_backend_prefix(config, key)); } else { errors.push(delete::DeleteError { diff --git a/crates/core/src/registry/bucket.rs b/crates/core/src/registry/bucket.rs index 7e85e9f..85162c4 100644 --- a/crates/core/src/registry/bucket.rs +++ b/crates/core/src/registry/bucket.rs @@ -4,7 +4,7 @@ use crate::api::list_rewrite::ListRewrite; use crate::api::response::BucketEntry; use crate::error::ProxyError; use crate::maybe_send::{MaybeSend, MaybeSync}; -use crate::types::{BucketConfig, BucketOwner, ResolvedIdentity, S3Operation}; +use crate::types::{Action, BucketConfig, BucketOwner, ResolvedIdentity, S3Operation}; use std::future::Future; /// Default owner name used in `ListAllMyBucketsResult` responses. @@ -51,6 +51,27 @@ pub trait BucketRegistry: Clone + MaybeSend + MaybeSync + 'static { identity: &ResolvedIdentity, ) -> impl Future, ProxyError>> + MaybeSend; + /// Authorize a single key for a batch operation such as + /// [`DeleteObjects`](crate::types::S3Operation::DeleteObjects). + /// + /// Called per key by the gateway *after* the coarse [`get_bucket`](Self::get_bucket) + /// authorization, once the keys — which live in the request body — are known. + /// The default enforces the caller's scope grants via + /// [`key_authorized`](crate::auth::key_authorized), matching the + /// coarse-path scope model. Registries that authorize through a different + /// mechanism (e.g. an external permission API consulted in `get_bucket`) + /// can override this to apply their own per-key policy — returning `true` + /// when the bucket-level decision already covers every key. + fn authorize_key( + &self, + name: &str, + identity: &ResolvedIdentity, + action: Action, + key: &str, + ) -> impl Future + MaybeSend { + async move { crate::auth::key_authorized(identity, name, action, key) } + } + /// The owner identity returned in `ListAllMyBucketsResult` responses. /// /// Defaults to `("multistore-proxy", "multistore-proxy")`. From fe567baa41c37906abd96326847374610bed3434 Mon Sep 17 00:00:00 2001 From: Anthony Lukach Date: Tue, 23 Jun 2026 22:15:40 -0700 Subject: [PATCH 21/21] fix(path-mapping): forward authorize_key to the inner registry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MappedRegistry forwards get_bucket/list_buckets/bucket_owner but inherited the default authorize_key, which bypasses the inner registry entirely (it checks the caller's STS scopes directly). An inner registry that overrides authorize_key — e.g. one authorizing batch-delete keys via an external API — was silently ignored, so every key fell to the scope check. Forward it like the other methods. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/path-mapping/src/lib.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/crates/path-mapping/src/lib.rs b/crates/path-mapping/src/lib.rs index 6df3dec..2705ac1 100644 --- a/crates/path-mapping/src/lib.rs +++ b/crates/path-mapping/src/lib.rs @@ -464,6 +464,20 @@ impl BucketRegistry for MappedRegistry { self.inner.list_buckets(identity).await } + /// Forward per-key authorization to the inner registry, so an inner + /// registry's `authorize_key` override is honored (the default trait impl + /// would bypass it). Arguments pass through unchanged, matching how + /// `get_bucket` forwards the bucket name. + async fn authorize_key( + &self, + name: &str, + identity: &multistore::types::ResolvedIdentity, + action: multistore::types::Action, + key: &str, + ) -> bool { + self.inner.authorize_key(name, identity, action, key).await + } + fn bucket_owner(&self) -> multistore::types::BucketOwner { self.inner.bucket_owner() }