diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 35805f6..fe971b9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -16,6 +16,12 @@ cargo check # Check (WASM) cargo check -p multistore-cf-workers --target wasm32-unknown-unknown + +# Unit + doc tests +cargo test + +# Integration tests (MinIO via docker compose behind wrangler dev; mirrors CI) +make test-integration ``` ## Release Process diff --git a/Cargo.lock b/Cargo.lock index a8e8ac5..da5f3d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1121,7 +1121,7 @@ dependencies = [ [[package]] name = "multistore" -version = "0.5.0" +version = "0.5.1" dependencies = [ "async-trait", "base64", @@ -1132,6 +1132,7 @@ dependencies = [ "hmac", "http", "matchit 0.8.4", + "md-5", "object_store", "percent-encoding", "quick-xml 0.37.5", @@ -1146,7 +1147,7 @@ dependencies = [ [[package]] name = "multistore-cf-workers" -version = "0.5.0" +version = "0.5.1" dependencies = [ "async-trait", "bytes", @@ -1169,7 +1170,7 @@ dependencies = [ [[package]] name = "multistore-cf-workers-example" -version = "0.5.0" +version = "0.5.1" dependencies = [ "bytes", "console_error_panic_hook", @@ -1193,7 +1194,7 @@ dependencies = [ [[package]] name = "multistore-lambda" -version = "0.5.0" +version = "0.5.1" dependencies = [ "bytes", "http", @@ -1213,7 +1214,7 @@ dependencies = [ [[package]] name = "multistore-metering" -version = "0.5.0" +version = "0.5.1" dependencies = [ "bytes", "futures", @@ -1225,7 +1226,7 @@ dependencies = [ [[package]] name = "multistore-oidc-provider" -version = "0.5.0" +version = "0.5.1" dependencies = [ "base64", "chrono", @@ -1245,7 +1246,7 @@ dependencies = [ [[package]] name = "multistore-path-mapping" -version = "0.5.0" +version = "0.5.1" dependencies = [ "multistore", "percent-encoding", @@ -1254,7 +1255,7 @@ dependencies = [ [[package]] name = "multistore-server" -version = "0.5.0" +version = "0.5.1" dependencies = [ "axum", "bytes", @@ -1276,7 +1277,7 @@ dependencies = [ [[package]] name = "multistore-static-config" -version = "0.5.0" +version = "0.5.1" dependencies = [ "chrono", "multistore", @@ -1288,7 +1289,7 @@ dependencies = [ [[package]] name = "multistore-sts" -version = "0.5.0" +version = "0.5.1" dependencies = [ "aes-gcm", "base64", diff --git a/Cargo.toml b/Cargo.toml index 3608fea..d82df70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ matchit = "0.8" # Crypto hmac = "0.12" sha2 = { version = "0.10", features = ["oid"] } +md-5 = "0.10" rsa = "0.9" aes-gcm = "0.10" diff --git a/Makefile b/Makefile index 42dcf55..9c39ed4 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: check test run-server run-workers ci docs +.PHONY: check test test-integration run-server run-workers ci docs build: build-workers @@ -24,6 +24,11 @@ clippy-fix: test: cargo test +# Run the integration suite locally: MinIO (docker compose) + the Workers +# runtime (wrangler dev), mirroring CI. Pass extra pytest args via ARGS. +test-integration: + ./scripts/integration-test.sh $(ARGS) + run-server: cargo run -p multistore-server -- $(ARGS) diff --git a/crates/cf-workers/src/backend.rs b/crates/cf-workers/src/backend.rs index 3c6e11b..8ba5f42 100644 --- a/crates/cf-workers/src/backend.rs +++ b/crates/cf-workers/src/backend.rs @@ -146,22 +146,31 @@ impl ProxyBackend for WorkerBackend { // Fetch via worker let worker_req: worker::Request = ws_request.into(); - let mut worker_resp = Fetch::Request(worker_req) + let worker_resp = Fetch::Request(worker_req) .send() .await .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?; let status = worker_resp.status_code(); - // Read response body as bytes (multipart responses are small) - let resp_bytes = worker_resp - .bytes() - .await - .map_err(|e| ProxyError::Internal(format!("failed to read response: {}", e)))?; - + // Convert to `web_sys::Response` and read the headers BEFORE consuming + // the body. The `worker::Response → web_sys::Response` conversion panics + // once the body has been read, so reading bytes first (and converting + // after) blew up `send_raw` on every multipart/batch-delete response. + // `forward()` relies on the same before-body ordering. let ws_response: web_sys::Response = worker_resp.into(); let resp_headers = headermap_from_js(&ws_response.headers()); + // Read the (small) response body via `arrayBuffer()`. + let buf = wasm_bindgen_futures::JsFuture::from( + ws_response + .array_buffer() + .map_err(|e| ProxyError::Internal(format!("arrayBuffer() failed: {:?}", e)))?, + ) + .await + .map_err(|e| ProxyError::Internal(format!("failed to read response: {:?}", e)))?; + let resp_bytes = js_sys::Uint8Array::new(&buf).to_vec(); + Ok(RawResponse { status, headers: resp_headers, diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 6cca716..5eea88e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -25,6 +25,7 @@ url.workspace = true percent-encoding.workspace = true hmac.workspace = true sha2.workspace = true +md-5.workspace = true quick-xml.workspace = true tracing.workspace = true object_store.workspace = true diff --git a/crates/core/src/api/delete.rs b/crates/core/src/api/delete.rs new file mode 100644 index 0000000..88dcfe7 --- /dev/null +++ b/crates/core/src/api/delete.rs @@ -0,0 +1,303 @@ +//! Batch delete (`DeleteObjects`) request/response handling. +//! +//! S3's batch delete (`POST /{bucket}?delete`) carries the keys to delete in an +//! XML request body and returns a per-key result. This module owns the pure +//! parsing and serialization: reading the inbound `` body, building the +//! body forwarded to the backend, and merging the backend's `` +//! with the proxy's own per-key authorization decisions. +//! +//! Per-key authorization (via [`auth::key_authorized`](crate::auth::key_authorized)) +//! and backend forwarding live in the gateway; everything here is runtime- and +//! I/O-free so it can be exercised directly in unit tests. + +use crate::error::ProxyError; +use serde::{Deserialize, Serialize}; + +/// Parsed inbound `` request body. +#[derive(Debug, Deserialize)] +#[serde(rename = "Delete")] +pub(crate) struct DeleteRequest { + /// When true, successful deletions are omitted from the response — only + /// errors are reported. + #[serde(default, rename = "Quiet")] + pub(crate) quiet: bool, + /// The objects to delete. + #[serde(default, rename = "Object")] + pub(crate) objects: Vec, +} + +/// A single `` entry in a batch-delete request. +#[derive(Debug, Deserialize)] +pub(crate) struct DeleteObjectEntry { + /// The (client-facing) object key to delete. + #[serde(rename = "Key")] + pub(crate) key: String, +} + +impl DeleteRequest { + /// The maximum number of objects S3 accepts in a single batch delete. + pub(crate) const MAX_KEYS: usize = 1000; + + /// Parse a batch-delete request body. + /// + /// Mirrors S3's `MalformedXML` rejections: the body must be well-formed XML, + /// name at least one object, and name no more than [`MAX_KEYS`](Self::MAX_KEYS). + pub(crate) fn parse(body: &[u8]) -> Result { + let req: DeleteRequest = quick_xml::de::from_reader(body) + .map_err(|e| ProxyError::MalformedXml(format!("malformed delete body: {e}")))?; + if req.objects.is_empty() { + return Err(ProxyError::MalformedXml( + "delete request names no objects".into(), + )); + } + if req.objects.len() > Self::MAX_KEYS { + return Err(ProxyError::MalformedXml(format!( + "delete request names {} objects, exceeding the {}-key limit", + req.objects.len(), + Self::MAX_KEYS + ))); + } + Ok(req) + } + + /// The client-facing keys named in the request, in order. + pub(crate) fn keys(&self) -> impl Iterator { + self.objects.iter().map(|o| o.key.as_str()) + } +} + +/// Build the `` XML body forwarded to the backend. +/// +/// `backend_keys` are the keys already mapped into the backend's key space +/// (i.e. with any `backend_prefix` applied). `Quiet` is always `false` so the +/// backend reports each deletion explicitly, letting the proxy map results back +/// to client keys before applying the client's own quiet preference. +pub(crate) fn build_backend_delete_body(backend_keys: &[String]) -> String { + #[derive(Serialize)] + #[serde(rename = "Delete")] + struct Body<'a> { + #[serde(rename = "Quiet")] + quiet: bool, + #[serde(rename = "Object")] + objects: Vec>, + } + #[derive(Serialize)] + struct Obj<'a> { + #[serde(rename = "Key")] + key: &'a str, + } + let body = Body { + quiet: false, + objects: backend_keys.iter().map(|k| Obj { key: k }).collect(), + }; + format!( + "\n{}", + quick_xml::se::to_string(&body).unwrap_or_default() + ) +} + +/// A per-key error in a ``. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct DeleteError { + /// The object key the error applies to. + #[serde(rename = "Key")] + pub(crate) key: String, + /// S3 error code (e.g. `AccessDenied`). + #[serde(rename = "Code")] + pub(crate) code: String, + /// Human-readable message. + #[serde(rename = "Message")] + pub(crate) message: String, +} + +/// The backend's keys that were deleted and any per-key errors it reported. +#[derive(Debug)] +pub(crate) struct BackendOutcome { + /// Backend keys reported as deleted. + pub(crate) deleted: Vec, + /// Per-key errors reported by the backend (backend key space). + pub(crate) errors: Vec, +} + +/// Parse a backend `` response. +/// +/// Tolerates the extra elements S3 includes (`VersionId`, `DeleteMarker`, …) and +/// returns only what the proxy needs to rebuild the client response. +pub(crate) fn parse_backend_result(xml: &[u8]) -> Result { + #[derive(Deserialize)] + #[serde(rename = "DeleteResult")] + struct DeleteResultXml { + #[serde(default, rename = "Deleted")] + deleted: Vec, + #[serde(default, rename = "Error")] + errors: Vec, + } + #[derive(Deserialize)] + struct Deleted { + #[serde(rename = "Key")] + key: String, + } + let parsed: DeleteResultXml = quick_xml::de::from_reader(xml) + .map_err(|e| ProxyError::BackendError(format!("malformed delete result: {e}")))?; + Ok(BackendOutcome { + deleted: parsed.deleted.into_iter().map(|d| d.key).collect(), + errors: parsed.errors, + }) +} + +/// Serialize a client-facing ``. +/// +/// `deleted` and `errors` are in client key space. In `quiet` mode the +/// `` entries are omitted (S3 semantics); errors are always reported. +pub(crate) fn build_delete_result( + deleted: &[String], + errors: &[DeleteError], + quiet: bool, +) -> String { + #[derive(Serialize)] + #[serde(rename = "DeleteResult")] + struct DeleteResultXml<'a> { + #[serde(rename = "@xmlns")] + xmlns: &'static str, + #[serde(rename = "Deleted")] + deleted: Vec>, + #[serde(rename = "Error")] + errors: &'a [DeleteError], + } + #[derive(Serialize)] + struct Deleted<'a> { + #[serde(rename = "Key")] + key: &'a str, + } + let deleted = if quiet { + Vec::new() + } else { + deleted.iter().map(|k| Deleted { key: k }).collect() + }; + let result = DeleteResultXml { + xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + deleted, + errors, + }; + format!( + "\n{}", + quick_xml::se::to_string(&result).unwrap_or_default() + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + const SAMPLE: &[u8] = br#" + + a.txt + nested/b.txt + "#; + + #[test] + fn parses_keys_and_quiet_default_false() { + let req = DeleteRequest::parse(SAMPLE).unwrap(); + assert!(!req.quiet); + let keys: Vec<_> = req.keys().collect(); + assert_eq!(keys, vec!["a.txt", "nested/b.txt"]); + } + + #[test] + fn parses_quiet_flag() { + let body = br#"truek"#; + let req = DeleteRequest::parse(body).unwrap(); + assert!(req.quiet); + } + + #[test] + fn empty_delete_is_rejected_as_malformed_xml() { + let body = br#""#; + assert!(matches!( + DeleteRequest::parse(body), + Err(ProxyError::MalformedXml(_)) + )); + } + + #[test] + fn malformed_body_is_rejected_as_malformed_xml() { + assert!(matches!( + DeleteRequest::parse(b"not xml"), + Err(ProxyError::MalformedXml(_)) + )); + } + + #[test] + fn over_key_limit_is_rejected() { + let mut body = String::from(""); + for i in 0..=DeleteRequest::MAX_KEYS { + body.push_str(&format!("k{i}")); + } + body.push_str(""); + assert!(matches!( + DeleteRequest::parse(body.as_bytes()), + Err(ProxyError::MalformedXml(_)) + )); + // Exactly MAX_KEYS is allowed. + let mut ok = String::from(""); + for i in 0..DeleteRequest::MAX_KEYS { + ok.push_str(&format!("k{i}")); + } + ok.push_str(""); + assert!(DeleteRequest::parse(ok.as_bytes()).is_ok()); + } + + #[test] + fn backend_body_lists_each_key_non_quiet() { + let body = build_backend_delete_body(&["p/a.txt".into(), "p/b.txt".into()]); + assert!(body.contains("false")); + assert!(body.contains("p/a.txt")); + assert!(body.contains("p/b.txt")); + } + + #[test] + fn parses_backend_result_with_extra_elements() { + let xml = br#" + + p/a.txttruev1 + p/c.txtInternalErroroops + "#; + let out = parse_backend_result(xml).unwrap(); + assert_eq!(out.deleted, vec!["p/a.txt"]); + assert_eq!(out.errors.len(), 1); + assert_eq!(out.errors[0].key, "p/c.txt"); + assert_eq!(out.errors[0].code, "InternalError"); + } + + #[test] + fn build_result_omits_deleted_when_quiet() { + let deleted = vec!["a.txt".to_string()]; + let errors = vec![DeleteError { + key: "secret/x".into(), + code: "AccessDenied".into(), + message: "denied".into(), + }]; + let verbose = build_delete_result(&deleted, &errors, false); + // S3's DeleteResult carries the bucket namespace on the root element. + assert!( + verbose.contains("") + ); + assert!(verbose.contains("a.txt")); + assert!(verbose.contains("secret/x")); + assert!(verbose.contains("AccessDenied")); + + let quiet = build_delete_result(&deleted, &errors, true); + assert!(!quiet.contains("")); + // Errors are always reported, even in quiet mode. + assert!(quiet.contains("AccessDenied")); + } + + #[test] + fn keys_are_xml_escaped() { + // A key with XML-significant characters must be escaped in both the + // backend body and the result. + let body = build_backend_delete_body(&["a&b.txt".into()]); + assert!(body.contains("a&b<c>.txt")); + assert!(!body.contains("a&b.txt")); + } +} diff --git a/crates/core/src/api/mod.rs b/crates/core/src/api/mod.rs index 7e1a287..2462d9a 100644 --- a/crates/core/src/api/mod.rs +++ b/crates/core/src/api/mod.rs @@ -1,5 +1,6 @@ //! S3 API parsing and response serialization. +pub mod delete; pub mod list; pub mod list_rewrite; pub mod request; diff --git a/crates/core/src/api/request.rs b/crates/core/src/api/request.rs index d26dd0a..df41a38 100644 --- a/crates/core/src/api/request.rs +++ b/crates/core/src/api/request.rs @@ -12,9 +12,21 @@ pub fn parse_s3_request( method: &Method, uri_path: &str, query: Option<&str>, - _headers: &http::HeaderMap, + headers: &http::HeaderMap, host_style: HostStyle, ) -> Result { + // Server-side copy (CopyObject / UploadPartCopy) arrives as a PUT carrying + // `x-amz-copy-source`. It is not supported: forwarding such a request via a + // presigned URL would drop the copy-source header and silently overwrite the + // destination with an empty body. Reject it explicitly so the failure is + // unambiguous rather than corrupting data. See + // .plans/2026-06-23-data-edit-operations-design.md for the deferred design. + if *method == Method::PUT && headers.contains_key("x-amz-copy-source") { + return Err(ProxyError::NotImplemented( + "server-side copy (x-amz-copy-source) is not supported".into(), + )); + } + // GET / with path-style → ListBuckets (no bucket in path) if matches!(host_style, HostStyle::Path) && uri_path.trim_start_matches('/').is_empty() { if *method == Method::GET { @@ -54,6 +66,7 @@ pub fn build_s3_operation( .map(|(_, v)| v.clone()); let has_uploads = query_params.iter().any(|(k, _)| k == "uploads"); + let has_delete = query_params.iter().any(|(k, _)| k == "delete"); match *method { Method::GET => { @@ -85,6 +98,14 @@ pub fn build_s3_operation( part_number, }) } else { + // `x-amz-copy-source` (CopyObject / UploadPartCopy) is rejected + // upstream in `parse_s3_request`. Callers that invoke + // `build_s3_operation` directly (custom resolvers) are + // responsible for their own copy-source handling. + // ponytail: deferred — trailer checksums (`x-amz-checksum-*`) + // sent on writes are dropped, not forwarded; they need the + // header-signing forward path. See + // .plans/2026-06-23-data-edit-operations-design.md. Ok(S3Operation::PutObject { bucket, key }) } } @@ -97,6 +118,11 @@ pub fn build_s3_operation( key, upload_id, }) + } else if has_delete { + // Batch delete: `POST /{bucket}?delete` with an XML body listing + // keys. The keys (and per-key authorization) are handled once the + // body is materialized. + Ok(S3Operation::DeleteObjects { bucket }) } else { Err(ProxyError::InvalidRequest( "unsupported POST operation".into(), @@ -111,6 +137,10 @@ pub fn build_s3_operation( upload_id, }) } else if !key.is_empty() { + // ponytail: deferred — versioned delete (`?versionId=`) and MFA + // delete are not handled; the version is ignored and the current + // object is deleted. Upgrade path: thread version-id through the + // forward. See .plans/2026-06-23-data-edit-operations-design.md. Ok(S3Operation::DeleteObject { bucket, key }) } else { Err(ProxyError::InvalidRequest( @@ -154,3 +184,61 @@ fn parse_query_params(query: Option<&str>) -> Vec<(String, String)> { }) .unwrap_or_default() } + +#[cfg(test)] +mod tests { + use super::*; + + fn parse( + method: Method, + path: &str, + query: Option<&str>, + headers: &http::HeaderMap, + ) -> Result { + parse_s3_request(&method, path, query, headers, HostStyle::Path) + } + + #[test] + fn batch_delete_parses_as_delete_objects() { + let op = parse( + Method::POST, + "/my-bucket", + Some("delete"), + &http::HeaderMap::new(), + ) + .unwrap(); + assert!( + matches!(op, S3Operation::DeleteObjects { ref bucket } if bucket == "my-bucket"), + "POST ?delete should parse as DeleteObjects, got {op:?}" + ); + } + + #[test] + fn post_without_known_subresource_is_rejected() { + let err = parse( + Method::POST, + "/my-bucket/key", + None, + &http::HeaderMap::new(), + ) + .unwrap_err(); + assert!(matches!(err, ProxyError::InvalidRequest(_))); + } + + #[test] + fn copy_source_put_is_rejected_not_implemented() { + let mut headers = http::HeaderMap::new(); + headers.insert("x-amz-copy-source", "/src-bucket/src-key".parse().unwrap()); + let err = parse(Method::PUT, "/dst-bucket/dst-key", None, &headers).unwrap_err(); + assert!( + matches!(err, ProxyError::NotImplemented(_)), + "copy-source PUT must be rejected as NotImplemented, got {err:?}" + ); + } + + #[test] + fn plain_put_still_parses_as_put_object() { + let op = parse(Method::PUT, "/b/k.txt", None, &http::HeaderMap::new()).unwrap(); + assert!(matches!(op, S3Operation::PutObject { .. })); + } +} diff --git a/crates/core/src/auth/authorize.rs b/crates/core/src/auth/authorize.rs index 07134bd..97021e0 100644 --- a/crates/core/src/auth/authorize.rs +++ b/crates/core/src/auth/authorize.rs @@ -13,8 +13,12 @@ fn key_matches_prefix(key: &str, prefix: &str) -> bool { if prefix.ends_with('/') || prefix.is_empty() { return key.starts_with(prefix); } - // Prefix does not end with '/' — require an exact match or a '/' boundary - key == prefix || key.starts_with(&format!("{}/", prefix)) + // Prefix does not end with '/' — require an exact match or a '/' boundary. + // Done without allocating: this runs per key × scope × prefix on batch ops. + key == prefix + || (key.len() > prefix.len() + && key.starts_with(prefix) + && key.as_bytes()[prefix.len()] == b'/') } /// Check if a resolved identity is authorized to perform an operation. @@ -60,6 +64,12 @@ pub fn authorize( _ => operation.key().to_string(), }; + // Batch delete carries no key here — the keys live in the request body. This + // is only a coarse check that *some* scope grants DeleteObject on the bucket; + // each key is authorized individually (against its prefix) once the body is + // parsed. See [`key_authorized`]. + let ignore_prefix = matches!(operation, S3Operation::DeleteObjects { .. }); + // Check if any scope grants access let authorized = scopes.iter().any(|scope| { if scope.bucket != bucket { @@ -69,8 +79,8 @@ pub fn authorize( return false; } // Check prefix restrictions - if scope.prefixes.is_empty() { - return true; // Full bucket access + if ignore_prefix || scope.prefixes.is_empty() { + return true; // Full bucket access (or deferred per-key check) } scope .prefixes @@ -92,6 +102,31 @@ pub fn authorize( } } +/// Check whether `identity` may perform `action` on a single `key` in `bucket`. +/// +/// Used for per-key authorization of batch operations such as +/// [`DeleteObjects`](crate::types::S3Operation::DeleteObjects), where the coarse +/// [`authorize`] check only verified that *some* scope grants the action on the +/// bucket. Anonymous identities are never authorized here — this is only used +/// for write actions, which anonymous callers can never perform. +pub fn key_authorized( + identity: &ResolvedIdentity, + bucket: &str, + action: Action, + key: &str, +) -> bool { + let scopes = match identity { + ResolvedIdentity::Anonymous => return false, + ResolvedIdentity::Authenticated(id) => &id.allowed_scopes, + }; + scopes.iter().any(|scope| { + scope.bucket == bucket + && scope.actions.contains(&action) + && (scope.prefixes.is_empty() + || scope.prefixes.iter().any(|p| key_matches_prefix(key, p))) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -121,4 +156,89 @@ mod tests { assert!(!key_matches_prefix("other/file.txt", "data/")); assert!(!key_matches_prefix("other/file.txt", "data")); } + + use crate::types::{AccessScope, AuthenticatedIdentity, BucketConfig}; + + fn identity_with(scope: AccessScope) -> ResolvedIdentity { + ResolvedIdentity::Authenticated(AuthenticatedIdentity { + principal_name: "tester".into(), + allowed_scopes: vec![scope], + }) + } + + fn bucket(name: &str, anonymous: bool) -> BucketConfig { + BucketConfig { + name: name.into(), + backend_type: "s3".into(), + backend_prefix: None, + anonymous_access: anonymous, + allowed_roles: vec![], + backend_options: Default::default(), + } + } + + #[test] + fn key_authorized_enforces_prefix_per_key() { + let id = identity_with(AccessScope { + bucket: "b".into(), + prefixes: vec!["data/".into()], + actions: vec![Action::DeleteObject], + }); + assert!(key_authorized(&id, "b", Action::DeleteObject, "data/x.txt")); + assert!(!key_authorized( + &id, + "b", + Action::DeleteObject, + "other/x.txt" + )); + // Wrong bucket / wrong action are denied. + assert!(!key_authorized( + &id, + "other", + Action::DeleteObject, + "data/x.txt" + )); + assert!(!key_authorized(&id, "b", Action::PutObject, "data/x.txt")); + } + + #[test] + fn key_authorized_denies_anonymous() { + assert!(!key_authorized( + &ResolvedIdentity::Anonymous, + "b", + Action::DeleteObject, + "anything" + )); + } + + #[test] + fn delete_objects_coarse_authz_ignores_prefix() { + // A prefix-scoped caller passes the coarse batch-delete check even though + // the operation carries no key; per-key enforcement happens later. + let id = identity_with(AccessScope { + bucket: "b".into(), + prefixes: vec!["data/".into()], + actions: vec![Action::DeleteObject], + }); + let op = S3Operation::DeleteObjects { bucket: "b".into() }; + assert!(authorize(&id, &op, &bucket("b", false)).is_ok()); + } + + #[test] + fn delete_objects_denied_without_delete_action() { + let id = identity_with(AccessScope { + bucket: "b".into(), + prefixes: vec![], + actions: vec![Action::GetObject], + }); + let op = S3Operation::DeleteObjects { bucket: "b".into() }; + assert!(authorize(&id, &op, &bucket("b", false)).is_err()); + } + + #[test] + fn delete_objects_denied_for_anonymous() { + let op = S3Operation::DeleteObjects { bucket: "b".into() }; + // Even on an anonymous-readable bucket, batch delete is a write. + assert!(authorize(&ResolvedIdentity::Anonymous, &op, &bucket("b", true)).is_err()); + } } diff --git a/crates/core/src/auth/mod.rs b/crates/core/src/auth/mod.rs index 496c0e0..6561acd 100644 --- a/crates/core/src/auth/mod.rs +++ b/crates/core/src/auth/mod.rs @@ -9,7 +9,7 @@ mod authorize; pub mod identity; pub mod sigv4; -pub use authorize::authorize; +pub use authorize::{authorize, key_authorized}; pub use identity::resolve_identity; pub use sigv4::{parse_sigv4_auth, verify_sigv4_signature, SigV4Auth}; diff --git a/crates/core/src/auth/sigv4.rs b/crates/core/src/auth/sigv4.rs index 12da1e7..2a97de2 100644 --- a/crates/core/src/auth/sigv4.rs +++ b/crates/core/src/auth/sigv4.rs @@ -154,12 +154,32 @@ pub fn verify_sigv4_signature( Ok(matched) } -/// Sort query string parameters for SigV4 canonical request construction. +/// Build the SigV4 canonical query string: every parameter as `key=value`, +/// sorted. +/// +/// A value-less flag parameter (e.g. `?uploads`, `?delete`) is canonicalized +/// with an empty value and a trailing `=` per the SigV4 spec. Clients (and +/// backends) sign it that way, so the proxy must reconstruct it identically on +/// both the inbound-verification and outbound-signing sides or the signature +/// will not match. Empty segments (from a stray `&`) are dropped. pub(crate) fn canonicalize_query_string(query: &str) -> String { if query.is_empty() { return String::new(); } - let mut parts: Vec<&str> = query.split('&').collect(); + // Borrow params that are already `key=value`; only the value-less flags + // (`?delete`, `?uploads`) need an owned `key=`. This keeps the common path + // allocation-free on the per-request signing/verification hot path. + let mut parts: Vec> = query + .split('&') + .filter(|p| !p.is_empty()) + .map(|p| { + if p.contains('=') { + std::borrow::Cow::Borrowed(p) + } else { + std::borrow::Cow::Owned(format!("{p}=")) + } + }) + .collect(); parts.sort_unstable(); parts.join("&") } @@ -180,3 +200,49 @@ pub(crate) fn constant_time_eq(a: &[u8], b: &[u8]) -> bool { .fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0 } + +#[cfg(test)] +mod tests { + use super::canonicalize_query_string; + + #[test] + fn empty_query_is_empty() { + assert_eq!(canonicalize_query_string(""), ""); + } + + #[test] + fn value_less_flag_gets_trailing_equals() { + // The bug that broke multipart and batch delete: `?uploads` / `?delete` + // must canonicalize to `uploads=` / `delete=`. + assert_eq!(canonicalize_query_string("uploads"), "uploads="); + assert_eq!(canonicalize_query_string("delete"), "delete="); + } + + #[test] + fn valued_params_are_sorted_and_unchanged() { + assert_eq!( + canonicalize_query_string("list-type=2&prefix=foo"), + "list-type=2&prefix=foo" + ); + // Sorting is by the full encoded parameter. + assert_eq!( + canonicalize_query_string("partNumber=1&uploadId=abc"), + "partNumber=1&uploadId=abc" + ); + assert_eq!(canonicalize_query_string("b=2&a=1"), "a=1&b=2"); + } + + #[test] + fn mixed_flag_and_valued_params() { + // Real shape of a versioned delete-style request. + assert_eq!( + canonicalize_query_string("versionId=v1&delete"), + "delete=&versionId=v1" + ); + } + + #[test] + fn stray_empty_segments_are_dropped() { + assert_eq!(canonicalize_query_string("delete&"), "delete="); + } +} diff --git a/crates/core/src/backend/multipart.rs b/crates/core/src/backend/multipart.rs index 11a13c2..f57be9f 100644 --- a/crates/core/src/backend/multipart.rs +++ b/crates/core/src/backend/multipart.rs @@ -1,8 +1,11 @@ -//! Multipart URL building and request signing for S3-compatible backends. +//! Backend URL building and request signing for raw signed (non-presigned) +//! S3 operations. //! -//! These helpers are used by [`Gateway::execute_multipart`](crate::proxy::Gateway) -//! for CreateMultipartUpload, UploadPart, CompleteMultipartUpload, and -//! AbortMultipartUpload operations. +//! These helpers build the backend URL and sign the request for the operations +//! that go through [`ProxyBackend::send_raw`](crate::backend::ProxyBackend::send_raw): +//! the multipart operations (CreateMultipartUpload, UploadPart, +//! CompleteMultipartUpload, AbortMultipartUpload) and batch delete +//! (DeleteObjects). use crate::backend::request_signer::S3RequestSigner; use crate::error::ProxyError; @@ -22,6 +25,15 @@ pub fn build_backend_url( let bucket = config.option("bucket_name").unwrap_or(""); let bucket_is_empty = bucket.is_empty(); + // Batch delete targets the bucket, not a key: `{base}[/{bucket}]?delete`. + if matches!(operation, S3Operation::DeleteObjects { .. }) { + return Ok(if bucket_is_empty { + format!("{base}?delete") + } else { + format!("{base}/{bucket}?delete") + }); + } + let mut key = String::new(); if let Some(prefix) = &config.backend_prefix { key.push_str(prefix.trim_end_matches('/')); diff --git a/crates/core/src/backend/request_signer.rs b/crates/core/src/backend/request_signer.rs index ad023fb..c53d611 100644 --- a/crates/core/src/backend/request_signer.rs +++ b/crates/core/src/backend/request_signer.rs @@ -1,11 +1,13 @@ //! Outbound SigV4 request signing. //! //! [`S3RequestSigner`] signs raw HTTP requests destined for S3-compatible -//! backends using AWS Signature Version 4. Used for multipart operations -//! (CreateMultipartUpload, UploadPart, CompleteMultipartUpload, -//! AbortMultipartUpload) that go through [`backend::ProxyBackend::send_raw`](crate::backend::ProxyBackend::send_raw). +//! backends using AWS Signature Version 4. Used for the operations that go +//! through [`backend::ProxyBackend::send_raw`](crate::backend::ProxyBackend::send_raw) +//! rather than presigned URLs: multipart operations (CreateMultipartUpload, +//! UploadPart, CompleteMultipartUpload, AbortMultipartUpload) and batch delete +//! (DeleteObjects). -use crate::auth::sigv4::hmac_sha256; +use crate::auth::sigv4::{canonicalize_query_string, hmac_sha256}; use crate::error::ProxyError; use http::HeaderMap; @@ -75,9 +77,11 @@ impl S3RequestSigner { }; headers.insert("host", host_header.parse().unwrap()); - // Canonical request + // Canonical request. The query must be in SigV4 canonical form — every + // parameter as `key=value` (value-less flags like `?uploads`/`?delete` + // get a trailing `=`), sorted — or the backend rejects the signature. let canonical_uri = url.path(); - let canonical_querystring = url.query().unwrap_or(""); + let canonical_querystring = canonicalize_query_string(url.query().unwrap_or("")); let mut signed_header_names: Vec<&str> = headers.keys().map(|k| k.as_str()).collect(); signed_header_names.sort(); diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 1c5bbcf..c28dd2a 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -25,6 +25,24 @@ pub enum ProxyError { #[error("invalid request: {0}")] InvalidRequest(String), + /// The XML in the request body was not well-formed or did not validate + /// against the expected schema (e.g. a batch-delete body that is malformed, + /// empty, or exceeds the 1000-key limit). Maps to S3's `400 MalformedXML`. + #[error("malformed XML: {0}")] + MalformedXml(String), + + /// The requested S3 operation is recognized but not supported by the proxy + /// (e.g. server-side copy via `x-amz-copy-source`). Maps to S3's + /// `501 NotImplemented`. + #[error("not implemented: {0}")] + NotImplemented(String), + + /// The upload body exceeds the proxy's configured maximum size. Returned + /// as S3's `EntityTooLarge` so clients get an actionable error instead of a + /// runtime-specific rejection (e.g. Cloudflare's edge `413`). + #[error("entity too large")] + EntityTooLarge, + /// The request contains no authentication credentials. #[error("missing authentication")] MissingAuth, @@ -82,6 +100,9 @@ impl ProxyError { Self::AccessDenied => "AccessDenied", Self::SignatureDoesNotMatch => "SignatureDoesNotMatch", Self::InvalidRequest(_) => "InvalidRequest", + Self::MalformedXml(_) => "MalformedXML", + Self::NotImplemented(_) => "NotImplemented", + Self::EntityTooLarge => "EntityTooLarge", Self::MissingAuth => "AccessDenied", Self::ExpiredCredentials => "ExpiredToken", Self::InvalidOidcToken(_) => "InvalidIdentityToken", @@ -102,6 +123,9 @@ impl ProxyError { Self::AccessDenied | Self::MissingAuth | Self::ExpiredCredentials => 403, Self::SignatureDoesNotMatch => 403, Self::InvalidRequest(_) => 400, + Self::MalformedXml(_) => 400, + Self::NotImplemented(_) => 501, + Self::EntityTooLarge => 400, Self::InvalidOidcToken(_) => 400, Self::RoleNotFound(_) => 403, Self::PreconditionFailed => 412, diff --git a/crates/core/src/proxy.rs b/crates/core/src/proxy.rs index a7da8c0..ff341af 100644 --- a/crates/core/src/proxy.rs +++ b/crates/core/src/proxy.rs @@ -65,7 +65,7 @@ use crate::middleware::{ use crate::registry::{BucketRegistry, CredentialRegistry}; use crate::route_handler::{ProxyResponseBody, RequestInfo}; use crate::router::Router; -use crate::types::{BucketConfig, ResolvedIdentity, S3Operation}; +use crate::types::{Action, BucketConfig, ResolvedIdentity, S3Operation}; use bytes::Bytes; use http::{HeaderMap, Method}; use object_store::list::PaginatedListOptions; @@ -144,6 +144,13 @@ pub struct ProxyGateway { /// When true, responses include a `Server-Timing` header with gateway /// processing metrics. Enabled by default. server_timing: bool, + /// Maximum accepted upload body size in bytes, if set. When a body-bearing + /// write (`PutObject`, `UploadPart`, or `DeleteObjects`) declares a + /// `Content-Length` larger than this, the proxy rejects it with + /// `EntityTooLarge` instead of forwarding it. Useful for surfacing a clean + /// S3 error ahead of a runtime body-size limit (e.g. Cloudflare Workers' + /// edge `413`). `None` means no proxy-enforced limit. + max_request_body_size: Option, } impl ProxyGateway @@ -176,6 +183,7 @@ where debug_errors: false, user_agent: DEFAULT_USER_AGENT.to_string(), server_timing: true, + max_request_body_size: None, } } @@ -246,6 +254,41 @@ where self } + /// Set the maximum accepted upload body size, in bytes. + /// + /// When set, a body-bearing write (`PutObject`, `UploadPart`, or + /// `DeleteObjects`) whose `Content-Length` exceeds this is rejected up front + /// with S3's `EntityTooLarge` (HTTP 400) rather than forwarded. Use this on + /// runtimes with a hard request-body limit — + /// e.g. Cloudflare Workers, where the edge otherwise rejects oversized + /// bodies with an opaque `413` — to give clients an actionable S3 error. + /// + /// The check relies on a declared `Content-Length`; requests without one + /// (e.g. unknown-length streaming) fall through to the runtime's own limit. + /// Leaving this unset (the default) disables the proxy-enforced limit. + pub fn with_max_request_body_size(mut self, max_bytes: u64) -> Self { + self.max_request_body_size = Some(max_bytes); + self + } + + /// Reject an upload whose declared `Content-Length` exceeds the configured + /// maximum. No-op when no limit is set or no `Content-Length` is present. + fn check_upload_size(&self, headers: &HeaderMap) -> Result<(), ProxyError> { + if let Some(max) = self.max_request_body_size { + if let Some(len) = content_length(headers) { + if len > max { + tracing::warn!( + content_length = len, + max = max, + "rejecting upload exceeding configured max body size" + ); + return Err(ProxyError::EntityTooLarge); + } + } + } + Ok(()) + } + /// Inject a `Server-Timing` header into the response headers if enabled. fn maybe_inject_server_timing( &self, @@ -362,14 +405,7 @@ where } } - fn content_length_from_headers(headers: &HeaderMap) -> Option { - headers - .get("content-length") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) - } - - let request_bytes = content_length_from_headers(req.headers); + let request_bytes = content_length(req.headers); let (mut response, status, resp_bytes, was_forwarded, backend_start) = match action { HandlerAction::Response(r) => { @@ -650,18 +686,23 @@ where ) } - /// Phase 2: Complete a multipart operation with the request body. + /// Phase 2: Complete a body-bearing operation with the materialized body. /// - /// Called by the runtime after materializing the body for a `NeedsBody` action. - /// Middleware is not re-run here — it already executed during phase 1 - /// when the `NeedsBody` action was produced. + /// Called by the runtime after materializing the body for a `NeedsBody` + /// action — multipart operations and batch delete. Middleware is not re-run + /// here — it already executed during phase 1 when the `NeedsBody` action was + /// produced. pub async fn handle_with_body(&self, pending: PendingRequest, body: Bytes) -> ProxyResult { - match self.execute_multipart(&pending, body).await { + let result = match &pending.operation { + S3Operation::DeleteObjects { .. } => self.execute_delete_objects(&pending, body).await, + _ => self.execute_multipart(&pending, body).await, + }; + match result { Ok(result) => { tracing::info!( request_id = %pending.request_id, status = result.status, - "multipart request completed" + "body request completed" ); result } @@ -671,7 +712,7 @@ where error = %err, status = err.status_code(), s3_code = %err.s3_error_code(), - "multipart request failed" + "body request failed" ); error_response( &err, @@ -759,13 +800,31 @@ where Ok(HandlerAction::Forward(fwd)) } S3Operation::PutObject { key, .. } => { + self.check_upload_size(original_headers)?; let fwd = self .build_forward( Method::PUT, bucket_config, key, original_headers, - &["content-type", "content-length", "content-md5"], + // Standard HTTP entity headers are safe to forward to a + // presigned URL: S3 applies them even though they are not + // part of the (host-only) presigned signature. `x-amz-*` + // write headers (metadata, SSE, tagging, storage-class, + // checksums) are deliberately NOT forwarded here — S3 + // rejects unsigned `x-amz-*` headers on presigned + // requests, so they need the header-signing path. See + // .plans/2026-06-23-data-edit-operations-design.md. + &[ + "content-type", + "content-length", + "content-md5", + "content-disposition", + "content-encoding", + "content-language", + "cache-control", + "expires", + ], request_id, ) .await?; @@ -802,17 +861,42 @@ where | S3Operation::UploadPart { .. } | S3Operation::CompleteMultipartUpload { .. } | S3Operation::AbortMultipartUpload { .. } => { - if !bucket_config.supports_s3_multipart() { + if !bucket_config.is_s3_backend() { return Err(ProxyError::InvalidRequest(format!( "multipart operations not supported for '{}' backends", bucket_config.backend_type ))); } + // UploadPart carries the part body; reject oversized parts up + // front. (Create/Complete/Abort bodies are small and won't trip + // a sane limit.) + self.check_upload_size(original_headers)?; + Ok(HandlerAction::NeedsBody(PendingRequest { + operation: operation.clone(), + bucket_config: bucket_config.clone(), + original_headers: original_headers.clone(), + request_id: request_id.to_string(), + identity: ctx.identity.clone(), + })) + } + // Batch delete needs the body to read the key list and authorize + // each key individually. + S3Operation::DeleteObjects { .. } => { + if !bucket_config.is_s3_backend() { + return Err(ProxyError::NotImplemented(format!( + "batch delete not supported for '{}' backends", + bucket_config.backend_type + ))); + } + // The body is buffered whole; bound it like other uploads. The + // key count is additionally capped when the body is parsed. + self.check_upload_size(original_headers)?; Ok(HandlerAction::NeedsBody(PendingRequest { operation: operation.clone(), bucket_config: bucket_config.clone(), original_headers: original_headers.clone(), request_id: request_id.to_string(), + identity: ctx.identity.clone(), })) } _ => Err(ProxyError::Internal("unexpected operation".into())), @@ -1028,6 +1112,118 @@ where body: ProxyResponseBody::from_bytes(raw_resp.body), }) } + + /// Execute a batch delete (`DeleteObjects`) via raw signed HTTP. + /// + /// Each key in the request body is authorized individually against the + /// caller's scopes (the earlier [`authorize`](crate::auth::authorize) check + /// only verified the caller may delete *something* in the bucket). Keys the + /// caller is not allowed to delete are reported as per-key `AccessDenied` + /// errors (S3's partial-result semantics) rather than failing the whole + /// request; the remaining keys are forwarded to the backend. + async fn execute_delete_objects( + &self, + pending: &PendingRequest, + body: Bytes, + ) -> Result { + use crate::api::delete; + + let config = &pending.bucket_config; + let bucket = pending.operation.bucket().unwrap_or_default(); + + let request = delete::DeleteRequest::parse(&body)?; + let quiet = request.quiet; + + // Partition keys by per-key authorization. + let mut allowed_backend: Vec = Vec::new(); + let mut errors: Vec = Vec::new(); + for key in request.keys() { + if self + .bucket_registry + .authorize_key(bucket, &pending.identity, Action::DeleteObject, key) + .await + { + allowed_backend.push(apply_backend_prefix(config, key)); + } else { + errors.push(delete::DeleteError { + key: key.to_string(), + code: "AccessDenied".into(), + message: "Access Denied".into(), + }); + } + } + + let mut deleted_client: Vec = Vec::new(); + + if !allowed_backend.is_empty() { + let backend_body = Bytes::from(delete::build_backend_delete_body(&allowed_backend)); + let backend_url = build_backend_url(config, &pending.operation)?; + + let mut headers = HeaderMap::new(); + headers.insert("content-type", "application/xml".parse().unwrap()); + // S3 requires a Content-MD5 (or trailing checksum) on DeleteObjects. + headers.insert( + "content-md5", + content_md5(&backend_body) + .parse() + .map_err(|_| ProxyError::Internal("invalid content-md5 header".into()))?, + ); + headers.insert(http::header::USER_AGENT, self.user_agent.parse().unwrap()); + + let payload_hash = hash_payload(&backend_body); + sign_s3_request( + &Method::POST, + &backend_url, + &mut headers, + config, + &payload_hash, + )?; + + let raw_resp = self + .backend + .send_raw(Method::POST, backend_url, headers, backend_body) + .await?; + + tracing::debug!(status = raw_resp.status, "batch delete backend response"); + + if raw_resp.status >= 300 { + return Err(ProxyError::BackendError(format!( + "backend rejected batch delete with status {}", + raw_resp.status + ))); + } + + match delete::parse_backend_result(&raw_resp.body) { + Ok(outcome) => { + for k in outcome.deleted { + deleted_client.push(strip_backend_prefix(config, &k)); + } + for mut e in outcome.errors { + e.key = strip_backend_prefix(config, &e.key); + errors.push(e); + } + } + Err(e) => { + // A 2xx with an unparseable DeleteResult is a backend + // contract violation. Surface it rather than fabricating + // success for keys whose actual fate is unknown. + tracing::error!(error = %e, "backend returned an unparseable delete result"); + return Err(ProxyError::BackendError( + "backend returned an unparseable delete result".into(), + )); + } + } + } + + let xml = delete::build_delete_result(&deleted_client, &errors, quiet); + let mut resp_headers = HeaderMap::new(); + resp_headers.insert("content-type", "application/xml".parse().unwrap()); + Ok(ProxyResult { + status: 200, + headers: resp_headers, + body: ProxyResponseBody::from_bytes(Bytes::from(xml)), + }) + } } impl Dispatch for ProxyGateway @@ -1070,23 +1266,58 @@ fn error_response(err: &ProxyError, resource: &str, request_id: &str, debug: boo /// Build an object_store Path from a bucket config and client-visible key. fn build_object_path(config: &BucketConfig, key: &str) -> object_store::path::Path { + object_store::path::Path::from(apply_backend_prefix(config, key)) +} + +/// Parse the declared `Content-Length` header as a byte count, if present and valid. +fn content_length(headers: &HeaderMap) -> Option { + headers + .get(http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) +} + +/// Map a client-visible key into the backend key space by prepending +/// `backend_prefix`. +fn apply_backend_prefix(config: &BucketConfig, key: &str) -> String { match &config.backend_prefix { Some(prefix) => { let p = prefix.trim_end_matches('/'); if p.is_empty() { - object_store::path::Path::from(key) + key.to_string() } else { - let mut full_key = String::with_capacity(p.len() + 1 + key.len()); - full_key.push_str(p); - full_key.push('/'); - full_key.push_str(key); - object_store::path::Path::from(full_key) + format!("{p}/{key}") } } - None => object_store::path::Path::from(key), + None => key.to_string(), } } +/// Strip `backend_prefix` from a backend key to recover the client-visible key. +fn strip_backend_prefix(config: &BucketConfig, key: &str) -> String { + match &config.backend_prefix { + Some(prefix) => { + let p = prefix.trim_end_matches('/'); + if p.is_empty() { + return key.to_string(); + } + // Strip `{p}/` without allocating a pattern string (runs per key). + key.strip_prefix(p) + .and_then(|rest| rest.strip_prefix('/')) + .unwrap_or(key) + .to_string() + } + None => key.to_string(), + } +} + +/// Compute the base64-encoded MD5 of `body` for the `Content-MD5` header. +fn content_md5(body: &[u8]) -> String { + use base64::Engine; + use md5::{Digest, Md5}; + base64::engine::general_purpose::STANDARD.encode(Md5::digest(body)) +} + #[cfg(test)] mod tests { use super::*; @@ -1383,6 +1614,90 @@ mod tests { }); } + // -- Max upload size (EntityTooLarge) ------------------------------------ + + #[test] + fn put_over_max_body_size_is_rejected() { + run(async { + let gw = gateway().with_max_request_body_size(1024); + let mut headers = HeaderMap::new(); + headers.insert("content-length", "2048".parse().unwrap()); + let action = gw + .resolve_request(Method::PUT, "/test-bucket/big.bin", None, &headers, None) + .await; + match action { + HandlerAction::Response(r) => assert_eq!( + r.status, 400, + "oversized PUT should be rejected with EntityTooLarge (400)" + ), + other => panic!( + "expected Response, got {:?}", + std::mem::discriminant(&other) + ), + } + }); + } + + #[test] + fn put_under_max_body_size_forwards() { + run(async { + let gw = gateway().with_max_request_body_size(1_000_000); + let mut headers = HeaderMap::new(); + headers.insert("content-length", "1024".parse().unwrap()); + let action = gw + .resolve_request(Method::PUT, "/test-bucket/ok.bin", None, &headers, None) + .await; + assert!( + matches!(action, HandlerAction::Forward(_)), + "PUT within the limit should forward" + ); + }); + } + + #[test] + fn put_with_no_limit_forwards_large_body() { + run(async { + let gw = gateway(); // default: no proxy-enforced limit + let mut headers = HeaderMap::new(); + headers.insert("content-length", "999999999".parse().unwrap()); + let action = gw + .resolve_request(Method::PUT, "/test-bucket/huge.bin", None, &headers, None) + .await; + assert!( + matches!(action, HandlerAction::Forward(_)), + "with no limit configured, large PUT should still forward" + ); + }); + } + + #[test] + fn upload_part_over_max_body_size_is_rejected() { + run(async { + let gw = gateway().with_max_request_body_size(1024); + let mut headers = HeaderMap::new(); + headers.insert("content-length", "5000".parse().unwrap()); + let action = gw + .resolve_request( + Method::PUT, + "/test-bucket/key.bin", + Some("partNumber=1&uploadId=abc"), + &headers, + None, + ) + .await; + match action { + HandlerAction::Response(r) => assert_eq!( + r.status, 400, + "oversized UploadPart should be rejected with EntityTooLarge (400)" + ), + other => panic!( + "expected Response, got {:?}", + std::mem::discriminant(&other) + ), + } + }); + } + // -- Middleware test types ----------------------------------------------- struct BlockMiddleware; @@ -1581,4 +1896,159 @@ mod tests { ); }); } + + // -- Batch delete (DeleteObjects) ----------------------------------------- + + /// Backend that captures the forwarded delete body and returns a canned + /// `DeleteResult` marking `allowed/a.txt` deleted. + #[derive(Clone)] + struct DeleteMockBackend { + captured: Arc>>, + } + + impl ProxyBackend for DeleteMockBackend { + type ResponseBody = (); + type Body = (); + + async fn forward( + &self, + _request: ForwardRequest, + _body: (), + ) -> Result, ProxyError> { + unimplemented!() + } + + fn create_paginated_store( + &self, + _config: &BucketConfig, + ) -> Result, ProxyError> { + unimplemented!() + } + + fn create_signer(&self, config: &BucketConfig) -> Result, ProxyError> { + crate::backend::build_signer(config) + } + + async fn send_raw( + &self, + _method: http::Method, + _url: String, + _headers: HeaderMap, + body: Bytes, + ) -> Result { + *self.captured.lock().unwrap() = Some(body); + Ok(RawResponse { + status: 200, + headers: HeaderMap::new(), + body: Bytes::from_static( + b"allowed/a.txt", + ), + }) + } + } + + #[test] + fn batch_delete_filters_unauthorized_keys_per_key() { + use crate::types::{AccessScope, AuthenticatedIdentity}; + run(async { + let captured = Arc::new(std::sync::Mutex::new(None)); + let backend = DeleteMockBackend { + captured: captured.clone(), + }; + let gw = ProxyGateway::new(backend, MockRegistry, MockCreds, None); + + let identity = ResolvedIdentity::Authenticated(AuthenticatedIdentity { + principal_name: "tester".into(), + allowed_scopes: vec![AccessScope { + bucket: "test-bucket".into(), + prefixes: vec!["allowed/".into()], + actions: vec![Action::DeleteObject], + }], + }); + + let pending = PendingRequest { + operation: S3Operation::DeleteObjects { + bucket: "test-bucket".into(), + }, + bucket_config: test_bucket_config("test-bucket"), + original_headers: HeaderMap::new(), + request_id: "rid".into(), + identity, + }; + + let body = Bytes::from_static( + br#"allowed/a.txtdenied/b.txt"#, + ); + + let result = gw.handle_with_body(pending, body).await; + assert_eq!(result.status, 200); + + let xml = match result.body { + ProxyResponseBody::Bytes(b) => String::from_utf8(b.to_vec()).unwrap(), + ProxyResponseBody::Empty => panic!("expected a body"), + }; + // Authorized key deleted; unauthorized key reported as AccessDenied. + assert!( + xml.contains("allowed/a.txt"), + "{xml}" + ); + assert!(xml.contains("denied/b.txt"), "{xml}"); + assert!(xml.contains("AccessDenied"), "{xml}"); + + // The denied key must never be forwarded to the backend. + let sent = captured + .lock() + .unwrap() + .clone() + .expect("backend was called"); + let sent = String::from_utf8(sent.to_vec()).unwrap(); + assert!(sent.contains("allowed/a.txt"), "forwarded body: {sent}"); + assert!( + !sent.contains("denied/b.txt"), + "denied key leaked to backend: {sent}" + ); + }); + } + + #[test] + fn batch_delete_all_denied_skips_backend() { + use crate::types::{AccessScope, AuthenticatedIdentity}; + run(async { + let captured = Arc::new(std::sync::Mutex::new(None)); + let backend = DeleteMockBackend { + captured: captured.clone(), + }; + let gw = ProxyGateway::new(backend, MockRegistry, MockCreds, None); + + // Scope grants only a different prefix → every requested key is denied. + let identity = ResolvedIdentity::Authenticated(AuthenticatedIdentity { + principal_name: "tester".into(), + allowed_scopes: vec![AccessScope { + bucket: "test-bucket".into(), + prefixes: vec!["other/".into()], + actions: vec![Action::DeleteObject], + }], + }); + + let pending = PendingRequest { + operation: S3Operation::DeleteObjects { + bucket: "test-bucket".into(), + }, + bucket_config: test_bucket_config("test-bucket"), + original_headers: HeaderMap::new(), + request_id: "rid".into(), + identity, + }; + + let body = + Bytes::from_static(br#"secret/a.txt"#); + let result = gw.handle_with_body(pending, body).await; + assert_eq!(result.status, 200); + // Backend must not be contacted when nothing is authorized. + assert!( + captured.lock().unwrap().is_none(), + "backend should be skipped" + ); + }); + } } diff --git a/crates/core/src/registry/bucket.rs b/crates/core/src/registry/bucket.rs index 7e85e9f..85162c4 100644 --- a/crates/core/src/registry/bucket.rs +++ b/crates/core/src/registry/bucket.rs @@ -4,7 +4,7 @@ use crate::api::list_rewrite::ListRewrite; use crate::api::response::BucketEntry; use crate::error::ProxyError; use crate::maybe_send::{MaybeSend, MaybeSync}; -use crate::types::{BucketConfig, BucketOwner, ResolvedIdentity, S3Operation}; +use crate::types::{Action, BucketConfig, BucketOwner, ResolvedIdentity, S3Operation}; use std::future::Future; /// Default owner name used in `ListAllMyBucketsResult` responses. @@ -51,6 +51,27 @@ pub trait BucketRegistry: Clone + MaybeSend + MaybeSync + 'static { identity: &ResolvedIdentity, ) -> impl Future, ProxyError>> + MaybeSend; + /// Authorize a single key for a batch operation such as + /// [`DeleteObjects`](crate::types::S3Operation::DeleteObjects). + /// + /// Called per key by the gateway *after* the coarse [`get_bucket`](Self::get_bucket) + /// authorization, once the keys — which live in the request body — are known. + /// The default enforces the caller's scope grants via + /// [`key_authorized`](crate::auth::key_authorized), matching the + /// coarse-path scope model. Registries that authorize through a different + /// mechanism (e.g. an external permission API consulted in `get_bucket`) + /// can override this to apply their own per-key policy — returning `true` + /// when the bucket-level decision already covers every key. + fn authorize_key( + &self, + name: &str, + identity: &ResolvedIdentity, + action: Action, + key: &str, + ) -> impl Future + MaybeSend { + async move { crate::auth::key_authorized(identity, name, action, key) } + } + /// The owner identity returned in `ListAllMyBucketsResult` responses. /// /// Defaults to `("multistore-proxy", "multistore-proxy")`. diff --git a/crates/core/src/route_handler.rs b/crates/core/src/route_handler.rs index d18d5b9..a7e9e62 100644 --- a/crates/core/src/route_handler.rs +++ b/crates/core/src/route_handler.rs @@ -97,12 +97,16 @@ impl ProxyResult { } } -/// Opaque state for a multipart operation that needs the request body. +/// Opaque state for an operation that needs the request body before it can be +/// completed (multipart operations and batch delete). pub struct PendingRequest { pub(crate) operation: crate::types::S3Operation, pub(crate) bucket_config: crate::types::BucketConfig, pub(crate) original_headers: HeaderMap, pub(crate) request_id: String, + /// The resolved caller identity, needed for per-key authorization of batch + /// operations (e.g. `DeleteObjects`). Unused by multipart operations. + pub(crate) identity: crate::types::ResolvedIdentity, } /// Response headers that must NOT be forwarded to clients. diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 64a33e6..fded064 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -99,8 +99,10 @@ impl BucketConfig { } } - /// Whether this backend supports S3-style multipart uploads via raw HTTP. - pub fn supports_s3_multipart(&self) -> bool { + /// Whether this is an S3 backend. Operations that go through raw signed + /// HTTP rather than presigned URLs — multipart uploads and batch delete — + /// are gated on this. + pub fn is_s3_backend(&self) -> bool { matches!(self.parsed_backend_type(), Some(BackendType::S3)) } @@ -346,6 +348,12 @@ pub enum S3Operation { bucket: String, key: String, }, + /// Batch delete (`POST /{bucket}?delete`). The keys to delete live in the + /// request body, so this operation carries only the bucket — the body is + /// parsed and each key authorized individually once it arrives. + DeleteObjects { + bucket: String, + }, ListBucket { bucket: String, /// Raw query string from the incoming request, forwarded to the backend. @@ -370,7 +378,8 @@ impl S3Operation { http::Method::DELETE } S3Operation::CreateMultipartUpload { .. } - | S3Operation::CompleteMultipartUpload { .. } => http::Method::POST, + | S3Operation::CompleteMultipartUpload { .. } + | S3Operation::DeleteObjects { .. } => http::Method::POST, } } @@ -386,6 +395,9 @@ impl S3Operation { S3Operation::CompleteMultipartUpload { .. } => Action::CompleteMultipartUpload, S3Operation::AbortMultipartUpload { .. } => Action::AbortMultipartUpload, S3Operation::DeleteObject { .. } => Action::DeleteObject, + // Batch delete authorizes as DeleteObject; each key in the body is + // checked individually against the caller's scopes. + S3Operation::DeleteObjects { .. } => Action::DeleteObject, S3Operation::ListBuckets => Action::ListBucket, } } @@ -401,7 +413,8 @@ impl S3Operation { | S3Operation::UploadPart { bucket, .. } | S3Operation::CompleteMultipartUpload { bucket, .. } | S3Operation::AbortMultipartUpload { bucket, .. } - | S3Operation::DeleteObject { bucket, .. } => Some(bucket), + | S3Operation::DeleteObject { bucket, .. } + | S3Operation::DeleteObjects { bucket } => Some(bucket), S3Operation::ListBuckets => None, } } @@ -417,7 +430,9 @@ impl S3Operation { | S3Operation::CompleteMultipartUpload { key, .. } | S3Operation::AbortMultipartUpload { key, .. } | S3Operation::DeleteObject { key, .. } => key, - S3Operation::ListBucket { .. } | S3Operation::ListBuckets => "", + S3Operation::ListBucket { .. } + | S3Operation::ListBuckets + | S3Operation::DeleteObjects { .. } => "", } } } diff --git a/crates/path-mapping/src/lib.rs b/crates/path-mapping/src/lib.rs index 6df3dec..2705ac1 100644 --- a/crates/path-mapping/src/lib.rs +++ b/crates/path-mapping/src/lib.rs @@ -464,6 +464,20 @@ impl BucketRegistry for MappedRegistry { self.inner.list_buckets(identity).await } + /// Forward per-key authorization to the inner registry, so an inner + /// registry's `authorize_key` override is honored (the default trait impl + /// would bypass it). Arguments pass through unchanged, matching how + /// `get_bucket` forwards the bucket name. + async fn authorize_key( + &self, + name: &str, + identity: &multistore::types::ResolvedIdentity, + action: multistore::types::Action, + key: &str, + ) -> bool { + self.inner.authorize_key(name, identity, action, key).await + } + fn bucket_owner(&self) -> multistore::types::BucketOwner { self.inner.bucket_owner() } diff --git a/docs/architecture/index.md b/docs/architecture/index.md index c1db90d..a18d61f 100644 --- a/docs/architecture/index.md +++ b/docs/architecture/index.md @@ -36,7 +36,7 @@ flowchart LR **Two-phase dispatch** — The `ProxyGateway` separates request resolution from execution. `resolve_request()` determines what to do; the runtime executes it. This keeps streaming logic in runtime-specific code where it belongs. -**Presigned URLs for streaming** — GET, HEAD, PUT, and DELETE operations use presigned URLs. The runtime forwards the request directly to the backend — no buffering, no double-handling of bodies. +**Presigned URLs for streaming** — GET, HEAD, PUT, and single-object DELETE use presigned URLs. The runtime forwards the request directly to the backend — no buffering, no double-handling of bodies. Body-bearing operations that can't be presigned (multipart and batch delete) instead use raw SigV4-signed HTTP, buffering the (small) body. **Pluggable traits** — Four trait boundaries enable customization: - `Router` / `RouteHandler` — Path-based pre-dispatch request interception (STS, OIDC discovery, custom endpoints) diff --git a/docs/architecture/request-lifecycle.md b/docs/architecture/request-lifecycle.md index a9efe38..2a99aa2 100644 --- a/docs/architecture/request-lifecycle.md +++ b/docs/architecture/request-lifecycle.md @@ -49,9 +49,9 @@ sequenceDiagram Note over Middleware: after_dispatch(CompletedRequest) Gateway-->>Runtime: GatewayResponse::Response Runtime-->>Client: Return response body - else NeedsBody (multipart) + else NeedsBody (multipart, batch delete) Gateway->>Gateway: collect_body(body) → bytes - Gateway->>Backend: Signed multipart request + Gateway->>Backend: Signed multipart / batch-delete request Backend-->>Gateway: Response Note over Middleware: after_dispatch(CompletedRequest) Gateway-->>Runtime: GatewayResponse::Response @@ -118,7 +118,7 @@ The `ProxyGateway` owns S3 request parsing, identity resolution, and bucket auth - Virtual-hosted: `GET /key` with `Host: bucket.s3.example.com` → same operation 2. **Resolve identity** via the `CredentialRegistry` — verifies SigV4 signatures against stored or sealed credentials 3. **Resolve bucket** via the `BucketRegistry` — looks up the bucket config and authorizes the caller -4. **Dispatch** the operation based on type (forward, list, or multipart) +4. **Dispatch** the operation based on type (forward, list, or body-bearing — multipart and batch delete) Custom `BucketRegistry` implementations can provide entirely different authorization logic, namespace mapping, or dynamic bucket configuration. @@ -145,18 +145,18 @@ LIST supports backend-side pagination. V2 uses `continuation-token` and `start-a ### `NeedsBody(PendingRequest)` (internal) -Used for: **CreateMultipartUpload, UploadPart, CompleteMultipartUpload, AbortMultipartUpload** +Used for: **CreateMultipartUpload, UploadPart, CompleteMultipartUpload, AbortMultipartUpload, and DeleteObjects (batch delete)** -Multipart operations need the request body (e.g., the XML body for `CompleteMultipartUpload`). When using `handle_request`, this is resolved internally — the gateway calls the `collect_body` closure provided by the runtime and returns the result as `GatewayResponse::Response`. Runtimes never see this variant. +These operations need the request body — the XML body for `CompleteMultipartUpload`, or the key list for `DeleteObjects` (each key is authorized individually once the body arrives). When using `handle_request`, this is resolved internally — the gateway calls the `collect_body` closure provided by the runtime and returns the result as `GatewayResponse::Response`. Runtimes never see this variant. For lower-level control, `ProxyGateway::handle` returns the raw three-variant `HandlerAction`, and runtimes call `handle_with_body()` themselves. > [!WARNING] -> Multipart uploads are only supported for `backend_type = "s3"`. Non-S3 backends should use single PUT requests (object_store handles chunking internally). +> Multipart uploads and batch delete are only supported for `backend_type = "s3"`. Non-S3 backends should use single PUT/DELETE requests (object_store handles chunking internally). ## Outbound User-Agent -All outbound requests to backend object stores include a `User-Agent` header identifying multistore as the caller. This applies to both presigned URL forwards (GET, HEAD, PUT, DELETE) and raw signed requests (multipart operations). +All outbound requests to backend object stores include a `User-Agent` header identifying multistore as the caller. This applies to both presigned URL forwards (GET, HEAD, PUT, single-object DELETE) and raw signed requests (multipart operations and batch delete). The default value is `multistore/{version}`, where `{version}` is derived from the crate version (`CARGO_PKG_VERSION`) — e.g. `multistore/0.4.0`. Override it via the gateway builder to include your application name: diff --git a/docs/deployment/cloudflare-workers.md b/docs/deployment/cloudflare-workers.md index 646a806..24102b6 100644 --- a/docs/deployment/cloudflare-workers.md +++ b/docs/deployment/cloudflare-workers.md @@ -101,6 +101,7 @@ HTTP 500 in production (`tests/smoke/test_federation.py` fails). |----------|----------|-------------| | `PROXY_CONFIG` | Yes | JSON config (buckets, roles, credentials) | | `VIRTUAL_HOST_DOMAIN` | No | Domain for virtual-hosted requests | +| `MAX_UPLOAD_BYTES` | No | Reject uploads whose `Content-Length` exceeds this with `EntityTooLarge` (400) instead of Cloudflare's edge `413`. Set to the plan's request-body limit, e.g. `104857600` (100 MB). See [Supported Operations](../reference/operations.md). | | `SESSION_TOKEN_KEY` | For STS | Base64-encoded 32-byte AES-256-GCM key | | `OIDC_PROVIDER_KEY` | For OIDC backend auth | PEM-encoded RSA private key | | `OIDC_PROVIDER_ISSUER` | For OIDC backend auth | Public URL for JWKS discovery | diff --git a/docs/reference/errors.md b/docs/reference/errors.md index 79bc703..927c63b 100644 --- a/docs/reference/errors.md +++ b/docs/reference/errors.md @@ -23,6 +23,9 @@ The proxy returns S3-compatible error responses in XML format: | InvalidOidcToken | 400 | `InvalidIdentityToken` | JWT validation failed (bad signature, untrusted issuer, etc.) | | RoleNotFound | 403 | `AccessDenied` | Requested role doesn't exist in config | | InvalidRequest | 400 | `InvalidRequest` | Malformed S3 request | +| MalformedXml | 400 | `MalformedXML` | Request body XML is malformed, empty, or exceeds limits (e.g. a batch delete naming no objects or more than 1000 keys) | +| NotImplemented | 501 | `NotImplemented` | Recognized but unsupported operation (e.g. server-side copy via `x-amz-copy-source`) | +| EntityTooLarge | 400 | `EntityTooLarge` | Upload `Content-Length` exceeds the configured maximum body size | | BackendError | 503 | `ServiceUnavailable` | Backend object store is unreachable or returned an error | | PreconditionFailed | 412 | `PreconditionFailed` | Conditional request failed (If-Match, etc.) | | NotModified | 304 | `NotModified` | Conditional request — content not changed | diff --git a/docs/reference/operations.md b/docs/reference/operations.md index 24f8abd..6c56e51 100644 --- a/docs/reference/operations.md +++ b/docs/reference/operations.md @@ -8,6 +8,7 @@ | HeadObject | `HEAD /{bucket}/{key}` | Forward | Get file metadata | | PutObject | `PUT /{bucket}/{key}` | Forward | Upload a file | | DeleteObject | `DELETE /{bucket}/{key}` | Forward | Delete a file | +| DeleteObjects | `POST /{bucket}?delete` | NeedsBody | Batch-delete up to 1000 keys (`aws s3 rm --recursive`, `delete_objects`) | | ListBucket | `GET /{bucket}` | Response | List objects in a bucket (ListObjectsV1 and V2) | | ListBuckets | `GET /` | Response | List all virtual buckets | | CreateMultipartUpload | `POST /{bucket}/{key}?uploads` | NeedsBody | Initiate a multipart upload | @@ -19,7 +20,15 @@ - **Forward** — A presigned URL is generated and returned to the runtime, which executes it with its native HTTP client. Bodies stream directly between client and backend without buffering. - **Response** — The handler builds a complete response (XML for LIST, error responses) and returns it. No presigned URL involved. -- **NeedsBody** — The runtime collects the request body, then the handler signs and sends the request via raw HTTP (`backend.send_raw()`). Multipart only. +- **NeedsBody** — The runtime collects the request body, then the handler signs and sends the request via raw HTTP (`backend.send_raw()`). Used by multipart and batch delete. + +### Batch delete authorization + +`DeleteObjects` carries its keys in the request body, so authorization happens in two stages: the bucket-level check confirms the caller may delete *something* in the bucket, then **each key in the body is authorized individually** against the caller's allowed prefixes. Keys the caller is not permitted to delete are returned as per-key `AccessDenied` entries in the `DeleteResult` (S3's partial-result semantics) and are never forwarded to the backend; authorized keys are deleted regardless. Anonymous callers cannot batch-delete. + +### Writes and request headers + +`PutObject` forwards the request body plus standard HTTP entity headers (`Content-Type`, `Content-Disposition`, `Content-Encoding`, `Content-Language`, `Cache-Control`, `Expires`, `Content-MD5`) to a presigned URL. `x-amz-*` headers (user metadata `x-amz-meta-*`, storage class, tagging, ACLs, SSE, and checksum headers such as `x-amz-checksum-*`) are **not** forwarded: S3 rejects unsigned `x-amz-*` headers on presigned requests, and the proxy presigns over `host` only. Supporting those headers requires a header-signing forward path — see the design note in `.plans/`. ## STS Operations @@ -41,5 +50,21 @@ Handled by OIDC discovery closures (registered on the `Router` via `OidcRouterEx ## Limitations > [!WARNING] -> - **Multipart is S3 only** — Multipart operations use raw HTTP with `S3RequestSigner` and are gated to `backend_type = "s3"`. Non-S3 backends should use single PUT requests. +> - **Multipart and batch delete are S3 only** — Both use raw HTTP with `S3RequestSigner` and are gated to `backend_type = "s3"`. Non-S3 backends should use single PUT/DELETE requests. > - **DeleteObject does not return confirmation** — The proxy forwards the DELETE and returns the backend's response status. +> - **Server-side copy is not supported** — A `PUT` carrying `x-amz-copy-source` (CopyObject / UploadPartCopy) is rejected with `501 NotImplemented` rather than silently overwriting the destination. +> - **`x-amz-*` write headers are dropped** — Object metadata, storage class, tagging, ACLs, SSE, and checksum headers on writes are not forwarded (see "Writes and request headers" above). +> - **Versioned/MFA delete is not handled** — A `?versionId=` on a delete is ignored; the current object version is deleted. + +### Upload size on the Cloudflare Workers runtime + +The Workers runtime is bounded by Cloudflare's [request-body size limit](https://developers.cloudflare.com/workers/platform/limits/#request-and-response-limits) — 100 MB on Free/Pro, 200 MB on Business, 500 MB (default) on Enterprise. This is a **hard platform limit enforced at Cloudflare's edge**: a request body larger than the plan limit is rejected with `413` before the proxy can act on it, and the proxy cannot raise it. + +Consequences and guidance: + +- **A single `PutObject` cannot exceed the plan body limit.** Upload larger objects as a **multipart upload**: each `UploadPart` is a separate request, so only the *part* size must stay under the limit. With S3's 10,000-part maximum, 100 MB parts allow objects up to ~1 TB even on Free/Pro. +- **Configure clients to chunk below the limit.** e.g. boto3 `TransferConfig(multipart_threshold=…, multipart_chunksize=…)` with a chunk size under the plan limit; aws-cli's `s3.multipart_chunksize`. +- Until [streaming `UploadPart`](https://github.com/developmentseed/multistore/issues/89) lands, parts on Workers are additionally capped by the 128 MB worker memory limit (parts are buffered in WASM). Keep `multipart_chunksize` comfortably below 100 MB. +- The native server and Lambda runtimes have their own, generally higher, limits — this constraint is specific to Workers. + +**Surfacing a clean error.** Configure the gateway with [`with_max_request_body_size`](https://docs.rs/multistore/latest/multistore/proxy/struct.ProxyGateway.html) so a body-bearing write (`PutObject`, `UploadPart`, or `DeleteObjects`) whose `Content-Length` exceeds the limit is rejected up front with S3's `EntityTooLarge` (HTTP 400) — an actionable error instead of Cloudflare's opaque `413`. The Workers example reads this from the `MAX_UPLOAD_BYTES` environment variable; set it to your plan's request-body limit (e.g. `104857600` for 100 MB). The check requires a declared `Content-Length`; unknown-length streaming requests fall through to the platform limit. diff --git a/examples/cf-workers/src/lib.rs b/examples/cf-workers/src/lib.rs index c35e20a..6514450 100644 --- a/examples/cf-workers/src/lib.rs +++ b/examples/cf-workers/src/lib.rs @@ -10,7 +10,7 @@ //! -> resolve request (ProxyGateway with static config registries) //! -> Forward: web_sys::fetch with ReadableStream passthrough (zero-copy) //! -> Response: LIST XML via object_store, errors, synthetic responses -//! -> NeedsBody: multipart operations via raw signed HTTP +//! -> NeedsBody: multipart operations and batch delete via raw signed HTTP //! ``` //! //! # Configuration @@ -93,6 +93,17 @@ async fn fetch(req: web_sys::Request, env: Env, _ctx: Context) -> Result().ok()) + { + gateway = gateway.with_max_request_body_size(max); + } + // Parse request metadata and extract the body stream (zero-copy). let (parts, js_body) = RequestParts::from_web_sys(&req).map_err(|e| worker::Error::RustError(e))?; diff --git a/examples/cf-workers/wrangler.integration.toml b/examples/cf-workers/wrangler.integration.toml index 00511e8..b17d2d0 100644 --- a/examples/cf-workers/wrangler.integration.toml +++ b/examples/cf-workers/wrangler.integration.toml @@ -14,6 +14,10 @@ command = "cargo install worker-build --version '^0.7' && worker-build --release [vars] VIRTUAL_HOST_DOMAIN = "s3.local" +# Reject uploads over 10 MiB with EntityTooLarge (kept above the part sizes the +# multipart tests use). Production should set this to the plan's request-body +# limit; see docs/reference/operations.md. +MAX_UPLOAD_BYTES = "10485760" [vars.PROXY_CONFIG] @@ -60,10 +64,23 @@ bucket = "public-data" prefixes = [] [[vars.PROXY_CONFIG.credentials.allowed_scopes]] -actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket"] +actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket", "create_multipart_upload", "upload_part", "complete_multipart_upload", "abort_multipart_upload"] bucket = "private-uploads" prefixes = [] +# Prefix-restricted credential — used to test per-key batch-delete authorization. +[[vars.PROXY_CONFIG.credentials]] +access_key_id = "AKTEST000000000002" +created_at = "2024-01-01T00:00:00Z" +enabled = true +principal_name = "integration-prefix-restricted-user" +secret_access_key = "testSecretKey00000000000000000002" + +[[vars.PROXY_CONFIG.credentials.allowed_scopes]] +actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket"] +bucket = "private-uploads" +prefixes = ["allowed/"] + # --- Roles --- [[vars.PROXY_CONFIG.roles]] @@ -81,7 +98,7 @@ bucket = "public-data" prefixes = [] [[vars.PROXY_CONFIG.roles.allowed_scopes]] -actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket"] +actions = ["get_object", "head_object", "put_object", "delete_object", "list_bucket", "create_multipart_upload", "upload_part", "complete_multipart_upload", "abort_multipart_upload"] bucket = "private-uploads" prefixes = [] diff --git a/scripts/integration-test.sh b/scripts/integration-test.sh new file mode 100755 index 0000000..f68edf1 --- /dev/null +++ b/scripts/integration-test.sh @@ -0,0 +1,97 @@ +#!/usr/bin/env bash +# +# Run the integration test suite locally — the same setup CI uses: +# MinIO (via docker compose) behind the Cloudflare Workers runtime (wrangler dev). +# +# Usage: +# ./scripts/integration-test.sh [extra pytest args] +# make test-integration +# +# MinIO lifecycle: if MinIO isn't running, this starts it and stops it again on +# exit (clean one-off run). If it's already up — e.g. you ran `docker compose +# up -d` to iterate — it's left running so repeated runs stay fast. wrangler dev +# is always stopped on exit. +# +# Prerequisites: docker, node/npx, uv (uvx), and the wasm32 Rust target +# (`rustup target add wasm32-unknown-unknown`). worker-build is installed +# automatically if missing. +set -euo pipefail + +cd "$(dirname "$0")/.." + +PORT="${PROXY_PORT:-8787}" +PROXY_URL="http://localhost:${PORT}" +WORKER_DIR="examples/cf-workers" + +# Tear down MinIO on exit only if we started it. If it was already running +# (e.g. a dev who ran `docker compose up -d` to iterate), leave it warm so +# repeated runs stay fast and we don't stop a stack we didn't start. +STARTED_MINIO=false +cleanup() { + # Kill wrangler (and the workerd children it spawns). + pkill -f "wrangler dev --config wrangler.integration.toml" 2>/dev/null || true + if [ "$STARTED_MINIO" = true ]; then + echo "==> Stopping MinIO (started by this run)" + docker compose down >/dev/null 2>&1 || true + fi +} +trap cleanup EXIT + +if curl -sf -o /dev/null http://localhost:9000/minio/health/live 2>/dev/null; then + echo "==> MinIO already running — leaving it up after the run" +else + STARTED_MINIO=true +fi + +echo "==> Starting MinIO + seeding buckets (docker compose)" +# Not `--wait`: the one-shot seeder (minio-init) exits, which `--wait` treats as +# a failure. Instead poll for a seeded public object — that confirms MinIO is up +# *and* the buckets are seeded. +docker compose up -d +for i in $(seq 1 30); do + if curl -so /dev/null "http://localhost:9000/public-data/hello.txt"; then + echo " MinIO ready and seeded" + break + fi + if [ "$i" -eq 30 ]; then + echo "ERROR: MinIO did not become ready within 30s" >&2 + exit 1 + fi + sleep 1 +done + +if ! command -v worker-build >/dev/null 2>&1; then + echo "==> Installing worker-build" + cargo install worker-build --version '^0.7' +fi + +echo "==> Building worker (WASM)" +( cd "$WORKER_DIR" && worker-build --release ) + +# wrangler dev needs SESSION_TOKEN_KEY; generate a throwaway one if absent. +if [ ! -f "$WORKER_DIR/.dev.vars" ]; then + echo "==> Writing throwaway $WORKER_DIR/.dev.vars" + echo "SESSION_TOKEN_KEY=$(openssl rand -base64 32)" > "$WORKER_DIR/.dev.vars" +fi + +echo "==> Starting wrangler dev on :${PORT}" +( cd "$WORKER_DIR" && npx wrangler dev --config wrangler.integration.toml --port "$PORT" ) \ + > /tmp/multistore-wrangler.log 2>&1 & + +echo "==> Waiting for the proxy to accept requests" +for i in $(seq 1 60); do + if curl -so /dev/null "${PROXY_URL}/"; then + echo " ready" + break + fi + if [ "$i" -eq 60 ]; then + echo "ERROR: proxy did not start within 120s; wrangler log:" >&2 + tail -20 /tmp/multistore-wrangler.log >&2 + exit 1 + fi + sleep 2 +done + +echo "==> Running integration tests" +PROXY_URL="$PROXY_URL" uvx --with pytest,boto3,requests \ + pytest tests/integration/ -v "$@" diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 01071b3..d24b06c 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -12,13 +12,18 @@ import os import uuid import xml.etree.ElementTree as ET +from io import BytesIO import boto3 import pytest import requests +from boto3.s3.transfer import TransferConfig from botocore.config import Config from botocore.exceptions import ClientError +# S3 requires every multipart part except the last to be at least 5 MiB. +MIB = 1024 * 1024 + PROXY_URL = os.environ.get("PROXY_URL", "http://localhost:8787") @@ -135,6 +140,47 @@ def test_put_then_get_roundtrip(self): # Cleanup client.delete_object(Bucket="private-uploads", Key=key) + def test_put_larger_body_single_request(self): + """A non-trivial single PUT (streamed, not buffered) round-trips intact.""" + client = static_client() + key = f"test-large-{uuid.uuid4()}.bin" + # 2 MiB: well above the trivial happy-path size, still a single PUT + # (put_object never switches to multipart). + body = bytes((i % 251 for i in range(2 * MIB))) + + client.put_object(Bucket="private-uploads", Key=key, Body=body) + resp = client.get_object(Bucket="private-uploads", Key=key) + data = resp["Body"].read() + assert len(data) == len(body) + assert data == body + + client.delete_object(Bucket="private-uploads", Key=key) + + def test_put_preserves_content_headers(self): + """Standard entity headers set on PUT survive the round-trip. + + Exercises the widened PUT forward allowlist (Content-Type was always + forwarded; Content-Disposition / Cache-Control are new). Note: + `x-amz-meta-*` user metadata is intentionally NOT forwarded (it requires + the deferred header-signing path), so it is not asserted. + """ + client = static_client() + key = f"test-headers-{uuid.uuid4()}.txt" + client.put_object( + Bucket="private-uploads", + Key=key, + Body=b"payload with content metadata", + ContentType="application/json", + ContentDisposition='attachment; filename="report.json"', + CacheControl="max-age=3600", + ) + resp = client.get_object(Bucket="private-uploads", Key=key) + assert resp["ContentType"] == "application/json" + assert resp["ContentDisposition"] == 'attachment; filename="report.json"' + assert resp["CacheControl"] == "max-age=3600" + + client.delete_object(Bucket="private-uploads", Key=key) + def test_list_after_write(self): client = static_client() key = f"test-list-{uuid.uuid4()}.txt" @@ -170,6 +216,182 @@ def test_head_object(self): # Cleanup client.delete_object(Bucket="private-uploads", Key=key) + def test_batch_delete(self): + client = static_client() + keys = [f"test-batch-{uuid.uuid4()}.txt" for _ in range(3)] + for key in keys: + client.put_object(Bucket="private-uploads", Key=key, Body=b"batch") + + resp = client.delete_objects( + Bucket="private-uploads", + Delete={"Objects": [{"Key": k} for k in keys]}, + ) + deleted = {d["Key"] for d in resp.get("Deleted", [])} + assert deleted == set(keys), resp + assert not resp.get("Errors"), resp + + # All keys are gone. + for key in keys: + with pytest.raises(ClientError) as exc_info: + client.get_object(Bucket="private-uploads", Key=key) + assert exc_info.value.response["Error"]["Code"] in ("NoSuchKey", "404") + + def test_oversized_put_rejected_entity_too_large(self): + """A PUT exceeding MAX_UPLOAD_BYTES (10 MiB in the test config) is + rejected with EntityTooLarge rather than forwarded to the backend.""" + client = static_client() + key = f"test-toolarge-{uuid.uuid4()}.bin" + body = b"z" * (12 * MIB) # over the 10 MiB limit + with pytest.raises(ClientError) as exc_info: + client.put_object(Bucket="private-uploads", Key=key, Body=body) + err = exc_info.value.response["Error"] + assert err["Code"] == "EntityTooLarge", err + assert exc_info.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 + + def test_batch_delete_partial_authorization(self): + """Per-key authz: a batch delete with one in-scope and one out-of-scope + key deletes the allowed one, reports the other as AccessDenied, and does + NOT delete the out-of-scope key.""" + full = static_client() # full access to private-uploads + restricted = static_client( # scoped to the "allowed/" prefix only + access_key="AKTEST000000000002", + secret_key="testSecretKey00000000000000000002", + ) + suffix = uuid.uuid4() + allowed_key = f"allowed/{suffix}.txt" + denied_key = f"denied/{suffix}.txt" + full.put_object(Bucket="private-uploads", Key=allowed_key, Body=b"a") + full.put_object(Bucket="private-uploads", Key=denied_key, Body=b"b") + + resp = restricted.delete_objects( + Bucket="private-uploads", + Delete={"Objects": [{"Key": allowed_key}, {"Key": denied_key}]}, + ) + deleted = {d["Key"] for d in resp.get("Deleted", [])} + errors = {e["Key"]: e["Code"] for e in resp.get("Errors", [])} + assert deleted == {allowed_key}, resp + assert errors == {denied_key: "AccessDenied"}, resp + + # Security property: the out-of-scope key must still exist; the in-scope + # key must be gone. + full.head_object(Bucket="private-uploads", Key=denied_key) # raises if deleted + with pytest.raises(ClientError) as exc: + full.head_object(Bucket="private-uploads", Key=allowed_key) + assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404 + + full.delete_object(Bucket="private-uploads", Key=denied_key) # cleanup + + def test_copy_object_rejected(self): + """Server-side copy (x-amz-copy-source) is rejected with 501 + NotImplemented and does not create the destination.""" + client = static_client() + src = f"copy-src-{uuid.uuid4()}.txt" + dst = f"copy-dst-{uuid.uuid4()}.txt" + client.put_object(Bucket="private-uploads", Key=src, Body=b"source") + with pytest.raises(ClientError) as exc: + client.copy_object( + Bucket="private-uploads", + Key=dst, + CopySource={"Bucket": "private-uploads", "Key": src}, + ) + assert exc.value.response["Error"]["Code"] == "NotImplemented", exc.value.response + assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 501 + # The destination must not have been created. + with pytest.raises(ClientError): + client.head_object(Bucket="private-uploads", Key=dst) + + client.delete_object(Bucket="private-uploads", Key=src) # cleanup + + +# --------------------------------------------------------------------------- +# Multipart uploads +# --------------------------------------------------------------------------- + +class TestMultipartUploads: + """Exercise the full multipart upload path (Create/UploadPart/Complete/Abort).""" + + def test_multipart_roundtrip_high_level(self): + """boto3's transfer manager: forces Create + 2x UploadPart + Complete.""" + client = static_client() + key = f"test-multipart-{uuid.uuid4()}.bin" + # 6 MiB → two parts (5 MiB + 1 MiB) at a 5 MiB chunk size. + body = b"multipart-payload-block!" * (6 * MIB // 24 + 1) + body = body[: 6 * MIB] + config = TransferConfig( + multipart_threshold=5 * MIB, + multipart_chunksize=5 * MIB, + max_concurrency=1, + use_threads=False, + ) + + client.upload_fileobj(BytesIO(body), "private-uploads", key, Config=config) + resp = client.get_object(Bucket="private-uploads", Key=key) + assert resp["Body"].read() == body + + client.delete_object(Bucket="private-uploads", Key=key) + + def test_multipart_low_level_explicit(self): + """Drive Create/UploadPart/Complete directly and verify the round-trip.""" + client = static_client() + key = f"test-mpu-explicit-{uuid.uuid4()}.bin" + part1 = b"A" * (5 * MIB) + part2 = b"B" * (2 * MIB) + + create = client.create_multipart_upload(Bucket="private-uploads", Key=key) + upload_id = create["UploadId"] + assert upload_id + + parts = [] + for num, chunk in enumerate([part1, part2], start=1): + up = client.upload_part( + Bucket="private-uploads", + Key=key, + PartNumber=num, + UploadId=upload_id, + Body=chunk, + ) + parts.append({"PartNumber": num, "ETag": up["ETag"]}) + + client.complete_multipart_upload( + Bucket="private-uploads", + Key=key, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + + resp = client.get_object(Bucket="private-uploads", Key=key) + assert resp["Body"].read() == part1 + part2 + + client.delete_object(Bucket="private-uploads", Key=key) + + def test_multipart_abort(self): + """AbortMultipartUpload tears down an in-progress upload.""" + client = static_client() + key = f"test-mpu-abort-{uuid.uuid4()}.bin" + + create = client.create_multipart_upload(Bucket="private-uploads", Key=key) + upload_id = create["UploadId"] + + client.upload_part( + Bucket="private-uploads", + Key=key, + PartNumber=1, + UploadId=upload_id, + Body=b"C" * (5 * MIB), + ) + client.abort_multipart_upload( + Bucket="private-uploads", Key=key, UploadId=upload_id + ) + + # Completing an aborted upload must fail. + with pytest.raises(ClientError): + client.complete_multipart_upload( + Bucket="private-uploads", + Key=key, + UploadId=upload_id, + MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": "x"}]}, + ) + # --------------------------------------------------------------------------- # Static credential reads