diff --git a/Makefile b/Makefile index 95b4450..ddce59d 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: build fmt clippy test clean check help install-hooks lint +.PHONY: build build-debug fmt fmt-check clippy test unit test-unit check check-all check-safe clean help install-hooks install-tools setup lint run-metadata run-gateway run-scheduler run-cache run-tape .DEFAULT_GOAL := help @@ -18,10 +18,14 @@ fmt-check: clippy: @cargo clippy --all-targets --all-features -- -D warnings test: - @cargo test --all-features + @cargo test --workspace --all-features +unit test-unit: + @cargo test --workspace --lib --bins check: @cargo check --all-targets --all-features check-all: fmt-check clippy test +check-safe: fmt-check clippy unit build-debug + @echo "Safe verification passed (fmt/clippy/unit/build only)." clean: @cargo clean @@ -36,8 +40,8 @@ run-cache: run-tape: @cargo run -p coldstore-tape -lint: fmt-check clippy check test - @echo "All lint checks passed." +lint: fmt-check clippy check unit + @echo "All lint checks passed (unit-only)." 
install-hooks: @cp scripts/pre-commit .git/hooks/pre-commit @@ -55,6 +59,6 @@ help: @echo "ColdStore Workspace Makefile" @echo "" @echo "Build: build build-debug clean" - @echo "Quality: fmt fmt-check clippy test check check-all lint" + @echo "Quality: fmt fmt-check clippy test unit check check-all check-safe lint" @echo "Run: run-metadata run-gateway run-scheduler run-cache run-tape" @echo "Setup: setup install-tools install-hooks" diff --git a/crates/cache/Cargo.toml b/crates/cache/Cargo.toml index a2243cc..cca52ca 100644 --- a/crates/cache/Cargo.toml +++ b/crates/cache/Cargo.toml @@ -15,6 +15,7 @@ coldstore-common = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } prost = { workspace = true } +prost-types = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } anyhow = { workspace = true } diff --git a/crates/cache/src/hdd.rs b/crates/cache/src/hdd.rs index 7738174..81d74ec 100644 --- a/crates/cache/src/hdd.rs +++ b/crates/cache/src/hdd.rs @@ -17,10 +17,11 @@ impl HddBackend { fs::create_dir_all(base.join("staging")).await?; fs::create_dir_all(base.join("restored")).await?; fs::create_dir_all(base.join("meta")).await?; + let next_id = discover_next_id(&base).await?; Ok(Self { base_path: base, max_size_bytes: max_size_gb * 1024 * 1024 * 1024, - next_id: AtomicU64::new(1), + next_id: AtomicU64::new(next_id), }) } @@ -134,6 +135,27 @@ impl CacheBackend for HddBackend { } async fn available_bytes(&self) -> Result { - Ok(self.max_size_bytes) + let used_bytes: u64 = self + .list_all() + .await? + .into_iter() + .map(|(_, x)| x.size) + .sum(); + Ok(self.max_size_bytes.saturating_sub(used_bytes)) } } + +async fn discover_next_id(base: &std::path::Path) -> Result { + let mut max_id = 0_u64; + let mut rd = fs::read_dir(base.join("meta")).await?; + while let Some(entry) = rd.next_entry().await? 
{ + let name = entry.file_name(); + let name = name.to_string_lossy(); + if let Some(raw) = name.strip_suffix(".json") { + if let Ok(id) = raw.parse::() { + max_id = max_id.max(id); + } + } + } + Ok(max_id + 1) +} diff --git a/crates/cache/src/service.rs b/crates/cache/src/service.rs index 3a538c9..a92cd04 100644 --- a/crates/cache/src/service.rs +++ b/crates/cache/src/service.rs @@ -1,30 +1,269 @@ -use crate::backend::CacheBackend; +use crate::backend::{CacheBackend, CacheCategory, CacheXattrs}; use crate::hdd::HddBackend; use anyhow::Result; use coldstore_common::config::{CacheBackendConfig, CacheConfig}; use coldstore_proto::cache::cache_service_server::CacheService; use coldstore_proto::cache::*; +use prost_types::Timestamp; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; +const STREAM_CHUNK_SIZE: usize = 64 * 1024; + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +struct CacheKey { + bucket: String, + key: String, + version_id: Option, +} + +impl CacheKey { + fn new(bucket: String, key: String, version_id: Option) -> Self { + Self { + bucket, + key, + version_id, + } + } + + fn as_cursor(&self) -> String { + format!( + "{}\u{0}{}\u{0}{}", + self.bucket, + self.key, + self.version_id.clone().unwrap_or_default() + ) + } +} + +#[derive(Debug, Clone)] +struct StoredEntry { + storage_id: u64, + xattrs: CacheXattrs, +} + +#[derive(Default)] +struct CacheIndex { + staging: HashMap, + restored: HashMap, + hit_count: u64, + miss_count: u64, + evict_count: u64, + evict_bytes: u64, +} + pub struct CacheServiceImpl { - _backend: Box, + backend: Arc, _config: CacheConfig, + index: Arc>, } impl CacheServiceImpl { pub async fn new(config: &CacheConfig) -> Result { - let backend: Box = match &config.backend { + let backend: Arc = match &config.backend { CacheBackendConfig::Hdd { path, max_size_gb } => { - Box::new(HddBackend::new(path.clone(), 
*max_size_gb).await?) + Arc::new(HddBackend::new(path.clone(), *max_size_gb).await?) } CacheBackendConfig::Spdk { .. } => { anyhow::bail!("SPDK backend not yet implemented") } }; - Ok(Self { - _backend: backend, + + let svc = Self { + backend, _config: config.clone(), - }) + index: Arc::new(RwLock::new(CacheIndex::default())), + }; + svc.rebuild_index().await?; + Ok(svc) + } + + async fn rebuild_index(&self) -> Result<()> { + let mut index = CacheIndex::default(); + for (storage_id, xattrs) in self.backend.list_all().await? { + let key = CacheKey::new( + xattrs.bucket.clone(), + xattrs.key.clone(), + xattrs.version_id.clone(), + ); + let entry = StoredEntry { storage_id, xattrs }; + match entry.xattrs.category { + CacheCategory::Staging => { + index.staging.insert(key, entry); + } + CacheCategory::Restored => { + index.restored.insert(key, entry); + } + } + } + *self.index.write().await = index; + Ok(()) + } + + async fn remove_existing(&self, key: &CacheKey, category: CacheCategory) -> Result<()> { + let existing = { + let index = self.index.read().await; + match category { + CacheCategory::Staging => index.staging.get(key).cloned(), + CacheCategory::Restored => index.restored.get(key).cloned(), + } + }; + + if let Some(existing) = existing { + self.backend.delete(existing.storage_id).await?; + let mut index = self.index.write().await; + match category { + CacheCategory::Staging => { + index.staging.remove(key); + } + CacheCategory::Restored => { + index.restored.remove(key); + } + } + index.evict_count += 1; + index.evict_bytes += existing.xattrs.size; + } + Ok(()) + } + + async fn find_entry(&self, key: &CacheKey, category: CacheCategory) -> Option { + let index = self.index.read().await; + match category { + CacheCategory::Staging => index.staging.get(key).cloned(), + CacheCategory::Restored => index.restored.get(key).cloned(), + } + } + + async fn insert_entry(&self, key: CacheKey, entry: StoredEntry) { + let mut index = self.index.write().await; + match 
entry.xattrs.category { + CacheCategory::Staging => { + index.staging.insert(key, entry); + } + CacheCategory::Restored => { + index.restored.insert(key, entry); + } + } + } + + async fn update_hit_state(&self, hit: bool) { + let mut index = self.index.write().await; + if hit { + index.hit_count += 1; + } else { + index.miss_count += 1; + } + } + + async fn delete_entry(&self, key: &CacheKey, category: CacheCategory) -> Result { + let removed = { + let mut index = self.index.write().await; + match category { + CacheCategory::Staging => index.staging.remove(key), + CacheCategory::Restored => index.restored.remove(key), + } + }; + + if let Some(entry) = removed { + self.backend.delete(entry.storage_id).await?; + Ok(true) + } else { + Ok(false) + } + } + + async fn put_bytes(&self, key: CacheKey, data: Vec, xattrs: CacheXattrs) -> Result { + self.remove_existing(&key, xattrs.category).await?; + let storage_id = self.backend.write(&key.as_cursor(), &data, &xattrs).await?; + self.insert_entry(key, StoredEntry { storage_id, xattrs }) + .await; + Ok(storage_id) + } + + async fn read_restored(&self, key: &CacheKey) -> Result { + let Some(entry) = self.find_entry(key, CacheCategory::Restored).await else { + self.update_hit_state(false).await; + return Err(Status::not_found("restored object not found in cache")); + }; + + if is_expired(entry.xattrs.expire_at) { + let _ = self.delete_entry(key, CacheCategory::Restored).await; + self.update_hit_state(false).await; + return Err(Status::not_found("restored object has expired")); + } + + self.update_hit_state(true).await; + Ok(entry) + } + + async fn build_get_stream( + &self, + entry: StoredEntry, + ) -> Result>>, Status> { + let data = self + .backend + .read(entry.storage_id) + .await + .map_err(internal_status)?; + let (tx, rx) = mpsc::channel(8); + tokio::spawn(async move { + let meta = GetResponse { + payload: Some(get_response::Payload::Meta(CachedObjectMeta { + size: entry.xattrs.size, + expire_at: 
Some(timestamp_from_unix(entry.xattrs.expire_at)), + content_type: entry.xattrs.content_type.clone(), + etag: entry.xattrs.etag.clone(), + checksum: entry.xattrs.checksum.clone(), + })), + }; + let _ = tx.send(Ok(meta)).await; + for chunk in data.chunks(STREAM_CHUNK_SIZE) { + let _ = tx + .send(Ok(GetResponse { + payload: Some(get_response::Payload::Data(chunk.to_vec())), + })) + .await; + } + }); + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn build_staging_stream( + &self, + entry: StoredEntry, + ) -> Result>>, Status> { + let data = self + .backend + .read(entry.storage_id) + .await + .map_err(internal_status)?; + let (tx, rx) = mpsc::channel(8); + tokio::spawn(async move { + let meta = GetStagingResponse { + payload: Some(get_staging_response::Payload::Meta(StagingObjectMeta { + bucket: entry.xattrs.bucket.clone(), + key: entry.xattrs.key.clone(), + version_id: entry.xattrs.version_id.clone(), + size: entry.xattrs.size, + checksum: entry.xattrs.checksum.clone(), + content_type: entry.xattrs.content_type.clone(), + etag: entry.xattrs.etag.clone(), + staged_at: Some(timestamp_from_unix(entry.xattrs.cached_at)), + })), + }; + let _ = tx.send(Ok(meta)).await; + for chunk in data.chunks(STREAM_CHUNK_SIZE) { + let _ = tx + .send(Ok(GetStagingResponse { + payload: Some(get_staging_response::Payload::Data(chunk.to_vec())), + })) + .await; + } + }); + Ok(Response::new(ReceiverStream::new(rx))) } } @@ -32,60 +271,405 @@ impl CacheServiceImpl { impl CacheService for CacheServiceImpl { async fn put_staging( &self, - _req: Request>, + req: Request>, ) -> std::result::Result, Status> { - todo!() + let mut stream = req.into_inner(); + let mut meta: Option = None; + let mut data = Vec::new(); + + while let Some(chunk) = stream.message().await? 
{ + match chunk.payload { + Some(put_staging_request::Payload::Meta(m)) => meta = Some(m), + Some(put_staging_request::Payload::Data(bytes)) => data.extend_from_slice(&bytes), + None => return Err(Status::invalid_argument("empty put_staging chunk")), + } + } + + let meta = meta.ok_or_else(|| Status::invalid_argument("missing staging metadata"))?; + if meta.size != data.len() as u64 { + return Err(Status::invalid_argument( + "staging object size does not match payload", + )); + } + + let key = CacheKey::new( + meta.bucket.clone(), + meta.key.clone(), + meta.version_id.clone(), + ); + let xattrs = CacheXattrs { + bucket: meta.bucket, + key: meta.key, + version_id: meta.version_id, + size: meta.size, + expire_at: 0, + cached_at: now_unix(), + checksum: meta.checksum, + content_type: meta.content_type, + etag: meta.etag, + category: CacheCategory::Staging, + }; + let storage_id = self + .put_bytes(key, data, xattrs) + .await + .map_err(internal_status)?; + + Ok(Response::new(PutStagingResponse { + staging_id: storage_id.to_string(), + })) } + async fn put_restored( &self, - _req: Request>, + req: Request>, ) -> std::result::Result, Status> { - todo!() + let mut stream = req.into_inner(); + let mut meta: Option = None; + let mut data = Vec::new(); + + while let Some(chunk) = stream.message().await? 
{ + match chunk.payload { + Some(put_restored_request::Payload::Meta(m)) => meta = Some(m), + Some(put_restored_request::Payload::Data(bytes)) => data.extend_from_slice(&bytes), + None => return Err(Status::invalid_argument("empty put_restored chunk")), + } + } + + let meta = meta.ok_or_else(|| Status::invalid_argument("missing restored metadata"))?; + if meta.size != data.len() as u64 { + return Err(Status::invalid_argument( + "restored object size does not match payload", + )); + } + let expire_at = meta + .expire_at + .ok_or_else(|| Status::invalid_argument("restored object missing expire_at"))?; + + let key = CacheKey::new( + meta.bucket.clone(), + meta.key.clone(), + meta.version_id.clone(), + ); + let xattrs = CacheXattrs { + bucket: meta.bucket, + key: meta.key, + version_id: meta.version_id, + size: meta.size, + expire_at: expire_at.seconds, + cached_at: now_unix(), + checksum: meta.checksum, + content_type: meta.content_type, + etag: meta.etag, + category: CacheCategory::Restored, + }; + self.put_bytes(key, data, xattrs) + .await + .map_err(internal_status)?; + + Ok(Response::new(())) } + async fn delete( &self, - _req: Request, + req: Request, ) -> std::result::Result, Status> { - todo!() + let req = req.into_inner(); + let key = CacheKey::new(req.bucket, req.key, req.version_id); + self.delete_entry(&key, CacheCategory::Restored) + .await + .map_err(internal_status)?; + Ok(Response::new(())) } - type GetStream = tokio_stream::wrappers::ReceiverStream>; + type GetStream = ReceiverStream>; + async fn get( &self, - _req: Request, + req: Request, ) -> std::result::Result, Status> { - todo!() + let req = req.into_inner(); + let key = CacheKey::new(req.bucket, req.key, req.version_id); + let entry = self.read_restored(&key).await?; + self.build_get_stream(entry).await } async fn contains( &self, - _req: Request, + req: Request, ) -> std::result::Result, Status> { - todo!() + let req = req.into_inner(); + let key = CacheKey::new(req.bucket, req.key, 
req.version_id); + let exists = self.find_entry(&key, CacheCategory::Restored).await; + let response = if let Some(entry) = exists { + if is_expired(entry.xattrs.expire_at) { + let _ = self.delete_entry(&key, CacheCategory::Restored).await; + self.update_hit_state(false).await; + ContainsResponse { + exists: false, + expire_at: None, + } + } else { + self.update_hit_state(true).await; + ContainsResponse { + exists: true, + expire_at: Some(timestamp_from_unix(entry.xattrs.expire_at)), + } + } + } else { + self.update_hit_state(false).await; + ContainsResponse { + exists: false, + expire_at: None, + } + }; + Ok(Response::new(response)) } - type GetStagingStream = - tokio_stream::wrappers::ReceiverStream>; + type GetStagingStream = ReceiverStream>; + async fn get_staging( &self, - _req: Request, + req: Request, ) -> std::result::Result, Status> { - todo!() + let req = req.into_inner(); + let key = CacheKey::new(req.bucket, req.key, req.version_id); + let Some(entry) = self.find_entry(&key, CacheCategory::Staging).await else { + return Err(Status::not_found("staging object not found")); + }; + self.build_staging_stream(entry).await } async fn list_staging_keys( &self, - _req: Request, + req: Request, ) -> std::result::Result, Status> { - todo!() + let req = req.into_inner(); + let after = req.after.unwrap_or_default(); + let limit = if req.limit == 0 { + usize::MAX + } else { + req.limit as usize + }; + + let index = self.index.read().await; + let mut entries: Vec<_> = index + .staging + .iter() + .filter(|(key, _)| key.as_cursor() > after) + .map(|(key, entry)| (key.clone(), entry.clone())) + .collect(); + entries.sort_by(|(a, _), (b, _)| a.as_cursor().cmp(&b.as_cursor())); + + let has_more = entries.len() > limit; + let response_entries = entries + .into_iter() + .take(limit) + .map(|(key, entry)| StagingKeyEntry { + bucket: key.bucket, + key: key.key, + version_id: key.version_id, + size: entry.xattrs.size, + staged_at: 
Some(timestamp_from_unix(entry.xattrs.cached_at)), + }) + .collect(); + + Ok(Response::new(ListStagingKeysResponse { + entries: response_entries, + has_more, + })) } + async fn delete_staging( &self, - _req: Request, + req: Request, ) -> std::result::Result, Status> { - todo!() + let req = req.into_inner(); + let key = CacheKey::new(req.bucket, req.key, req.version_id); + self.delete_entry(&key, CacheCategory::Staging) + .await + .map_err(internal_status)?; + Ok(Response::new(())) } + async fn stats(&self, _req: Request<()>) -> std::result::Result, Status> { - todo!() + let available = self + .backend + .available_bytes() + .await + .map_err(internal_status)?; + let index = self.index.read().await; + let staging_bytes: u64 = index.staging.values().map(|entry| entry.xattrs.size).sum(); + let restored_bytes: u64 = index.restored.values().map(|entry| entry.xattrs.size).sum(); + let used_capacity = staging_bytes + restored_bytes; + let total_capacity = used_capacity + available; + + Ok(Response::new(CacheStats { + total_capacity, + used_capacity, + object_count: (index.staging.len() + index.restored.len()) as u64, + staging_count: index.staging.len() as u64, + staging_bytes, + restored_count: index.restored.len() as u64, + restored_bytes, + hit_count: index.hit_count, + miss_count: index.miss_count, + evict_count: index.evict_count, + evict_bytes: index.evict_bytes, + })) + } +} + +fn now_unix() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock before unix epoch") + .as_secs() as i64 +} + +fn timestamp_from_unix(seconds: i64) -> Timestamp { + Timestamp { seconds, nanos: 0 } +} + +fn is_expired(expire_at: i64) -> bool { + expire_at > 0 && expire_at <= now_unix() +} + +fn internal_status(err: anyhow::Error) -> Status { + Status::internal(err.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use coldstore_proto::cache::ContainsRequest; + use std::time::{SystemTime, UNIX_EPOCH}; + use 
tokio_stream::StreamExt; + + fn test_config() -> CacheConfig { + let unique = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time") + .as_nanos(); + CacheConfig { + backend: CacheBackendConfig::Hdd { + path: format!("/tmp/coldstore-cache-test-{unique}"), + max_size_gb: 1, + }, + ..CacheConfig::default() + } + } + + #[tokio::test] + async fn empty_cache_reports_miss() { + let svc = CacheServiceImpl::new(&test_config()) + .await + .expect("service init"); + let response = svc + .contains(Request::new(ContainsRequest { + bucket: "docs".into(), + key: "readme.txt".into(), + version_id: None, + })) + .await + .expect("contains should return response") + .into_inner(); + + assert!(!response.exists); + } + + #[tokio::test] + async fn restored_object_round_trip_streams_data() { + let svc = CacheServiceImpl::new(&test_config()) + .await + .expect("service init"); + let expires_at = now_unix() + 3600; + let key = CacheKey::new("docs".into(), "guide.txt".into(), Some("v1".into())); + let xattrs = CacheXattrs { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: Some("v1".into()), + size: 11, + expire_at: expires_at, + cached_at: now_unix(), + checksum: Some("sum".into()), + content_type: Some("text/plain".into()), + etag: Some("etag-1".into()), + category: CacheCategory::Restored, + }; + svc.put_bytes(key, b"hello world".to_vec(), xattrs) + .await + .expect("put restored should succeed"); + + let contains = svc + .contains(Request::new(ContainsRequest { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: Some("v1".into()), + })) + .await + .expect("contains should succeed") + .into_inner(); + assert!(contains.exists); + + let mut stream = svc + .get(Request::new(GetRequest { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: Some("v1".into()), + })) + .await + .expect("get should succeed") + .into_inner(); + + let first = stream.next().await.expect("meta chunk").expect("meta ok"); + match first.payload { + 
Some(get_response::Payload::Meta(meta)) => { + assert_eq!(meta.size, 11); + assert_eq!(meta.etag.as_deref(), Some("etag-1")); + } + other => panic!("unexpected first payload: {other:?}"), + } + + let second = stream.next().await.expect("data chunk").expect("data ok"); + match second.payload { + Some(get_response::Payload::Data(bytes)) => assert_eq!(bytes, b"hello world"), + other => panic!("unexpected second payload: {other:?}"), + } + } + + #[tokio::test] + async fn staging_keys_are_listed() { + let svc = CacheServiceImpl::new(&test_config()) + .await + .expect("service init"); + let key = CacheKey::new("docs".into(), "draft.txt".into(), None); + let xattrs = CacheXattrs { + bucket: "docs".into(), + key: "draft.txt".into(), + version_id: None, + size: 5, + expire_at: 0, + cached_at: now_unix(), + checksum: None, + content_type: None, + etag: Some("etag-2".into()), + category: CacheCategory::Staging, + }; + svc.put_bytes(key, b"draft".to_vec(), xattrs) + .await + .expect("put staging should succeed"); + + let listed = svc + .list_staging_keys(Request::new(ListStagingKeysRequest { + limit: 10, + after: None, + })) + .await + .expect("list staging should succeed") + .into_inner(); + + assert_eq!(listed.entries.len(), 1); + assert_eq!(listed.entries[0].bucket, "docs"); + assert_eq!(listed.entries[0].key, "draft.txt"); } } diff --git a/crates/gateway/Cargo.toml b/crates/gateway/Cargo.toml index a578dd2..1f66410 100644 --- a/crates/gateway/Cargo.toml +++ b/crates/gateway/Cargo.toml @@ -16,7 +16,7 @@ tokio = { workspace = true } tonic = { workspace = true } prost = { workspace = true } axum = { workspace = true } -tower = { workspace = true } +tower = { workspace = true, features = ["util"] } tower-http = { workspace = true } http = { workspace = true } serde = { workspace = true } @@ -28,3 +28,4 @@ chrono = { workspace = true } uuid = { workspace = true } config = { workspace = true } sha2 = { workspace = true } +tokio-stream = { workspace = true } diff --git 
a/crates/gateway/src/handler.rs b/crates/gateway/src/handler.rs index a072ca8..55698c7 100644 --- a/crates/gateway/src/handler.rs +++ b/crates/gateway/src/handler.rs @@ -1,24 +1,678 @@ -use crate::GatewayState; +use crate::protocol::{format_restore_header, is_restore_request, S3ErrorCode, S3ErrorResponse}; +use crate::{DownloadedObject, GatewayState}; +use axum::body::{Body, Bytes}; +use axum::extract::{Path, Query, State}; +use axum::http::{header::HeaderName, HeaderMap, HeaderValue, StatusCode}; +use axum::response::Response; use axum::{routing::get, Router}; +use std::collections::HashMap; use std::sync::Arc; pub fn router(state: Arc) -> Router { + build_router().with_state(state) +} + +fn build_router() -> Router> { Router::new() .route("/health", get(health)) - // TODO: S3 API 路由 - // PUT /{bucket}/{key} → PutObject - // GET /{bucket}/{key} → GetObject - // HEAD /{bucket}/{key} → HeadObject - // DELETE /{bucket}/{key} → DeleteObject - // POST /{bucket}/{key}?restore → RestoreObject - // GET /{bucket} → ListObjects - // PUT /{bucket} → CreateBucket - // DELETE /{bucket} → DeleteBucket - // HEAD /{bucket} → HeadBucket - // GET / → ListBuckets - .with_state(state) + .route("/", get(list_buckets)) + .route( + "/:bucket", + get(list_objects) + .put(create_bucket) + .delete(delete_bucket) + .head(head_bucket), + ) + .route( + "/:bucket/*key", + get(get_object) + .put(put_object) + .delete(delete_object) + .head(head_object) + .post(post_object), + ) +} + +#[cfg(test)] +fn test_router(state: Arc) -> Router { + build_router().with_state(state) } async fn health() -> &'static str { "OK" } + +async fn list_buckets(State(state): State>) -> Response { + match state.backend.list_buckets().await { + Ok(response) => list_buckets_xml_response(&response), + Err(status) => grpc_status_to_s3_response(status, "/"), + } +} + +async fn create_bucket( + State(state): State>, + Path(bucket): Path, +) -> Response { + match state.backend.create_bucket(&bucket).await { + Ok(()) => 
empty_response(StatusCode::OK), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}")), + } +} + +async fn delete_bucket( + State(state): State>, + Path(bucket): Path, +) -> Response { + match state.backend.delete_bucket(&bucket).await { + Ok(()) => empty_response(StatusCode::NO_CONTENT), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}")), + } +} + +async fn head_bucket( + State(state): State>, + Path(bucket): Path, +) -> Response { + match state.backend.head_bucket(&bucket).await { + Ok(()) => empty_response(StatusCode::OK), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}")), + } +} + +async fn list_objects( + State(state): State>, + Path(bucket): Path, + Query(query): Query>, +) -> Response { + let prefix = query.get("prefix").map(String::as_str); + let marker = query.get("marker").map(String::as_str); + let delimiter = query.get("delimiter").map(String::as_str); + let max_keys = query + .get("max-keys") + .and_then(|value| value.parse::().ok()) + .unwrap_or(1000); + match state + .backend + .list_objects(&bucket, prefix, marker, delimiter, max_keys) + .await + { + Ok(response) => list_objects_xml_response(&response), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}")), + } +} + +async fn get_object( + State(state): State>, + Path((bucket, key)): Path<(String, String)>, +) -> Response { + match state.backend.get_object(&bucket, &key).await { + Ok(object) => get_object_success_response(object), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}/{key}")), + } +} + +async fn put_object( + State(state): State>, + Path((bucket, key)): Path<(String, String)>, + headers: HeaderMap, + body: Bytes, +) -> Response { + let content_type = headers + .get(axum::http::header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(str::to_string); + match state + .backend + .put_object(&bucket, &key, body.to_vec(), content_type) + .await + { + Ok(response) => 
put_object_success_response(response), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}/{key}")), + } +} + +async fn delete_object( + State(state): State>, + Path((bucket, key)): Path<(String, String)>, +) -> Response { + match state.backend.delete_object(&bucket, &key).await { + Ok(()) => empty_response(StatusCode::NO_CONTENT), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}/{key}")), + } +} + +async fn head_object( + State(state): State>, + Path((bucket, key)): Path<(String, String)>, +) -> Response { + match state.backend.head_object(&bucket, &key).await { + Ok(head) => head_object_success_response(head), + Err(status) => grpc_status_to_s3_response(status, &format!("/{bucket}/{key}")), + } +} + +async fn post_object( + State(state): State>, + Path((bucket, key)): Path<(String, String)>, + Query(query): Query>, +) -> Response { + let raw_query = if query.is_empty() { + None + } else { + Some( + query + .iter() + .map(|(k, v)| { + if v.is_empty() { + k.clone() + } else { + format!("{k}={v}") + } + }) + .collect::>() + .join("&"), + ) + }; + let resource = format!("/{bucket}/{key}"); + if !is_restore_request(raw_query.as_deref()) { + return bad_request_response("UnsupportedPostAction", &resource); + } + + let days = query + .get("days") + .and_then(|value| value.parse::().ok()) + .unwrap_or(1); + let tier = match query.get("tier").map(String::as_str) { + Some("Expedited") => coldstore_proto::common::RestoreTier::Expedited, + Some("Bulk") => coldstore_proto::common::RestoreTier::Bulk, + _ => coldstore_proto::common::RestoreTier::Standard, + }; + + match state + .backend + .restore_object(&bucket, &key, days, tier) + .await + { + Ok(response) => empty_response(if response.status_code == 200 { + StatusCode::OK + } else { + StatusCode::ACCEPTED + }), + Err(status) => grpc_status_to_s3_response(status, &resource), + } +} + +fn list_buckets_xml_response( + response: &coldstore_proto::scheduler::ListBucketsResponse, +) -> 
Response { + let buckets_xml = response + .buckets + .iter() + .map(|bucket| format!("{}", bucket.name)) + .collect::>() + .join(""); + let body = format!( + "{}", + buckets_xml + ); + xml_response(StatusCode::OK, body) +} + +fn list_objects_xml_response( + response: &coldstore_proto::scheduler::ListObjectsResponse, +) -> Response { + let contents = response + .contents + .iter() + .map(|entry| { + format!( + "{}{}{}{}", + entry.key, entry.etag, entry.size, entry.storage_class + ) + }) + .collect::>() + .join(""); + let body = format!( + "{}{}", + response.bucket, contents + ); + xml_response(StatusCode::OK, body) +} + +fn put_object_success_response( + response: coldstore_proto::scheduler::PutObjectResponse, +) -> Response { + let mut http = empty_response(StatusCode::OK); + http.headers_mut().insert( + axum::http::header::ETAG, + HeaderValue::from_str(&response.etag).unwrap(), + ); + http +} + +fn get_object_success_response(object: DownloadedObject) -> Response { + let mut response = Response::new(Body::from(object.body)); + *response.status_mut() = StatusCode::OK; + apply_object_headers(response.headers_mut(), &object.head); + response +} + +fn head_object_success_response(head: coldstore_proto::scheduler::HeadObjectResponse) -> Response { + let mut response = Response::new(Body::empty()); + *response.status_mut() = StatusCode::OK; + apply_object_headers(response.headers_mut(), &head); + response +} + +fn apply_object_headers( + headers: &mut axum::http::HeaderMap, + head: &coldstore_proto::scheduler::HeadObjectResponse, +) { + headers.insert( + axum::http::header::CONTENT_LENGTH, + HeaderValue::from_str(&head.content_length.to_string()).unwrap(), + ); + if let Some(content_type) = &head.content_type { + headers.insert( + axum::http::header::CONTENT_TYPE, + HeaderValue::from_str(content_type).unwrap(), + ); + } + headers.insert( + axum::http::header::ETAG, + HeaderValue::from_str(&head.etag).unwrap(), + ); + if let Some(restore_info) = &head.restore_info { + 
headers.insert( + HeaderName::from_static("x-amz-restore"), + HeaderValue::from_str(&normalize_restore_info(restore_info)).unwrap(), + ); + } +} + +fn normalize_restore_info(restore_info: &str) -> String { + if let Some(expiry) = restore_info.strip_prefix("ongoing-request=\"false\", expiry-ts=\"") { + format_restore_header(false, Some(expiry.trim_end_matches('"'))) + } else if restore_info == "ongoing-request=\"false\"" { + format_restore_header(false, None) + } else { + format_restore_header(true, None) + } +} + +fn bad_request_response(operation: &str, resource: &str) -> Response { + let message = format!("unsupported POST action for {operation}"); + let body = S3ErrorResponse { + code: S3ErrorCode::NotImplemented, + message: &message, + resource, + } + .to_xml(); + s3_xml_response(StatusCode::BAD_REQUEST, body) +} + +fn grpc_status_to_s3_response(status: tonic::Status, resource: &str) -> Response { + let (code, http_status) = match status.code() { + tonic::Code::AlreadyExists => (S3ErrorCode::NotImplemented, StatusCode::CONFLICT), + tonic::Code::NotFound => { + let code = if resource.matches('/').count() > 1 { + S3ErrorCode::NoSuchKey + } else { + S3ErrorCode::NoSuchBucket + }; + (code, StatusCode::NOT_FOUND) + } + tonic::Code::Unimplemented => (S3ErrorCode::NotImplemented, StatusCode::NOT_IMPLEMENTED), + _ => (S3ErrorCode::NotImplemented, StatusCode::BAD_GATEWAY), + }; + let body = S3ErrorResponse { + code, + message: status.message(), + resource, + } + .to_xml(); + s3_xml_response(http_status, body) +} + +fn s3_xml_response(status: StatusCode, body: String) -> Response { + let mut response = Response::new(Body::from(body)); + *response.status_mut() = status; + response.headers_mut().insert( + axum::http::header::CONTENT_TYPE, + HeaderValue::from_static("application/xml"), + ); + response +} + +fn xml_response(status: StatusCode, body: String) -> Response { + s3_xml_response(status, body) +} + +fn empty_response(status: StatusCode) -> Response { + let mut 
response = Response::new(Body::empty()); + *response.status_mut() = status; + response +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::GatewayBackend; + use axum::body::to_bytes; + use axum::http::Request; + use coldstore_proto::scheduler::{ + BucketEntry, HeadObjectResponse, ListBucketsResponse, ListObjectsResponse, ObjectEntry, + PutObjectResponse, RestoreObjectResponse, + }; + use tower::util::ServiceExt; + + struct MockGatewayBackend; + + #[tonic::async_trait] + impl GatewayBackend for MockGatewayBackend { + async fn list_buckets(&self) -> std::result::Result { + Ok(ListBucketsResponse { + buckets: vec![BucketEntry { + name: "docs".into(), + creation_date: None, + }], + }) + } + + async fn create_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status> { + if bucket == "existing" { + Err(tonic::Status::already_exists("bucket already exists")) + } else { + Ok(()) + } + } + + async fn delete_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status> { + if bucket == "docs" { + Ok(()) + } else { + Err(tonic::Status::not_found("bucket missing")) + } + } + + async fn head_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status> { + if bucket == "docs" { + Ok(()) + } else { + Err(tonic::Status::not_found("bucket missing")) + } + } + + async fn list_objects( + &self, + bucket: &str, + _prefix: Option<&str>, + _marker: Option<&str>, + _delimiter: Option<&str>, + _max_keys: u32, + ) -> std::result::Result { + if bucket != "docs" { + return Err(tonic::Status::not_found("bucket missing")); + } + Ok(ListObjectsResponse { + bucket: "docs".into(), + prefix: None, + marker: None, + next_marker: None, + max_keys: 1000, + is_truncated: false, + contents: vec![ObjectEntry { + key: "readme.txt".into(), + last_modified: None, + etag: "etag-1".into(), + size: 42, + storage_class: "COLD".into(), + }], + common_prefixes: vec![], + }) + } + + async fn put_object( + &self, + _bucket: &str, + _key: &str, + _body: Vec, + _content_type: 
Option, + ) -> std::result::Result { + Ok(PutObjectResponse { + etag: "etag-put".into(), + version_id: "v1".into(), + }) + } + + async fn get_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result { + Ok(DownloadedObject { + head: self.head_object(bucket, key).await?, + body: b"hello world".to_vec(), + }) + } + + async fn delete_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result<(), tonic::Status> { + if bucket == "docs" && key == "readme.txt" { + Ok(()) + } else { + Err(tonic::Status::not_found("object missing")) + } + } + + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result { + if bucket == "docs" && key == "readme.txt" { + Ok(HeadObjectResponse { + content_length: 42, + content_type: Some("text/plain".into()), + etag: "etag-1".into(), + storage_class: 2, + restore_info: Some("ongoing-request=\"false\", expiry-ts=\"123\"".into()), + last_modified: None, + }) + } else { + Err(tonic::Status::not_found("object missing")) + } + } + + async fn restore_object( + &self, + bucket: &str, + key: &str, + _days: u32, + _tier: coldstore_proto::common::RestoreTier, + ) -> std::result::Result { + if bucket == "docs" && key == "readme.txt" { + Ok(RestoreObjectResponse { status_code: 202 }) + } else { + Err(tonic::Status::not_found("object missing")) + } + } + } + + fn state() -> Arc { + Arc::new(GatewayState { + backend: Arc::new(MockGatewayBackend), + }) + } + + #[tokio::test] + async fn health_route_returns_ok() { + let response = test_router(state()) + .oneshot( + Request::builder() + .uri("/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn list_buckets_route_returns_xml_from_backend() { + let response = test_router(state()) + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = 
to_bytes(response.into_body(), usize::MAX).await.unwrap(); + assert!(String::from_utf8(body.to_vec()) + .unwrap() + .contains("docs")); + } + + #[tokio::test] + async fn list_objects_route_returns_xml_from_backend() { + let response = test_router(state()) + .oneshot(Request::builder().uri("/docs").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let text = String::from_utf8(body.to_vec()).unwrap(); + assert!(text.contains("")); + assert!(text.contains("readme.txt")); + } + + #[tokio::test] + async fn create_bucket_route_uses_backend() { + let response = test_router(state()) + .oneshot( + Request::builder() + .method("PUT") + .uri("/new-bucket") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn delete_bucket_route_uses_backend() { + let response = test_router(state()) + .oneshot( + Request::builder() + .method("DELETE") + .uri("/docs") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + } + + #[tokio::test] + async fn put_object_route_uses_backend() { + let response = test_router(state()) + .oneshot( + Request::builder() + .method("PUT") + .uri("/docs/readme.txt") + .header("content-type", "text/plain") + .body(Body::from("hello world")) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.headers()["etag"], "etag-put"); + } + + #[tokio::test] + async fn get_object_route_returns_body_from_backend() { + let response = test_router(state()) + .oneshot( + Request::builder() + .uri("/docs/readme.txt") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + assert_eq!(body.as_ref(), b"hello world"); + } + 
+ #[tokio::test] + async fn delete_object_route_uses_backend() { + let response = test_router(state()) + .oneshot( + Request::builder() + .method("DELETE") + .uri("/docs/readme.txt") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NO_CONTENT); + } + + #[tokio::test] + async fn restore_post_route_uses_backend() { + let response = test_router(state()) + .oneshot( + Request::builder() + .method("POST") + .uri("/docs/readme.txt?restore=true&days=2&tier=Bulk") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::ACCEPTED); + } + + #[tokio::test] + async fn head_bucket_route_uses_backend() { + let response = test_router(state()) + .oneshot( + Request::builder() + .method("HEAD") + .uri("/docs") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn head_object_route_sets_restore_header() { + let response = test_router(state()) + .oneshot( + Request::builder() + .method("HEAD") + .uri("/docs/readme.txt") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.headers()["etag"], "etag-1"); + assert_eq!( + response.headers()["x-amz-restore"], + "ongoing-request=\"false\", expiry-date=\"123\"" + ); + } +} diff --git a/crates/gateway/src/lib.rs b/crates/gateway/src/lib.rs index 91185c7..e3f265b 100644 --- a/crates/gateway/src/lib.rs +++ b/crates/gateway/src/lib.rs @@ -3,19 +3,280 @@ pub mod protocol; use anyhow::Result; use coldstore_common::config::GatewayConfig; +use coldstore_proto::common; use coldstore_proto::scheduler::scheduler_service_client::SchedulerServiceClient; +use coldstore_proto::scheduler::{ + CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest, HeadBucketRequest, + HeadObjectRequest, HeadObjectResponse, ListBucketsResponse, ListObjectsRequest, + ListObjectsResponse, 
PutObjectMeta, PutObjectRequest, PutObjectResponse, RestoreObjectRequest, + RestoreObjectResponse, +}; +use std::sync::Arc; use tonic::transport::Channel; use tracing::info; +pub struct DownloadedObject { + pub head: HeadObjectResponse, + pub body: Vec, +} + +#[tonic::async_trait] +pub trait GatewayBackend: Send + Sync + 'static { + async fn list_buckets(&self) -> std::result::Result; + async fn create_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status>; + async fn delete_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status>; + async fn head_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status>; + async fn list_objects( + &self, + bucket: &str, + prefix: Option<&str>, + marker: Option<&str>, + delimiter: Option<&str>, + max_keys: u32, + ) -> std::result::Result; + async fn put_object( + &self, + bucket: &str, + key: &str, + body: Vec, + content_type: Option, + ) -> std::result::Result; + async fn get_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result; + async fn delete_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result<(), tonic::Status>; + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result; + async fn restore_object( + &self, + bucket: &str, + key: &str, + days: u32, + tier: common::RestoreTier, + ) -> std::result::Result; +} + +pub struct GrpcGatewayBackend { + scheduler_addr: String, +} + +impl GrpcGatewayBackend { + pub fn new(scheduler_addr: String) -> Self { + Self { scheduler_addr } + } + + async fn connect(&self) -> std::result::Result, tonic::Status> { + SchedulerServiceClient::connect(self.scheduler_addr.clone()) + .await + .map_err(|err| tonic::Status::unavailable(err.to_string())) + } +} + +#[tonic::async_trait] +impl GatewayBackend for GrpcGatewayBackend { + async fn list_buckets(&self) -> std::result::Result { + let mut client = self.connect().await?; + client.list_buckets(()).await.map(|r| r.into_inner()) + } + + async fn 
create_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status> { + let mut client = self.connect().await?; + client + .create_bucket(CreateBucketRequest { + bucket: bucket.to_string(), + }) + .await + .map(|_| ()) + } + + async fn delete_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status> { + let mut client = self.connect().await?; + client + .delete_bucket(DeleteBucketRequest { + bucket: bucket.to_string(), + }) + .await + .map(|_| ()) + } + + async fn head_bucket(&self, bucket: &str) -> std::result::Result<(), tonic::Status> { + let mut client = self.connect().await?; + client + .head_bucket(HeadBucketRequest { + bucket: bucket.to_string(), + }) + .await + .map(|_| ()) + } + + async fn list_objects( + &self, + bucket: &str, + prefix: Option<&str>, + marker: Option<&str>, + delimiter: Option<&str>, + max_keys: u32, + ) -> std::result::Result { + let mut client = self.connect().await?; + client + .list_objects(ListObjectsRequest { + bucket: bucket.to_string(), + prefix: prefix.map(str::to_string), + marker: marker.map(str::to_string), + delimiter: delimiter.map(str::to_string), + max_keys, + }) + .await + .map(|r| r.into_inner()) + } + + async fn put_object( + &self, + bucket: &str, + key: &str, + body: Vec, + content_type: Option, + ) -> std::result::Result { + let mut client = self.connect().await?; + let stream = tokio_stream::iter(vec![ + PutObjectRequest { + payload: Some( + coldstore_proto::scheduler::put_object_request::Payload::Meta(PutObjectMeta { + bucket: bucket.to_string(), + key: key.to_string(), + content_length: body.len() as u64, + content_type, + checksum_sha256: None, + }), + ), + }, + PutObjectRequest { + payload: Some(coldstore_proto::scheduler::put_object_request::Payload::Data(body)), + }, + ]); + client.put_object(stream).await.map(|r| r.into_inner()) + } + + async fn get_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result { + let mut client = self.connect().await?; + let mut stream = 
client + .get_object(coldstore_proto::scheduler::GetObjectRequest { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: None, + }) + .await? + .into_inner(); + + let first = stream + .message() + .await? + .ok_or_else(|| tonic::Status::internal("missing object metadata chunk"))?; + let head = match first.payload { + Some(coldstore_proto::scheduler::get_object_response::Payload::Meta(meta)) => { + HeadObjectResponse { + content_length: meta.content_length, + content_type: meta.content_type, + etag: meta.etag, + storage_class: meta.storage_class, + restore_info: meta.restore_info, + last_modified: meta.last_modified, + } + } + _ => { + return Err(tonic::Status::internal( + "first object chunk was not metadata", + )) + } + }; + + let mut body = Vec::new(); + while let Some(chunk) = stream.message().await? { + if let Some(coldstore_proto::scheduler::get_object_response::Payload::Data(bytes)) = + chunk.payload + { + body.extend_from_slice(&bytes); + } + } + + Ok(DownloadedObject { head, body }) + } + + async fn delete_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result<(), tonic::Status> { + let mut client = self.connect().await?; + client + .delete_object(DeleteObjectRequest { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: None, + }) + .await + .map(|_| ()) + } + + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result { + let mut client = self.connect().await?; + client + .head_object(HeadObjectRequest { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: None, + }) + .await + .map(|r| r.into_inner()) + } + + async fn restore_object( + &self, + bucket: &str, + key: &str, + days: u32, + tier: common::RestoreTier, + ) -> std::result::Result { + let mut client = self.connect().await?; + client + .restore_object(RestoreObjectRequest { + bucket: bucket.to_string(), + key: key.to_string(), + version_id: None, + days, + tier: tier as i32, + }) + .await + .map(|r| 
r.into_inner()) + } +} + pub struct GatewayState { - pub scheduler: SchedulerServiceClient, + pub backend: Arc, } pub async fn run(config: GatewayConfig) -> Result<()> { let scheduler_addr = format!("http://{}", &config.scheduler_addrs[0]); - let scheduler = SchedulerServiceClient::connect(scheduler_addr).await?; - - let state = std::sync::Arc::new(GatewayState { scheduler }); + let state = Arc::new(GatewayState { + backend: Arc::new(GrpcGatewayBackend::new(scheduler_addr)), + }); let app = handler::router(state); diff --git a/crates/gateway/src/protocol.rs b/crates/gateway/src/protocol.rs index cc8d62e..209a5fa 100644 --- a/crates/gateway/src/protocol.rs +++ b/crates/gateway/src/protocol.rs @@ -8,12 +8,14 @@ //! - GET 行为控制 (冷对象需先 Restore) /// S3 错误码 +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum S3ErrorCode { InvalidObjectState, RestoreAlreadyInProgress, GlacierExpeditedRetrievalNotAvailable, NoSuchKey, NoSuchBucket, + NotImplemented, } impl S3ErrorCode { @@ -26,6 +28,7 @@ impl S3ErrorCode { } S3ErrorCode::NoSuchKey => "NoSuchKey", S3ErrorCode::NoSuchBucket => "NoSuchBucket", + S3ErrorCode::NotImplemented => "NotImplemented", } } @@ -36,10 +39,29 @@ impl S3ErrorCode { S3ErrorCode::GlacierExpeditedRetrievalNotAvailable => 503, S3ErrorCode::NoSuchKey => 404, S3ErrorCode::NoSuchBucket => 404, + S3ErrorCode::NotImplemented => 501, } } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct S3ErrorResponse<'a> { + pub code: S3ErrorCode, + pub message: &'a str, + pub resource: &'a str, +} + +impl<'a> S3ErrorResponse<'a> { + pub fn to_xml(&self) -> String { + format!( + "{}{}{}", + self.code.as_str(), + self.message, + self.resource, + ) + } +} + /// 生成 x-amz-restore 响应头 pub fn format_restore_header(ongoing: bool, expiry_date: Option<&str>) -> String { if ongoing { @@ -50,3 +72,41 @@ pub fn format_restore_header(ongoing: bool, expiry_date: Option<&str>) -> String "ongoing-request=\"false\"".to_string() } } + +pub fn is_restore_request(query: Option<&str>) -> bool { 
+ query + .unwrap_or_default() + .split('&') + .any(|item| item == "restore" || item.starts_with("restore=")) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn restore_header_formats_completed_state() { + let value = format_restore_header(false, Some("Fri, 28 Feb 2025 12:00:00 GMT")); + assert!(value.contains("ongoing-request=\"false\"")); + assert!(value.contains("expiry-date=\"Fri, 28 Feb 2025 12:00:00 GMT\"")); + } + + #[test] + fn restore_query_is_detected() { + assert!(is_restore_request(Some("restore"))); + assert!(is_restore_request(Some("foo=bar&restore=true"))); + assert!(!is_restore_request(Some("foo=bar"))); + } + + #[test] + fn s3_error_xml_contains_code_and_resource() { + let xml = S3ErrorResponse { + code: S3ErrorCode::NotImplemented, + message: "operation is not implemented", + resource: "/docs/readme.txt", + } + .to_xml(); + assert!(xml.contains("NotImplemented")); + assert!(xml.contains("/docs/readme.txt")); + } +} diff --git a/crates/metadata/Cargo.toml b/crates/metadata/Cargo.toml index a83f3e4..8e9594e 100644 --- a/crates/metadata/Cargo.toml +++ b/crates/metadata/Cargo.toml @@ -15,6 +15,7 @@ coldstore-common = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } prost = { workspace = true } +prost-types = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } anyhow = { workspace = true } diff --git a/crates/metadata/src/service.rs b/crates/metadata/src/service.rs index bdf7aac..d058647 100644 --- a/crates/metadata/src/service.rs +++ b/crates/metadata/src/service.rs @@ -3,359 +3,1211 @@ use coldstore_common::config::MetadataConfig; use coldstore_proto::common; use coldstore_proto::metadata::metadata_service_server::MetadataService; use coldstore_proto::metadata::*; +use prost_types::Timestamp; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; use tonic::{Request, Response, Status}; +#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] 
+struct ObjectKey { + bucket: String, + key: String, + version_id: Option, +} + +impl ObjectKey { + fn new(bucket: String, key: String, version_id: Option) -> Self { + Self { + bucket, + key, + version_id, + } + } +} + +#[derive(Default)] +struct MetadataState { + objects: HashMap, + buckets: HashMap, + archive_bundles: HashMap, + archive_tasks: HashMap, + recall_tasks: HashMap, + tapes: HashMap, + scheduler_workers: HashMap, + cache_workers: HashMap, + tape_workers: HashMap, +} + pub struct MetadataServiceImpl { - _config: MetadataConfig, - // TODO: Phase 2 — Raft + RocksDB - // raft: Arc>, - // store: Arc, + config: MetadataConfig, + state: Arc>, } impl MetadataServiceImpl { pub async fn new(config: &MetadataConfig) -> Result { - // TODO: 初始化 Raft 集群和 RocksDB Ok(Self { - _config: config.clone(), + config: config.clone(), + state: Arc::new(RwLock::new(MetadataState::default())), }) } + + fn metadata_nodes(&self) -> Vec { + self.config + .cluster + .split(',') + .filter_map(|entry| { + let (node_id, addr) = entry.split_once(':')?; + let node_id = node_id.parse::().ok()?; + Some(common::MetadataNodeInfo { + node_id, + addr: addr.to_string(), + raft_role: if node_id == self.config.node_id { + "LeaderCandidate".into() + } else { + "Follower".into() + }, + last_heartbeat: Some(now_timestamp()), + status: common::NodeStatus::NodeOnline as i32, + }) + }) + .collect() + } } #[tonic::async_trait] impl MetadataService for MetadataServiceImpl { - // ── ObjectApi ── - async fn put_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut object = request.into_inner(); + let mut state = self.state.write().await; + if !state.buckets.contains_key(&object.bucket) { + return Err(Status::not_found(format!( + "bucket not found: {}", + object.bucket + ))); + } + + let now = now_timestamp(); + if object.created_at.is_none() { + object.created_at = Some(now); + } + object.updated_at = Some(now); + + let key = ObjectKey::new( + 
object.bucket.clone(), + object.key.clone(), + object.version_id.clone(), + ); + state.objects.insert(key, object.clone()); + refresh_bucket_stats(&mut state, &object.bucket); + Ok(Response::new(())) } async fn get_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let object = find_object(&state, &request.bucket, &request.key, None)?; + Ok(Response::new(object)) } async fn get_object_version( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let object = find_object( + &state, + &request.bucket, + &request.key, + Some(request.version_id.as_str()), + )?; + Ok(Response::new(object)) } async fn delete_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + let removed = state.objects.remove(&ObjectKey::new( + request.bucket.clone(), + request.key.clone(), + None, + )); + if removed.is_none() { + return Err(Status::not_found("object not found")); + } + refresh_bucket_stats(&mut state, &request.bucket); + Ok(Response::new(())) } async fn head_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let object = find_object(&state, &request.bucket, &request.key, None)?; + Ok(Response::new(object)) } async fn list_objects( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + if !state.buckets.contains_key(&request.bucket) { + return Err(Status::not_found(format!( + "bucket not found: {}", + request.bucket + ))); + } + + let prefix = 
request.prefix.unwrap_or_default(); + let marker = request.marker.unwrap_or_default(); + let limit = if request.max_keys == 0 { + usize::MAX + } else { + request.max_keys as usize + }; + + let mut objects: Vec<_> = state + .objects + .values() + .filter(|object| object.bucket == request.bucket) + .filter(|object| object.key.starts_with(&prefix)) + .filter(|object| object.key > marker) + .cloned() + .collect(); + objects.sort_by(|a, b| { + a.key + .cmp(&b.key) + .then_with(|| a.version_id.cmp(&b.version_id)) + }); + + let is_truncated = objects.len() > limit; + let next_marker = if is_truncated { + objects.get(limit - 1).map(|object| object.key.clone()) + } else { + None + }; + + Ok(Response::new(ListObjectsResponse { + objects: objects.into_iter().take(limit).collect(), + next_marker, + is_truncated, + })) } async fn update_storage_class( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + let object = find_object_mut(&mut state, &request.bucket, &request.key, None)?; + object.storage_class = request.storage_class; + object.updated_at = Some(now_timestamp()); + Ok(Response::new(())) } async fn update_archive_location( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + let object = find_object_mut(&mut state, &request.bucket, &request.key, None)?; + object.archive_id = Some(request.archive_id); + object.tape_id = Some(request.tape_id); + object.tape_set = request.tape_set; + object.tape_block_offset = Some(request.tape_block_offset); + object.updated_at = Some(now_timestamp()); + Ok(Response::new(())) } async fn update_restore_status( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + let 
object = find_object_mut(&mut state, &request.bucket, &request.key, None)?; + validate_restore_transition(object.restore_status, request.status)?; + object.restore_status = Some(request.status); + object.restore_expire_at = request.expire_at; + object.updated_at = Some(now_timestamp()); + Ok(Response::new(())) } async fn scan_cold_pending( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let limit = if request.limit == 0 { + usize::MAX + } else { + request.limit as usize + }; + let state = self.state.read().await; + let mut objects: Vec<_> = state + .objects + .values() + .filter(|object| object.storage_class == common::StorageClass::ColdPending as i32) + .cloned() + .collect(); + objects.sort_by(|a, b| a.bucket.cmp(&b.bucket).then_with(|| a.key.cmp(&b.key))); + Ok(Response::new(ScanColdPendingResponse { + objects: objects.into_iter().take(limit).collect(), + })) } - // ── BucketApi ── - async fn create_bucket( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut bucket = request.into_inner(); + let mut state = self.state.write().await; + if state.buckets.contains_key(&bucket.name) { + return Err(Status::already_exists(format!( + "bucket already exists: {}", + bucket.name + ))); + } + if bucket.created_at.is_none() { + bucket.created_at = Some(now_timestamp()); + } + state.buckets.insert(bucket.name.clone(), bucket); + Ok(Response::new(())) } async fn get_bucket( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let bucket = state + .buckets + .get(&request.name) + .cloned() + .ok_or_else(|| Status::not_found(format!("bucket not found: {}", request.name)))?; + Ok(Response::new(bucket)) } async fn delete_bucket( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let 
request = request.into_inner(); + let mut state = self.state.write().await; + let has_objects = state + .objects + .values() + .any(|object| object.bucket == request.name); + if has_objects { + return Err(Status::failed_precondition("bucket is not empty")); + } + state + .buckets + .remove(&request.name) + .ok_or_else(|| Status::not_found(format!("bucket not found: {}", request.name)))?; + Ok(Response::new(())) } async fn list_buckets( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + let mut buckets: Vec<_> = state.buckets.values().cloned().collect(); + buckets.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(Response::new(ListBucketsResponse { buckets })) } - // ── ArchiveApi ── - async fn put_archive_bundle( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut bundle = request.into_inner(); + if bundle.created_at.is_none() { + bundle.created_at = Some(now_timestamp()); + } + let mut state = self.state.write().await; + state.archive_bundles.insert(bundle.id.clone(), bundle); + Ok(Response::new(())) } async fn get_archive_bundle( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let bundle = state + .archive_bundles + .get(&request.id) + .cloned() + .ok_or_else(|| Status::not_found("archive bundle not found"))?; + Ok(Response::new(bundle)) } async fn update_archive_bundle_status( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + let bundle = state + .archive_bundles + .get_mut(&request.id) + .ok_or_else(|| Status::not_found("archive bundle not found"))?; + validate_archive_bundle_transition(bundle.status, request.status)?; + bundle.status = request.status; + if request.status == 
common::ArchiveBundleStatus::BundleCompleted as i32 { + bundle.completed_at = Some(now_timestamp()); + } + Ok(Response::new(())) } async fn list_bundles_by_tape( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let bundle_ids = state + .archive_bundles + .values() + .filter(|bundle| bundle.tape_id == request.tape_id) + .map(|bundle| bundle.id.clone()) + .collect(); + Ok(Response::new(ListBundlesByTapeResponse { bundle_ids })) } async fn put_archive_task( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut task = request.into_inner(); + if task.created_at.is_none() { + task.created_at = Some(now_timestamp()); + } + let mut state = self.state.write().await; + state.archive_tasks.insert(task.id.clone(), task); + Ok(Response::new(())) } async fn get_archive_task( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let task = state + .archive_tasks + .get(&request.id) + .cloned() + .ok_or_else(|| Status::not_found("archive task not found"))?; + Ok(Response::new(task)) } async fn update_archive_task( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let task = request.into_inner(); + let mut state = self.state.write().await; + let current = state + .archive_tasks + .get(&task.id) + .ok_or_else(|| Status::not_found("archive task not found"))?; + validate_archive_task_transition(current.status, task.status)?; + state.archive_tasks.insert(task.id.clone(), task); + Ok(Response::new(())) } async fn list_pending_archive_tasks( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + let tasks = state + .archive_tasks + .values() + .filter(|task| { + matches!( + 
common::ArchiveTaskStatus::try_from(task.status), + Ok(common::ArchiveTaskStatus::ArchiveTaskPending) + | Ok(common::ArchiveTaskStatus::ArchiveTaskInProgress) + ) + }) + .cloned() + .collect(); + Ok(Response::new(ListArchiveTasksResponse { tasks })) } - // ── RecallApi ── - async fn put_recall_task( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut task = request.into_inner(); + if task.created_at.is_none() { + task.created_at = Some(now_timestamp()); + } + let mut state = self.state.write().await; + state.recall_tasks.insert(task.id.clone(), task); + Ok(Response::new(())) } async fn get_recall_task( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let task = state + .recall_tasks + .get(&request.id) + .cloned() + .ok_or_else(|| Status::not_found("recall task not found"))?; + Ok(Response::new(task)) } async fn update_recall_task( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let task = request.into_inner(); + let mut state = self.state.write().await; + let current = state + .recall_tasks + .get(&task.id) + .ok_or_else(|| Status::not_found("recall task not found"))?; + validate_restore_transition(Some(current.status), task.status)?; + state.recall_tasks.insert(task.id.clone(), task); + Ok(Response::new(())) } async fn list_pending_recall_tasks( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + let tasks = state + .recall_tasks + .values() + .filter(|task| is_pending_restore_status(task.status)) + .cloned() + .collect(); + Ok(Response::new(ListRecallTasksResponse { tasks })) } async fn list_recall_tasks_by_tape( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = 
self.state.read().await; + let tasks = state + .recall_tasks + .values() + .filter(|task| task.tape_id == request.tape_id) + .cloned() + .collect(); + Ok(Response::new(ListRecallTasksResponse { tasks })) } async fn find_active_recall( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let task = state + .recall_tasks + .values() + .find(|task| { + task.bucket == request.bucket + && task.key == request.key + && is_active_restore_status(task.status) + }) + .cloned(); + Ok(Response::new(FindActiveRecallResponse { task })) } - // ── TapeApi ── - async fn put_tape( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut tape = request.into_inner(); + if tape.registered_at.is_none() { + tape.registered_at = Some(now_timestamp()); + } + let mut state = self.state.write().await; + state.tapes.insert(tape.id.clone(), tape); + Ok(Response::new(())) } async fn get_tape( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let state = self.state.read().await; + let tape = state + .tapes + .get(&request.tape_id) + .cloned() + .ok_or_else(|| Status::not_found("tape not found"))?; + Ok(Response::new(tape)) } async fn update_tape( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let tape = request.into_inner(); + let mut state = self.state.write().await; + state.tapes.insert(tape.id.clone(), tape); + Ok(Response::new(())) } async fn list_tapes( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + let tapes = state.tapes.values().cloned().collect(); + Ok(Response::new(ListTapesResponse { tapes })) } async fn list_tapes_by_status( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + 
let request = request.into_inner(); + let state = self.state.read().await; + let tapes = state + .tapes + .values() + .filter(|tape| tape.status == request.status) + .cloned() + .collect(); + Ok(Response::new(ListTapesResponse { tapes })) } - // ── ClusterApi ── - async fn get_cluster_info( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + Ok(Response::new(common::ClusterInfo { + cluster_id: "coldstore-phase1".into(), + metadata_nodes: self.metadata_nodes(), + scheduler_workers: state.scheduler_workers.values().cloned().collect(), + cache_workers: state.cache_workers.values().cloned().collect(), + tape_workers: state.tape_workers.values().cloned().collect(), + leader_id: Some(self.config.node_id), + term: 1, + committed_index: state.objects.len() as u64, + })) } async fn register_scheduler_worker( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut worker = request.into_inner(); + worker.last_heartbeat = Some(now_timestamp()); + let mut state = self.state.write().await; + state.scheduler_workers.insert(worker.node_id, worker); + Ok(Response::new(())) } async fn deregister_scheduler_worker( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + state.scheduler_workers.remove(&request.node_id); + Ok(Response::new(())) } async fn list_online_scheduler_workers( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + let workers = state + .scheduler_workers + .values() + .filter(|worker| worker.status == common::NodeStatus::NodeOnline as i32) + .cloned() + .collect(); + Ok(Response::new(ListSchedulerWorkersResponse { workers })) } async fn register_cache_worker( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut worker = 
request.into_inner(); + worker.last_heartbeat = Some(now_timestamp()); + let mut state = self.state.write().await; + state.cache_workers.insert(worker.node_id, worker); + Ok(Response::new(())) } async fn deregister_cache_worker( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + state.cache_workers.remove(&request.node_id); + Ok(Response::new(())) } async fn list_online_cache_workers( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + let workers = state + .cache_workers + .values() + .filter(|worker| worker.status == common::NodeStatus::NodeOnline as i32) + .cloned() + .collect(); + Ok(Response::new(ListCacheWorkersResponse { workers })) } async fn register_tape_worker( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let mut worker = request.into_inner(); + worker.last_heartbeat = Some(now_timestamp()); + let mut state = self.state.write().await; + state.tape_workers.insert(worker.node_id, worker); + Ok(Response::new(())) } async fn deregister_tape_worker( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut state = self.state.write().await; + state.tape_workers.remove(&request.node_id); + Ok(Response::new(())) } async fn list_online_tape_workers( &self, _request: Request<()>, ) -> std::result::Result, Status> { - todo!() + let state = self.state.read().await; + let workers = state + .tape_workers + .values() + .filter(|worker| worker.status == common::NodeStatus::NodeOnline as i32) + .cloned() + .collect(); + Ok(Response::new(ListTapeWorkersResponse { workers })) } async fn update_worker_status( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let mut 
state = self.state.write().await; + match common::WorkerType::try_from(request.worker_type) { + Ok(common::WorkerType::WorkerScheduler) => state + .scheduler_workers + .get_mut(&request.node_id) + .map(|worker| worker.status = request.status), + Ok(common::WorkerType::WorkerCache) => state + .cache_workers + .get_mut(&request.node_id) + .map(|worker| worker.status = request.status), + Ok(common::WorkerType::WorkerTape) => state + .tape_workers + .get_mut(&request.node_id) + .map(|worker| worker.status = request.status), + _ => None, + } + .ok_or_else(|| Status::not_found("worker not found"))?; + Ok(Response::new(())) } async fn drain_worker( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + self.update_worker_status(Request::new(UpdateWorkerStatusRequest { + worker_type: request.worker_type, + node_id: request.node_id, + status: common::NodeStatus::NodeDraining as i32, + })) + .await?; + Ok(Response::new(())) } - // ── HeartbeatApi ── - async fn heartbeat( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let now = Some(now_timestamp()); + let mut state = self.state.write().await; + + match common::WorkerType::try_from(request.worker_type) { + Ok(common::WorkerType::WorkerScheduler) => { + let worker = state + .scheduler_workers + .get_mut(&request.node_id) + .ok_or_else(|| Status::not_found("scheduler worker not found"))?; + worker.last_heartbeat = now; + if let Some(heartbeat_request::Payload::Scheduler(payload)) = request.payload { + worker.pending_archive_tasks = payload.pending_archive_tasks; + worker.pending_recall_tasks = payload.pending_recall_tasks; + worker.active_jobs = payload.active_jobs; + } + } + Ok(common::WorkerType::WorkerCache) => { + let worker = state + .cache_workers + .get_mut(&request.node_id) + .ok_or_else(|| Status::not_found("cache worker not found"))?; + 
worker.last_heartbeat = now; + if let Some(heartbeat_request::Payload::Cache(payload)) = request.payload { + worker.used_capacity = payload.used_capacity; + worker.blob_count = payload.blob_count; + } + } + Ok(common::WorkerType::WorkerTape) => { + let worker = state + .tape_workers + .get_mut(&request.node_id) + .ok_or_else(|| Status::not_found("tape worker not found"))?; + worker.last_heartbeat = now; + if let Some(heartbeat_request::Payload::Tape(payload)) = request.payload { + worker.drives = payload.drives; + } + } + _ => return Err(Status::invalid_argument("unknown worker type")), + } + + Ok(Response::new(())) + } +} + +fn now_timestamp() -> Timestamp { + Timestamp { + seconds: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock before unix epoch") + .as_secs() as i64, + nanos: 0, + } +} + +fn refresh_bucket_stats(state: &mut MetadataState, bucket_name: &str) { + if let Some(bucket) = state.buckets.get_mut(bucket_name) { + let mut object_count = 0_u64; + let mut total_size = 0_u64; + for object in state + .objects + .values() + .filter(|object| object.bucket == bucket_name) + { + object_count += 1; + total_size += object.size; + } + bucket.object_count = object_count; + bucket.total_size = total_size; + } +} + +#[allow(clippy::result_large_err)] +fn find_object( + state: &MetadataState, + bucket: &str, + key: &str, + version_id: Option<&str>, +) -> Result<common::ObjectMetadata, Status> { + if let Some(version_id) = version_id { + state + .objects + .get(&ObjectKey::new( + bucket.to_string(), + key.to_string(), + Some(version_id.to_string()), + )) + .cloned() + .ok_or_else(|| Status::not_found("object version not found")) + } else { + state + .objects + .iter() + .filter(|(candidate, _)| candidate.bucket == bucket && candidate.key == key) + .max_by(|(_, left), (_, right)| { + timestamp_sort_key(&left.updated_at).cmp(&timestamp_sort_key(&right.updated_at)) + }) + .map(|(_, object)| object.clone()) + .ok_or_else(|| Status::not_found("object not found")) +
} +} + +#[allow(clippy::result_large_err)] +fn find_object_mut<'a>( + state: &'a mut MetadataState, + bucket: &str, + key: &str, + version_id: Option<&str>, +) -> Result<&'a mut common::ObjectMetadata, Status> { + if let Some(version_id) = version_id { + state + .objects + .get_mut(&ObjectKey::new( + bucket.to_string(), + key.to_string(), + Some(version_id.to_string()), + )) + .ok_or_else(|| Status::not_found("object version not found")) + } else { + let selected = state + .objects + .keys() + .filter(|candidate| candidate.bucket == bucket && candidate.key == key) + .max_by(|left, right| { + let left_ts = state + .objects + .get(*left) + .map(|obj| timestamp_sort_key(&obj.updated_at)) + .unwrap_or_default(); + let right_ts = state + .objects + .get(*right) + .map(|obj| timestamp_sort_key(&obj.updated_at)) + .unwrap_or_default(); + left_ts.cmp(&right_ts) + }) + .cloned() + .ok_or_else(|| Status::not_found("object not found"))?; + state + .objects + .get_mut(&selected) + .ok_or_else(|| Status::not_found("object not found")) + } +} + +fn timestamp_sort_key(ts: &Option<Timestamp>) -> (i64, i32) { + ts.as_ref() + .map(|ts| (ts.seconds, ts.nanos)) + .unwrap_or_default() +} + +#[allow(clippy::result_large_err)] +fn validate_restore_transition(current: Option<i32>, next: i32) -> Result<(), Status> { + let current = current.and_then(|value| common::RestoreStatus::try_from(value).ok()); + let next = common::RestoreStatus::try_from(next) + .map_err(|_| Status::invalid_argument("invalid restore status"))?; + let valid = match current { + None => true, + Some(common::RestoreStatus::RestorePending) => matches!( + next, + common::RestoreStatus::RestoreInProgress + | common::RestoreStatus::RestoreWaitingForMedia + | common::RestoreStatus::RestoreFailed + ), + Some(common::RestoreStatus::RestoreWaitingForMedia) => matches!( + next, + common::RestoreStatus::RestorePending | common::RestoreStatus::RestoreFailed + ), + Some(common::RestoreStatus::RestoreInProgress) => { + matches!( + next, +
common::RestoreStatus::RestoreCompleted | common::RestoreStatus::RestoreFailed + ) + } + Some(common::RestoreStatus::RestoreCompleted) => { + matches!(next, common::RestoreStatus::RestoreExpired) + } + Some(common::RestoreStatus::RestoreExpired | common::RestoreStatus::RestoreFailed) => false, + Some(common::RestoreStatus::Unspecified) => true, + }; + + if valid { + Ok(()) + } else { + Err(Status::failed_precondition( + "invalid restore state transition", + )) + } +} + +#[allow(clippy::result_large_err)] +fn validate_archive_bundle_transition(current: i32, next: i32) -> Result<(), Status> { + let current = common::ArchiveBundleStatus::try_from(current) + .map_err(|_| Status::invalid_argument("invalid archive bundle status"))?; + let next = common::ArchiveBundleStatus::try_from(next) + .map_err(|_| Status::invalid_argument("invalid archive bundle status"))?; + let valid = match current { + common::ArchiveBundleStatus::BundlePending => { + matches!(next, common::ArchiveBundleStatus::BundleWriting) + } + common::ArchiveBundleStatus::BundleWriting => matches!( + next, + common::ArchiveBundleStatus::BundleCompleted + | common::ArchiveBundleStatus::BundleFailed + ), + common::ArchiveBundleStatus::BundleFailed => { + matches!(next, common::ArchiveBundleStatus::BundlePending) + } + common::ArchiveBundleStatus::BundleCompleted => false, + common::ArchiveBundleStatus::Unspecified => true, + }; + if valid { + Ok(()) + } else { + Err(Status::failed_precondition( + "invalid archive bundle state transition", + )) + } +} + +#[allow(clippy::result_large_err)] +fn validate_archive_task_transition(current: i32, next: i32) -> Result<(), Status> { + let current = common::ArchiveTaskStatus::try_from(current) + .map_err(|_| Status::invalid_argument("invalid archive task status"))?; + let next = common::ArchiveTaskStatus::try_from(next) + .map_err(|_| Status::invalid_argument("invalid archive task status"))?; + let valid = match current { + common::ArchiveTaskStatus::ArchiveTaskPending 
=> { + matches!(next, common::ArchiveTaskStatus::ArchiveTaskInProgress) + } + common::ArchiveTaskStatus::ArchiveTaskInProgress => matches!( + next, + common::ArchiveTaskStatus::ArchiveTaskCompleted + | common::ArchiveTaskStatus::ArchiveTaskFailed + ), + common::ArchiveTaskStatus::ArchiveTaskFailed => { + matches!(next, common::ArchiveTaskStatus::ArchiveTaskPending) + } + common::ArchiveTaskStatus::ArchiveTaskCompleted => false, + common::ArchiveTaskStatus::Unspecified => true, + }; + if valid { + Ok(()) + } else { + Err(Status::failed_precondition( + "invalid archive task state transition", + )) + } +} + +fn is_pending_restore_status(status: i32) -> bool { + matches!( + common::RestoreStatus::try_from(status), + Ok(common::RestoreStatus::RestorePending) + | Ok(common::RestoreStatus::RestoreWaitingForMedia) + | Ok(common::RestoreStatus::RestoreInProgress) + ) +} + +fn is_active_restore_status(status: i32) -> bool { + matches!( + common::RestoreStatus::try_from(status), + Ok(common::RestoreStatus::RestorePending) + | Ok(common::RestoreStatus::RestoreWaitingForMedia) + | Ok(common::RestoreStatus::RestoreInProgress) + | Ok(common::RestoreStatus::RestoreCompleted) + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use coldstore_proto::common::{BucketInfo, ObjectMetadata}; + + fn test_bucket(name: &str) -> BucketInfo { + BucketInfo { + name: name.into(), + created_at: Some(Timestamp { + seconds: 1, + nanos: 0, + }), + owner: Some("tester".into()), + versioning_enabled: false, + object_count: 0, + total_size: 0, + } + } + + fn test_object(bucket: &str, key: &str) -> ObjectMetadata { + ObjectMetadata { + bucket: bucket.into(), + key: key.into(), + version_id: None, + size: 5, + checksum: "sum".into(), + content_type: Some("text/plain".into()), + etag: Some("etag".into()), + storage_class: common::StorageClass::ColdPending as i32, + archive_id: None, + tape_id: None, + tape_set: vec![], + tape_block_offset: None, + restore_status: 
Some(common::RestoreStatus::RestorePending as i32), + restore_expire_at: None, + created_at: Some(Timestamp { + seconds: 1, + nanos: 0, + }), + updated_at: Some(Timestamp { + seconds: 1, + nanos: 0, + }), + } + } + + #[tokio::test] + async fn create_and_get_bucket_round_trip() { + let svc = MetadataServiceImpl::new(&MetadataConfig::default()) + .await + .expect("service init"); + + let bucket = test_bucket("docs"); + svc.create_bucket(Request::new(bucket.clone())) + .await + .expect("create bucket should succeed"); + + let got = svc + .get_bucket(Request::new(GetBucketRequest { + name: bucket.name.clone(), + })) + .await + .expect("get bucket should succeed") + .into_inner(); + + assert_eq!(got.name, bucket.name); + assert_eq!(got.owner, bucket.owner); + } + + #[tokio::test] + async fn object_lifecycle_and_bucket_stats_work() { + let svc = MetadataServiceImpl::new(&MetadataConfig::default()) + .await + .expect("service init"); + svc.create_bucket(Request::new(test_bucket("docs"))) + .await + .expect("create bucket"); + + svc.put_object(Request::new(test_object("docs", "readme.txt"))) + .await + .expect("put object"); + svc.update_storage_class(Request::new(UpdateStorageClassRequest { + bucket: "docs".into(), + key: "readme.txt".into(), + storage_class: common::StorageClass::Cold as i32, + })) + .await + .expect("update storage class"); + svc.update_restore_status(Request::new(UpdateRestoreStatusRequest { + bucket: "docs".into(), + key: "readme.txt".into(), + status: common::RestoreStatus::RestoreInProgress as i32, + expire_at: None, + })) + .await + .expect("update restore status"); + + let listed = svc + .list_objects(Request::new(ListObjectsRequest { + bucket: "docs".into(), + prefix: None, + marker: None, + max_keys: 100, + })) + .await + .expect("list objects") + .into_inner(); + assert_eq!(listed.objects.len(), 1); + assert_eq!( + listed.objects[0].storage_class, + common::StorageClass::Cold as i32 + ); + assert_eq!( + listed.objects[0].restore_status, + 
Some(common::RestoreStatus::RestoreInProgress as i32) + ); + + let bucket = svc + .get_bucket(Request::new(GetBucketRequest { + name: "docs".into(), + })) + .await + .expect("get bucket") + .into_inner(); + assert_eq!(bucket.object_count, 1); + assert_eq!(bucket.total_size, 5); + } + + #[tokio::test] + async fn worker_registration_and_heartbeat_update_cluster_state() { + let svc = MetadataServiceImpl::new(&MetadataConfig::default()) + .await + .expect("service init"); + svc.register_scheduler_worker(Request::new(common::SchedulerWorkerInfo { + node_id: 7, + addr: "127.0.0.1:22001".into(), + status: common::NodeStatus::NodeOnline as i32, + last_heartbeat: None, + is_active: true, + pending_archive_tasks: 0, + pending_recall_tasks: 0, + active_jobs: 0, + paired_cache_worker_id: 0, + })) + .await + .expect("register worker"); + + svc.heartbeat(Request::new(HeartbeatRequest { + worker_type: common::WorkerType::WorkerScheduler as i32, + node_id: 7, + payload: Some(heartbeat_request::Payload::Scheduler(SchedulerHeartbeat { + pending_archive_tasks: 3, + pending_recall_tasks: 2, + active_jobs: 1, + })), + })) + .await + .expect("heartbeat"); + + let cluster = svc + .get_cluster_info(Request::new(())) + .await + .expect("cluster info") + .into_inner(); + assert_eq!(cluster.scheduler_workers.len(), 1); + assert_eq!(cluster.scheduler_workers[0].pending_archive_tasks, 3); + assert_eq!(cluster.scheduler_workers[0].pending_recall_tasks, 2); + assert_eq!(cluster.scheduler_workers[0].active_jobs, 1); } } diff --git a/crates/scheduler/Cargo.toml b/crates/scheduler/Cargo.toml index 8f7fd8b..b68ef4c 100644 --- a/crates/scheduler/Cargo.toml +++ b/crates/scheduler/Cargo.toml @@ -23,5 +23,10 @@ tracing-subscriber = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } config = { workspace = true } +sha2 = { workspace = true } tokio-stream = { workspace = true } +prost-types = { workspace = true } + +[dev-dependencies] +coldstore-metadata = { path = "../metadata" 
} diff --git a/crates/scheduler/src/service.rs b/crates/scheduler/src/service.rs index 91934e7..f53db75 100644 --- a/crates/scheduler/src/service.rs +++ b/crates/scheduler/src/service.rs @@ -1,16 +1,400 @@ use crate::SchedulerState; +use coldstore_proto::common; use coldstore_proto::scheduler::scheduler_service_server::SchedulerService; use coldstore_proto::scheduler::*; +use prost_types::Timestamp; +use sha2::{Digest, Sha256}; +#[cfg(test)] +use std::collections::HashMap; use std::sync::Arc; +#[cfg(test)] +use std::sync::RwLock; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; +#[tonic::async_trait] +pub trait Phase1SchedulerBackend: Send + Sync + 'static { + async fn list_buckets(&self) -> std::result::Result, Status>; + async fn create_bucket(&self, bucket: &str) -> std::result::Result<(), Status>; + async fn delete_bucket(&self, bucket: &str) -> std::result::Result<(), Status>; + async fn head_bucket(&self, bucket: &str) -> std::result::Result<(), Status>; + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result; + async fn get_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result<(common::ObjectMetadata, Vec), Status>; + async fn put_object( + &self, + bucket: &str, + key: &str, + body: Vec, + content_type: Option, + ) -> std::result::Result; + async fn delete_object(&self, bucket: &str, key: &str) -> std::result::Result<(), Status>; + async fn restore_object( + &self, + bucket: &str, + key: &str, + days: u32, + tier: common::RestoreTier, + ) -> std::result::Result; + async fn list_objects( + &self, + bucket: &str, + prefix: Option<&str>, + marker: Option<&str>, + max_keys: u32, + ) -> std::result::Result, Status>; +} + +struct MetadataBackedSchedulerBackend { + metadata: coldstore_proto::metadata::metadata_service_client::MetadataServiceClient< + tonic::transport::Channel, + >, +} + +impl MetadataBackedSchedulerBackend { + fn new( + 
metadata: coldstore_proto::metadata::metadata_service_client::MetadataServiceClient< + tonic::transport::Channel, + >, + ) -> Self { + Self { metadata } + } +} + +#[tonic::async_trait] +impl Phase1SchedulerBackend for MetadataBackedSchedulerBackend { + async fn list_buckets(&self) -> std::result::Result, Status> { + let mut client = self.metadata.clone(); + Ok(client + .list_buckets(Request::new(())) + .await? + .into_inner() + .buckets) + } + + async fn create_bucket(&self, bucket: &str) -> std::result::Result<(), Status> { + let mut client = self.metadata.clone(); + client + .create_bucket(Request::new(common::BucketInfo { + name: bucket.into(), + created_at: Some(now_timestamp()), + owner: None, + versioning_enabled: false, + object_count: 0, + total_size: 0, + })) + .await?; + Ok(()) + } + + async fn delete_bucket(&self, bucket: &str) -> std::result::Result<(), Status> { + let mut client = self.metadata.clone(); + client + .delete_bucket(Request::new( + coldstore_proto::metadata::DeleteBucketRequest { + name: bucket.into(), + }, + )) + .await?; + Ok(()) + } + + async fn head_bucket(&self, bucket: &str) -> std::result::Result<(), Status> { + let mut client = self.metadata.clone(); + client + .get_bucket(Request::new(coldstore_proto::metadata::GetBucketRequest { + name: bucket.into(), + })) + .await?; + Ok(()) + } + + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result { + let mut client = self.metadata.clone(); + Ok(client + .head_object(Request::new(coldstore_proto::metadata::HeadObjectRequest { + bucket: bucket.into(), + key: key.into(), + })) + .await? 
+ .into_inner()) + } + + async fn get_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result<(common::ObjectMetadata, Vec), Status> { + let object = self.head_object(bucket, key).await?; + Err(Status::failed_precondition(format!( + "scheduler.get_object requires phase-1 cache wiring; metadata is available for {}/{} but body retrieval is not yet connected", + object.bucket, object.key + ))) + } + + async fn put_object( + &self, + bucket: &str, + key: &str, + body: Vec, + content_type: Option, + ) -> std::result::Result { + let checksum = sha256_hex(&body); + let now = now_timestamp(); + let object = common::ObjectMetadata { + bucket: bucket.into(), + key: key.into(), + version_id: None, + size: body.len() as u64, + checksum: checksum.clone(), + content_type, + etag: Some(checksum.clone()), + storage_class: common::StorageClass::ColdPending as i32, + archive_id: None, + tape_id: None, + tape_set: vec![], + tape_block_offset: None, + restore_status: None, + restore_expire_at: None, + created_at: Some(now), + updated_at: Some(now), + }; + let mut client = self.metadata.clone(); + client.put_object(Request::new(object)).await?; + Ok(PutObjectResponse { + etag: checksum, + version_id: String::new(), + }) + } + + async fn delete_object(&self, bucket: &str, key: &str) -> std::result::Result<(), Status> { + let mut client = self.metadata.clone(); + client + .delete_object(Request::new( + coldstore_proto::metadata::DeleteObjectRequest { + bucket: bucket.into(), + key: key.into(), + }, + )) + .await?; + Ok(()) + } + + async fn restore_object( + &self, + bucket: &str, + key: &str, + days: u32, + _tier: common::RestoreTier, + ) -> std::result::Result { + let mut client = self.metadata.clone(); + let object = client + .get_object(Request::new(coldstore_proto::metadata::GetObjectRequest { + bucket: bucket.into(), + key: key.into(), + })) + .await? 
+ .into_inner(); + + if object.storage_class != common::StorageClass::Cold as i32 { + return Err(Status::failed_precondition( + "restore_object requires an archived COLD object in phase-1 metadata-backed mode", + )); + } + + let restore_status = object + .restore_status + .and_then(|status| common::RestoreStatus::try_from(status).ok()); + + match restore_status { + Some(common::RestoreStatus::RestoreCompleted) => { + Ok(RestoreObjectResponse { status_code: 200 }) + } + Some( + common::RestoreStatus::RestorePending + | common::RestoreStatus::RestoreWaitingForMedia + | common::RestoreStatus::RestoreInProgress, + ) => Ok(RestoreObjectResponse { status_code: 202 }), + Some(common::RestoreStatus::RestoreExpired | common::RestoreStatus::RestoreFailed) => { + Err(Status::failed_precondition( + "restore_object cannot reopen expired or failed restores in phase-1 metadata-backed mode", + )) + } + Some(common::RestoreStatus::Unspecified) | None => { + client + .update_restore_status(Request::new( + coldstore_proto::metadata::UpdateRestoreStatusRequest { + bucket: bucket.into(), + key: key.into(), + status: common::RestoreStatus::RestorePending as i32, + expire_at: Some(days_from_now(days.max(1))), + }, + )) + .await?; + Ok(RestoreObjectResponse { status_code: 202 }) + } + } + } + + async fn list_objects( + &self, + bucket: &str, + prefix: Option<&str>, + marker: Option<&str>, + max_keys: u32, + ) -> std::result::Result, Status> { + let mut client = self.metadata.clone(); + Ok(client + .list_objects(Request::new( + coldstore_proto::metadata::ListObjectsRequest { + bucket: bucket.into(), + prefix: prefix.map(str::to_owned), + marker: marker.map(str::to_owned), + max_keys, + }, + )) + .await? 
+ .into_inner() + .objects) + } +} + pub struct SchedulerServiceImpl { _state: Arc, + backend: Arc, } impl SchedulerServiceImpl { pub fn new(state: Arc) -> Self { - Self { _state: state } + let backend = Arc::new(MetadataBackedSchedulerBackend::new(state.metadata.clone())); + Self { + _state: state, + backend, + } + } + + pub fn new_with_backend( + state: Arc, + backend: Arc, + ) -> Self { + Self { + _state: state, + backend, + } + } +} + +#[cfg(test)] +fn phase1_unimplemented(op: &str) -> Status { + Status::unimplemented(format!( + "{op} is not implemented in phase-1 safe mode; use unit-tested metadata/cache services only" + )) +} + +fn sha256_hex(body: &[u8]) -> String { + let mut hasher = Sha256::new(); + hasher.update(body); + format!("{:x}", hasher.finalize()) +} + +fn now_timestamp() -> Timestamp { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default(); + Timestamp { + seconds: now.as_secs() as i64, + nanos: now.subsec_nanos() as i32, + } +} + +fn days_from_now(days: u32) -> Timestamp { + let now = chrono::Utc::now() + chrono::Duration::days(days as i64); + Timestamp { + seconds: now.timestamp(), + nanos: now.timestamp_subsec_nanos() as i32, + } +} + +fn build_restore_info( + restore_status: Option, + restore_expire_at: Option<&Timestamp>, +) -> Option { + match restore_status.and_then(|status| common::RestoreStatus::try_from(status).ok()) { + Some(common::RestoreStatus::RestoreInProgress) + | Some(common::RestoreStatus::RestorePending) + | Some(common::RestoreStatus::RestoreWaitingForMedia) => { + Some("ongoing-request=\"true\"".into()) + } + Some(common::RestoreStatus::RestoreCompleted) => { + if let Some(expire_at) = restore_expire_at { + Some(format!( + "ongoing-request=\"false\", expiry-ts=\"{}\"", + expire_at.seconds + )) + } else { + Some("ongoing-request=\"false\"".into()) + } + } + _ => None, + } +} + +fn build_head_object_response(object: &common::ObjectMetadata) -> HeadObjectResponse { + 
HeadObjectResponse { + content_length: object.size, + content_type: object.content_type.clone(), + etag: object.etag.clone().unwrap_or_default(), + storage_class: object.storage_class, + restore_info: build_restore_info(object.restore_status, object.restore_expire_at.as_ref()), + last_modified: object.updated_at, + } +} + +fn build_get_object_meta(object: &common::ObjectMetadata) -> GetObjectMeta { + GetObjectMeta { + content_length: object.size, + content_type: object.content_type.clone(), + etag: object.etag.clone().unwrap_or_default(), + storage_class: object.storage_class, + restore_info: build_restore_info(object.restore_status, object.restore_expire_at.as_ref()), + last_modified: object.updated_at, + } +} + +fn build_object_entry(object: &common::ObjectMetadata) -> ObjectEntry { + ObjectEntry { + key: object.key.clone(), + last_modified: object.updated_at, + etag: object.etag.clone().unwrap_or_default(), + size: object.size, + storage_class: storage_class_label(object.storage_class).into(), + } +} + +fn build_bucket_entry(bucket: &common::BucketInfo) -> BucketEntry { + BucketEntry { + name: bucket.name.clone(), + creation_date: bucket.created_at, + } +} + +fn storage_class_label(storage_class: i32) -> &'static str { + match common::StorageClass::try_from(storage_class).ok() { + Some(common::StorageClass::ColdPending) => "COLD_PENDING", + Some(common::StorageClass::Cold) => "COLD", + _ => "UNKNOWN", } } @@ -18,90 +402,826 @@ impl SchedulerServiceImpl { impl SchedulerService for SchedulerServiceImpl { async fn put_object( &self, - _request: Request>, + request: Request>, ) -> std::result::Result, Status> { - // 1. 从 stream 接收元数据和数据 - // 2. 写入 Metadata (PutObject, storage_class = ColdPending) - // 3. 将数据暂存到 Cache Worker (PutStaging) - // 4. 返回 ETag - todo!() + let mut stream = request.into_inner(); + let mut meta: Option = None; + let mut body = Vec::new(); + while let Some(chunk) = stream.message().await? 
{ + match chunk.payload { + Some(put_object_request::Payload::Meta(m)) => meta = Some(m), + Some(put_object_request::Payload::Data(bytes)) => body.extend_from_slice(&bytes), + None => return Err(Status::invalid_argument("empty put_object chunk")), + } + } + let meta = meta.ok_or_else(|| Status::invalid_argument("missing put_object metadata"))?; + if meta.content_length != body.len() as u64 { + return Err(Status::invalid_argument( + "content_length does not match body size", + )); + } + let response = self + .backend + .put_object(&meta.bucket, &meta.key, body, meta.content_type) + .await?; + Ok(Response::new(response)) } - type GetObjectStream = - tokio_stream::wrappers::ReceiverStream>; + type GetObjectStream = ReceiverStream>; async fn get_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - // 1. 查询 Metadata (HeadObject) - // 2. 检查 storage_class 和 restore_status - // 3. 若 Cold + Completed: 从 Cache Worker 读取数据 - // 4. 若 Cold + 未解冻: 返回 InvalidObjectState - // 5. 
Stream 返回数据 - todo!() + let request = request.into_inner(); + let (object, data) = self + .backend + .get_object(&request.bucket, &request.key) + .await?; + let (tx, rx) = mpsc::channel(8); + tokio::spawn(async move { + let _ = tx + .send(Ok(GetObjectResponse { + payload: Some(get_object_response::Payload::Meta(build_get_object_meta( + &object, + ))), + })) + .await; + let _ = tx + .send(Ok(GetObjectResponse { + payload: Some(get_object_response::Payload::Data(data)), + })) + .await; + }); + Ok(Response::new(ReceiverStream::new(rx))) } async fn head_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - // 查询 Metadata, 生成 x-amz-restore 头信息 - todo!() + let request = request.into_inner(); + let object = self + .backend + .head_object(&request.bucket, &request.key) + .await?; + Ok(Response::new(build_head_object_response(&object))) } async fn delete_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - // 1. 删除 Metadata 中的对象 - // 2. 清理 Cache Worker 中的缓存/暂存数据 - todo!() + let request = request.into_inner(); + self.backend + .delete_object(&request.bucket, &request.key) + .await?; + Ok(Response::new(())) } async fn restore_object( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - // 1. 检查对象是否为 Cold 状态 - // 2. 线性读检查是否已有进行中的 Restore - // 3. 创建 RecallTask 写入 Metadata - // 4. 
返回 202 Accepted - todo!() + let request = request.into_inner(); + let tier = common::RestoreTier::try_from(request.tier) + .map_err(|_| Status::invalid_argument("invalid restore tier"))?; + let response = self + .backend + .restore_object(&request.bucket, &request.key, request.days, tier) + .await?; + Ok(Response::new(response)) } async fn list_objects( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + let request = request.into_inner(); + let objects = self + .backend + .list_objects( + &request.bucket, + request.prefix.as_deref(), + request.marker.as_deref(), + request.max_keys, + ) + .await?; + let is_truncated = request.max_keys > 0 && objects.len() > request.max_keys as usize; + let next_marker = if is_truncated { + objects + .get(request.max_keys as usize - 1) + .map(|object| object.key.clone()) + } else { + None + }; + Ok(Response::new(ListObjectsResponse { + bucket: request.bucket, + prefix: request.prefix, + marker: request.marker, + next_marker, + max_keys: request.max_keys, + is_truncated, + contents: objects + .into_iter() + .take(if request.max_keys == 0 { + usize::MAX + } else { + request.max_keys as usize + }) + .map(|object| build_object_entry(&object)) + .collect(), + common_prefixes: vec![], + })) } async fn create_bucket( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + self.backend + .create_bucket(&request.into_inner().bucket) + .await?; + Ok(Response::new(())) } async fn delete_bucket( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + self.backend + .delete_bucket(&request.into_inner().bucket) + .await?; + Ok(Response::new(())) } async fn head_bucket( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - todo!() + self.backend + .head_bucket(&request.into_inner().bucket) + .await?; + Ok(Response::new(())) } async fn list_buckets( &self, _request: Request<()>, ) -> 
std::result::Result, Status> { - todo!() + let buckets = self.backend.list_buckets().await?; + Ok(Response::new(ListBucketsResponse { + buckets: buckets.iter().map(build_bucket_entry).collect(), + })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use coldstore_common::config::{MetadataConfig, SchedulerConfig}; + use coldstore_metadata::service::MetadataServiceImpl; + use coldstore_proto::metadata::metadata_service_server::MetadataServiceServer; + use tokio::sync::oneshot; + use tokio::time::{sleep, Duration}; + use tokio_stream::StreamExt; + use tonic::transport::Server; + + #[derive(Default)] + struct InMemoryBackend { + buckets: RwLock>, + objects: RwLock)>>, + } + + impl InMemoryBackend { + fn with_fixture() -> Self { + let bucket = common::BucketInfo { + name: "docs".into(), + created_at: Some(Timestamp { + seconds: 5, + nanos: 0, + }), + owner: None, + versioning_enabled: false, + object_count: 1, + total_size: 42, + }; + let object = common::ObjectMetadata { + bucket: "docs".into(), + key: "readme.txt".into(), + version_id: None, + size: 42, + checksum: "sum".into(), + content_type: Some("text/plain".into()), + etag: Some("etag-1".into()), + storage_class: common::StorageClass::Cold as i32, + archive_id: Some("archive-1".into()), + tape_id: Some("tape-1".into()), + tape_set: vec!["tape-1".into()], + tape_block_offset: Some(1), + restore_status: Some(common::RestoreStatus::RestoreCompleted as i32), + restore_expire_at: Some(Timestamp { + seconds: 123, + nanos: 0, + }), + created_at: Some(Timestamp { + seconds: 1, + nanos: 0, + }), + updated_at: Some(Timestamp { + seconds: 2, + nanos: 0, + }), + }; + let mut objects = HashMap::new(); + objects.insert("docs/readme.txt".into(), (object, b"hello world".to_vec())); + Self { + buckets: RwLock::new(vec![bucket]), + objects: RwLock::new(objects), + } + } + } + + #[tonic::async_trait] + impl Phase1SchedulerBackend for InMemoryBackend { + async fn list_buckets(&self) -> std::result::Result, Status> { + 
Ok(self.buckets.read().unwrap().clone()) + } + async fn create_bucket(&self, bucket: &str) -> std::result::Result<(), Status> { + let mut buckets = self.buckets.write().unwrap(); + if buckets.iter().any(|b| b.name == bucket) { + return Err(Status::already_exists("bucket exists")); + } + buckets.push(common::BucketInfo { + name: bucket.into(), + created_at: None, + owner: None, + versioning_enabled: false, + object_count: 0, + total_size: 0, + }); + Ok(()) + } + async fn delete_bucket(&self, bucket: &str) -> std::result::Result<(), Status> { + let mut buckets = self.buckets.write().unwrap(); + let before = buckets.len(); + buckets.retain(|b| b.name != bucket); + if buckets.len() == before { + Err(Status::not_found("bucket missing")) + } else { + Ok(()) + } + } + async fn head_bucket(&self, bucket: &str) -> std::result::Result<(), Status> { + if self + .buckets + .read() + .unwrap() + .iter() + .any(|b| b.name == bucket) + { + Ok(()) + } else { + Err(Status::not_found("bucket missing")) + } + } + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result { + self.objects + .read() + .unwrap() + .get(&format!("{bucket}/{key}")) + .map(|(o, _)| o.clone()) + .ok_or_else(|| Status::not_found("object missing")) + } + async fn get_object( + &self, + bucket: &str, + key: &str, + ) -> std::result::Result<(common::ObjectMetadata, Vec), Status> { + self.objects + .read() + .unwrap() + .get(&format!("{bucket}/{key}")) + .cloned() + .ok_or_else(|| Status::not_found("object missing")) + } + async fn put_object( + &self, + bucket: &str, + key: &str, + body: Vec, + content_type: Option, + ) -> std::result::Result { + let object = common::ObjectMetadata { + bucket: bucket.into(), + key: key.into(), + version_id: None, + size: body.len() as u64, + checksum: "sum".into(), + content_type, + etag: Some("etag-put".into()), + storage_class: common::StorageClass::ColdPending as i32, + archive_id: None, + tape_id: None, + tape_set: vec![], + tape_block_offset: 
None, + restore_status: None, + restore_expire_at: None, + created_at: Some(Timestamp { + seconds: 10, + nanos: 0, + }), + updated_at: Some(Timestamp { + seconds: 10, + nanos: 0, + }), + }; + self.objects + .write() + .unwrap() + .insert(format!("{bucket}/{key}"), (object, body)); + Ok(PutObjectResponse { + etag: "etag-put".into(), + version_id: "v1".into(), + }) + } + async fn delete_object(&self, bucket: &str, key: &str) -> std::result::Result<(), Status> { + if self + .objects + .write() + .unwrap() + .remove(&format!("{bucket}/{key}")) + .is_some() + { + Ok(()) + } else { + Err(Status::not_found("object missing")) + } + } + async fn restore_object( + &self, + bucket: &str, + key: &str, + _days: u32, + _tier: common::RestoreTier, + ) -> std::result::Result { + let mut objects = self.objects.write().unwrap(); + let (object, _) = objects + .get_mut(&format!("{bucket}/{key}")) + .ok_or_else(|| Status::not_found("object missing"))?; + let status_code = + if object.restore_status == Some(common::RestoreStatus::RestoreCompleted as i32) { + 200 + } else { + 202 + }; + object.restore_status = Some(common::RestoreStatus::RestoreCompleted as i32); + object.restore_expire_at = Some(Timestamp { + seconds: 999, + nanos: 0, + }); + Ok(RestoreObjectResponse { status_code }) + } + async fn list_objects( + &self, + bucket: &str, + prefix: Option<&str>, + marker: Option<&str>, + _max_keys: u32, + ) -> std::result::Result, Status> { + let prefix = prefix.unwrap_or_default(); + let marker = marker.unwrap_or_default(); + let mut objects: Vec<_> = self + .objects + .read() + .unwrap() + .values() + .map(|(o, _)| o.clone()) + .filter(|o| o.bucket == bucket) + .filter(|o| o.key.starts_with(prefix)) + .filter(|o| o.key.as_str() > marker) + .collect(); + objects.sort_by(|a, b| a.key.cmp(&b.key)); + Ok(objects) + } + } + + fn service() -> SchedulerServiceImpl { + let state = Arc::new(SchedulerState { + metadata: + 
coldstore_proto::metadata::metadata_service_client::MetadataServiceClient::new( + tonic::transport::Channel::from_static("http://127.0.0.1:1").connect_lazy(), + ), + cache: None, + tape: None, + config: SchedulerConfig::default(), + }); + SchedulerServiceImpl::new_with_backend(state, Arc::new(InMemoryBackend::with_fixture())) + } + + #[test] + fn helper_head_object_response_contains_restore_info() { + let (object, _) = InMemoryBackend::with_fixture() + .objects + .read() + .unwrap() + .get("docs/readme.txt") + .unwrap() + .clone(); + let response = build_head_object_response(&object); + assert_eq!(response.content_length, 42); + assert_eq!(response.etag, "etag-1"); + assert_eq!( + response.restore_info.as_deref(), + Some("ongoing-request=\"false\", expiry-ts=\"123\"") + ); + } + + #[tokio::test] + async fn get_object_stream_uses_backend() { + let mut stream = service() + .get_object(Request::new(GetObjectRequest { + bucket: "docs".into(), + key: "readme.txt".into(), + version_id: None, + })) + .await + .unwrap() + .into_inner(); + let first = stream.next().await.unwrap().unwrap(); + match first.payload { + Some(get_object_response::Payload::Meta(meta)) => assert_eq!(meta.etag, "etag-1"), + _ => panic!("expected meta"), + } + let second = stream.next().await.unwrap().unwrap(); + match second.payload { + Some(get_object_response::Payload::Data(bytes)) => assert_eq!(bytes, b"hello world"), + _ => panic!("expected data"), + } + } + + #[tokio::test] + async fn delete_object_uses_backend() { + let svc = service(); + svc.delete_object(Request::new(DeleteObjectRequest { + bucket: "docs".into(), + key: "readme.txt".into(), + version_id: None, + })) + .await + .unwrap(); + assert!(svc + .head_object(Request::new(HeadObjectRequest { + bucket: "docs".into(), + key: "readme.txt".into(), + version_id: None + })) + .await + .is_err()); + } + + #[tokio::test] + async fn restore_object_uses_backend() { + let response = service() + .restore_object(Request::new(RestoreObjectRequest { 
+ bucket: "docs".into(), + key: "readme.txt".into(), + version_id: None, + days: 2, + tier: common::RestoreTier::Standard as i32, + })) + .await + .unwrap() + .into_inner(); + assert_eq!(response.status_code, 200); + } + + #[tokio::test] + async fn list_buckets_uses_backend() { + let response = service() + .list_buckets(Request::new(())) + .await + .unwrap() + .into_inner(); + assert_eq!(response.buckets.len(), 1); + assert_eq!(response.buckets[0].name, "docs"); + } + + #[tokio::test] + async fn create_bucket_uses_backend() { + let svc = service(); + svc.create_bucket(Request::new(CreateBucketRequest { + bucket: "new-bucket".into(), + })) + .await + .unwrap(); + let response = svc + .list_buckets(Request::new(())) + .await + .unwrap() + .into_inner(); + assert!(response.buckets.iter().any(|b| b.name == "new-bucket")); + } + + #[tokio::test] + async fn delete_bucket_uses_backend() { + let svc = service(); + svc.delete_bucket(Request::new(DeleteBucketRequest { + bucket: "docs".into(), + })) + .await + .unwrap(); + let response = svc + .list_buckets(Request::new(())) + .await + .unwrap() + .into_inner(); + assert!(!response.buckets.iter().any(|b| b.name == "docs")); + } + + #[tokio::test] + async fn head_bucket_uses_backend() { + assert!(service() + .head_bucket(Request::new(HeadBucketRequest { + bucket: "docs".into() + })) + .await + .is_ok()); + } + + #[tokio::test] + async fn head_object_uses_backend() { + let response = service() + .head_object(Request::new(HeadObjectRequest { + bucket: "docs".into(), + key: "readme.txt".into(), + version_id: None, + })) + .await + .unwrap() + .into_inner(); + assert_eq!(response.content_length, 42); + assert_eq!(response.etag, "etag-1"); + } + + #[tokio::test] + async fn list_objects_uses_backend() { + let response = service() + .list_objects(Request::new(ListObjectsRequest { + bucket: "docs".into(), + prefix: Some("read".into()), + marker: None, + delimiter: None, + max_keys: 100, + })) + .await + .unwrap() + .into_inner(); + 
assert_eq!(response.contents.len(), 1); + assert_eq!(response.contents[0].key, "readme.txt"); + assert_eq!(response.contents[0].storage_class, "COLD"); + } + + async fn metadata_backed_service() -> ( + SchedulerServiceImpl, + Arc, + oneshot::Sender<()>, + ) { + let metadata = MetadataServiceImpl::new(&MetadataConfig::default()) + .await + .expect("metadata service init"); + let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind test listener"); + let addr = listener.local_addr().expect("listener addr"); + drop(listener); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + tokio::spawn(async move { + Server::builder() + .add_service(MetadataServiceServer::new(metadata)) + .serve_with_shutdown(addr, async { + let _ = shutdown_rx.await; + }) + .await + .expect("metadata server should run"); + }); + + let mut metadata_client = None; + for _ in 0..20 { + match coldstore_proto::metadata::metadata_service_client::MetadataServiceClient::connect( + format!("http://{addr}"), + ) + .await + { + Ok(client) => { + metadata_client = Some(client); + break; + } + Err(_) => sleep(Duration::from_millis(25)).await, + } + } + let metadata = metadata_client.expect("connect metadata client"); + let state = Arc::new(SchedulerState { + metadata, + cache: None, + tape: None, + config: SchedulerConfig::default(), + }); + (SchedulerServiceImpl::new(state.clone()), state, shutdown_tx) + } + + #[tokio::test] + async fn default_service_uses_metadata_for_bucket_ops() { + let (svc, _state, shutdown_tx) = metadata_backed_service().await; + + svc.create_bucket(Request::new(CreateBucketRequest { + bucket: "phase1-bucket".into(), + })) + .await + .expect("create bucket through metadata-backed scheduler"); + + let buckets = svc + .list_buckets(Request::new(())) + .await + .expect("list buckets through metadata-backed scheduler") + .into_inner(); + assert!(buckets + .buckets + .iter() + .any(|bucket| bucket.name == "phase1-bucket")); + + shutdown_tx.send(()).ok(); + } + + 
#[tokio::test] + async fn default_service_uses_metadata_for_object_metadata_ops() { + let (svc, state, shutdown_tx) = metadata_backed_service().await; + + svc.create_bucket(Request::new(CreateBucketRequest { + bucket: "docs".into(), + })) + .await + .expect("create bucket"); + + let mut metadata = state.metadata.clone(); + metadata + .put_object(Request::new(common::ObjectMetadata { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: None, + size: 5, + checksum: "seed-checksum".into(), + content_type: Some("text/plain".into()), + etag: Some("seed-etag".into()), + storage_class: common::StorageClass::ColdPending as i32, + archive_id: None, + tape_id: None, + tape_set: vec![], + tape_block_offset: None, + restore_status: None, + restore_expire_at: None, + created_at: Some(Timestamp { + seconds: 10, + nanos: 0, + }), + updated_at: Some(Timestamp { + seconds: 10, + nanos: 0, + }), + })) + .await + .expect("seed object in metadata"); + + let head = svc + .head_object(Request::new(HeadObjectRequest { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: None, + })) + .await + .expect("head object") + .into_inner(); + assert_eq!(head.content_length, 5); + assert_eq!(head.storage_class, common::StorageClass::ColdPending as i32); + + let list = svc + .list_objects(Request::new(ListObjectsRequest { + bucket: "docs".into(), + prefix: Some("gui".into()), + marker: None, + delimiter: None, + max_keys: 10, + })) + .await + .expect("list objects") + .into_inner(); + assert_eq!(list.contents.len(), 1); + assert_eq!(list.contents[0].key, "guide.txt"); + + let mut metadata = state.metadata.clone(); + metadata + .update_storage_class(Request::new( + coldstore_proto::metadata::UpdateStorageClassRequest { + bucket: "docs".into(), + key: "guide.txt".into(), + storage_class: common::StorageClass::Cold as i32, + }, + )) + .await + .expect("mark object as cold"); + + let restore = svc + .restore_object(Request::new(RestoreObjectRequest { + bucket: "docs".into(), + 
key: "guide.txt".into(), + version_id: None, + days: 3, + tier: common::RestoreTier::Standard as i32, + })) + .await + .expect("restore object") + .into_inner(); + assert_eq!(restore.status_code, 202); + + let restored = svc + .head_object(Request::new(HeadObjectRequest { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: None, + })) + .await + .expect("head restored object") + .into_inner(); + assert_eq!( + restored.restore_info.as_deref(), + Some("ongoing-request=\"true\"") + ); + + svc.delete_object(Request::new(DeleteObjectRequest { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: None, + })) + .await + .expect("delete object through metadata-backed scheduler"); + + assert!(svc + .head_object(Request::new(HeadObjectRequest { + bucket: "docs".into(), + key: "guide.txt".into(), + version_id: None, + })) + .await + .is_err()); + + shutdown_tx.send(()).ok(); + } + + #[tokio::test] + async fn default_metadata_backend_puts_object_and_reports_cache_gap() { + let (_svc, state, shutdown_tx) = metadata_backed_service().await; + let backend = MetadataBackedSchedulerBackend::new(state.metadata.clone()); + + backend + .create_bucket("docs") + .await + .expect("create bucket through metadata backend"); + + let put = backend + .put_object( + "docs", + "guide.txt", + b"hello".to_vec(), + Some("text/plain".into()), + ) + .await + .expect("put object through metadata backend"); + assert!(!put.etag.is_empty()); + + let listed = backend + .list_objects("docs", Some("gui"), None, 10) + .await + .expect("list objects through metadata backend"); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].etag.as_deref(), Some(put.etag.as_str())); + + let err = backend + .get_object("docs", "guide.txt") + .await + .expect_err("get_object should explain that cache is still not wired"); + assert_eq!(err.code(), tonic::Code::FailedPrecondition); + assert!(err.message().contains("cache wiring")); + + shutdown_tx.send(()).ok(); + } + + #[test] + fn 
phase1_unimplemented_message_is_stable() { + let status = phase1_unimplemented("scheduler.list_buckets"); + assert_eq!(status.code(), tonic::Code::Unimplemented); + assert!(status.message().contains("phase-1 safe mode")); } } diff --git a/crates/tape/src/service.rs b/crates/tape/src/service.rs index 7432f48..86f6a90 100644 --- a/crates/tape/src/service.rs +++ b/crates/tape/src/service.rs @@ -16,82 +16,98 @@ impl TapeServiceImpl { } } +fn phase1_unimplemented(op: &str) -> Status { + Status::unimplemented(format!( + "{op} is not implemented in phase-1 safe mode; no tape devices are accessed during unit-test runs" + )) +} + #[tonic::async_trait] impl TapeService for TapeServiceImpl { async fn write_bundle( &self, _req: Request>, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.write_bundle")) } type ReadBundleStream = tokio_stream::wrappers::ReceiverStream>; + async fn read_bundle( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.read_bundle")) } async fn list_drives( &self, _req: Request<()>, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.list_drives")) } + async fn get_drive_status( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.get_drive_status")) } + async fn acquire_drive( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.acquire_drive")) } + async fn release_drive( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.release_drive")) } + async fn load_tape( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.load_tape")) } + async fn unload_tape( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.unload_tape")) } + async fn rewind( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + 
Err(phase1_unimplemented("tape.rewind")) } + async fn seek_to_filemark( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.seek_to_filemark")) } + async fn get_tape_media_status( &self, _req: Request, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.get_tape_media_status")) } + async fn inventory( &self, _req: Request<()>, ) -> std::result::Result, Status> { - todo!() + Err(phase1_unimplemented("tape.inventory")) } } diff --git a/docs/DESIGN.md b/docs/DESIGN.md index 2002973..0c6c796 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -10,8 +10,25 @@ > **模块设计文档**:各层独立设计详见 [modules/](modules/) 目录。 1. **协议层**:兼容 S3 冷归档协议(Glacier 语义) -2. **集群元数据**:基于 [OpenRaft](https://github.com/databendlabs/openraft) + RocksDB 持久化 -3. **数据缓存层**:基于 [async-spdk](https://github.com/madsys-dev/async-spdk) 的高性能用户态缓存 +2. **集群元数据**:长期目标为 [OpenRaft](https://github.com/databendlabs/openraft) + RocksDB 持久化 +3. **数据缓存层**:长期目标为 [async-spdk](https://github.com/madsys-dev/async-spdk) 用户态高性能缓存 + +### 1.0 当前实现分期与文档纠偏 + +为了避免“设计已完整、实现仍是骨架”带来的偏差,本文档将系统能力分为两个层次: + +| 分期 | 状态 | 说明 | +|------|------|------| +| **Phase 1(当前代码基线)** | 已落地 / 可单测 | Metadata 使用内存状态机;Cache 使用 HDD 后端 + 进程内索引;Scheduler/Tape 对未完成路径返回显式 `UNIMPLEMENTED`;仅保证编译、静态检查与单元测试 | +| **Phase 2(目标架构)** | 设计目标 | Metadata 切换到 OpenRaft + RocksDB;Cache 支持 SPDK;Tape 接入真实 SCSI/LTFS/厂商 SDK;补齐跨进程集成测试 | + +Phase 1 的设计原则: +- 不访问真实磁带设备 +- 不启动真实分布式集群 +- 不执行会改变主机外部环境的集成测试 +- 先把协议对象、状态流转、缓存读写、元数据 CRUD 做成可验证的本地能力 + +因此,本文档中凡涉及 Raft、RocksDB、SPDK、真实带库控制的内容,均视为 **目标态设计**;凡涉及内存版 Metadata、HDD Cache、显式 `UNIMPLEMENTED` 边界的内容,均视为 **当前实现态设计**。 ### 1.1 核心技术选型(已确定) @@ -98,8 +115,14 @@ Gateway 仅连接 Scheduler Worker,Console 仅连接 Metadata。 ### 2.3 交互模式 -**Scheduler Worker 是唯一的元数据读写入口**(Console 管控除外)。 -Gateway、Cache Worker、Tape Worker 均不直接访问 Metadata。 +**目标态**中,Scheduler Worker 是唯一的元数据业务读写入口(Console 管控除外),Gateway、Cache Worker、Tape Worker 均不直接访问 Metadata。 + +**当前实现态(Phase 1)**中,真正落地并可被单元测试验证的是: 
+- MetadataService 的进程内内存状态机实现 +- CacheService 的 HDD 本地缓存实现 +- Scheduler/Tape 的显式未实现边界 + +也就是说,当前代码基线验证的是“协议和状态语义正确性”,不是“完整分布式系统联调”。 ``` Console ──管控读写──▶ Metadata @@ -220,9 +243,27 @@ ColdStore 是纯冷归档系统(类似 AWS Glacier Deep Archive),所有对 --- -## 4. 集群元数据:Raft + RocksDB 设计 +## 4. 集群元数据:Phase 1 当前实现与 Phase 2 目标设计 + +### 4.0 当前实现态(Phase 1) -### 4.1 设计目标 +当前代码中,MetadataService 采用内存状态机实现,提供以下可单测能力: +- Bucket CRUD +- Object CRUD / List / Head +- StorageClass / RestoreStatus / ArchiveLocation 更新 +- ArchiveBundle / ArchiveTask / RecallTask / TapeInfo 的基础 CRUD +- Worker 注册、状态更新与心跳 +- ClusterInfo 的静态/内存视图输出 + +该实现的目的不是替代最终的分布式元数据集群,而是: +- 提前冻结 protobuf 契约 +- 验证状态流转和对象模型 +- 为 Scheduler / Gateway / Console 后续对接提供稳定 mockable backend +- 在不引入 RocksDB、Raft、网络联调的情况下保证核心语义可测 + +### 4.1 目标设计(Phase 2) + +### 4.2 设计目标 - **强一致性**:元数据读写通过 Raft 达成共识 - **高可用**:多数节点存活即可服务 @@ -332,9 +373,26 @@ enum ColdStoreRequest { --- -## 5. 数据缓存层:async-spdk 设计 +## 5. 数据缓存层:Phase 1 当前实现与 Phase 2 目标设计 + +### 5.0 当前实现态(Phase 1) + +当前代码中,CacheService 先落地 HDD 后端,职责包括: +- 暂存 PutObject 数据(staging) +- 保存恢复后的 restored 数据 +- 通过进程内索引实现 Contains / Get / GetStaging / Delete / DeleteStaging / Stats +- 启动时扫描本地元数据文件并重建索引 + +当前实现明确不包含: +- SPDK bdev / blobstore +- 真正的缓存淘汰策略调度线程 +- 多节点缓存一致性 + +当前阶段的核心目标是确保缓存协议、对象索引、流式读写与统计逻辑正确。 + +### 5.1 目标设计(Phase 2) -### 5.1 设计目标 +### 5.2 设计目标 - **高性能**:用户态 I/O,绕过内核,降低延迟 - **高吞吐**:Poll 模式、零拷贝,充分发挥 NVMe 性能 diff --git a/docs/plans/2026-04-15-phase1-safe-implementation-plan.md b/docs/plans/2026-04-15-phase1-safe-implementation-plan.md new file mode 100644 index 0000000..835219d --- /dev/null +++ b/docs/plans/2026-04-15-phase1-safe-implementation-plan.md @@ -0,0 +1,39 @@ +# ColdStore Phase-1 Safe Implementation Plan + +> For Hermes: follow strict TDD where practical and keep execution limited to unit tests, fmt, clippy, and compile checks. Do not run integration tests or commands that touch real tape devices or host services. 
+ +Goal: make the repository materially more real without risking the host environment by implementing an in-memory metadata service, a functional HDD-backed cache service, safer non-panicking placeholders for unimplemented external-service paths, richer design docs, and Makefile targets that clearly separate unit-only verification from future integration work. + +Architecture: +1. Metadata service becomes the source of truth for unit-testable CRUD/state-transition behavior using an in-memory state store behind async locks. +2. Cache service becomes a real local data/cache layer using the existing HDD backend plus an in-memory secondary index for object lookup, streaming, deletion, and stats. +3. Scheduler/tape/gateway keep distributed boundaries intact but stop using todo!() in externally callable handlers where practical, returning explicit gRPC errors instead of panicking when phase-2 features are not implemented yet. +4. Design docs are corrected to distinguish phase-1 safe local implementation from phase-2 distributed Raft/tape integration. + +Tech stack: Rust, Tokio, tonic/prost, serde, chrono/prost_types timestamps, filesystem-backed HDD cache, unit tests only. + +Planned file scope: +- Modify: docs/DESIGN.md +- Create: docs/plans/2026-04-15-phase1-safe-implementation-plan.md +- Modify/Create under crates/metadata/src/: service.rs plus in-memory store helpers/tests +- Modify/Create under crates/cache/src/: service.rs plus index/helpers/tests +- Modify: crates/cache/src/backend.rs +- Modify: crates/cache/src/hdd.rs +- Modify: crates/common/src/models.rs if state-transition helpers need tests or small corrections +- Modify: crates/scheduler/src/service.rs (replace panicking todo! 
with safe explicit unimplemented paths or implement minimal pure helpers if small) +- Modify: crates/tape/src/service.rs (same principle) +- Modify: Makefile + +Verification commands (unit-only): +- cargo fmt --all -- --check +- cargo clippy --workspace --all-targets -- -D warnings +- cargo test --workspace --lib --bins +- make unit +- make check-safe + +Non-goals for this session: +- No Raft/OpenRaft integration +- No RocksDB integration +- No real tape device access +- No end-to-end multi-process integration tests +- No host-environment mutation outside local build/test artifacts