Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,60 @@ pub struct Options {
)]
pub hot_tier_storage_path: Option<PathBuf>,

// Chunk size, in bytes, used for parallel hot tier downloads.
// Set with `--hot-tier-download-chunk-size` or `P_HOT_TIER_DOWNLOAD_CHUNK_SIZE`.
// The value parser rejects anything below 5242880 bytes (5 MiB); the default
// of 8388608 bytes is 8 MiB.
#[arg(
long = "hot-tier-download-chunk-size",
env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE",
value_parser = clap::value_parser!(u64).range(5242880..),
default_value = "8388608",
help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)"
)]
pub hot_tier_download_chunk_size: u64,

// Number of concurrent range requests issued per hot tier download.
// Set with `--hot-tier-download-concurrency` or `P_HOT_TIER_DOWNLOAD_CONCURRENCY`.
// Must be at least 1 (no upper bound is enforced here); defaults to 16.
#[arg(
long = "hot-tier-download-concurrency",
env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY",
value_parser = clap::value_parser!(u64).range(1..),
default_value = "16",
help = "Number of concurrent range requests per hot tier download"
)]
pub hot_tier_download_concurrency: u64,
Comment thread
parmesant marked this conversation as resolved.

// Number of parquet files downloaded concurrently per stream during hot tier sync.
// Set with `--hot-tier-files-per-stream-concurrency` or
// `P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY`. Must be at least 1; defaults to 4.
#[arg(
long = "hot-tier-files-per-stream-concurrency",
env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY",
value_parser = clap::value_parser!(u32).range(1..),
default_value = "4",
help = "Number of concurrent parquet file downloads per stream during hot tier sync"
)]
pub hot_tier_files_per_stream_concurrency: u32,

// Width, in minutes, of the "latest" window: files whose timestamp falls within
// the last N minutes are classed as 'latest', everything older as 'historic'.
// Set with `--hot-tier-latest-minutes` or `P_HOT_TIER_LATEST_MINUTES`.
// Must be at least 1; defaults to 10.
#[arg(
long = "hot-tier-latest-minutes",
env = "P_HOT_TIER_LATEST_MINUTES",
value_parser = clap::value_parser!(u64).range(1..),
default_value = "10",
help = "Files whose timestamp is within the last N minutes are 'latest'; rest are 'historic'."
)]
pub hot_tier_latest_minutes: u64,

// Maximum number of files downloaded in one historic tick.
// Set with `--hot-tier-per-tick-cap` or `P_HISTORIC_PER_TICK_CAP`.
// The value parser rejects anything below 10; defaults to 100.
// NOTE(review): the flag, env var and field names do not line up with the
// sibling hot tier options (`hot-tier-…` / `P_HOT_TIER_…` / `hot_tier_…`):
// the flag omits "historic", while the env var and field omit "hot tier".
// Confirm the intended names before they become part of the public interface.
// NOTE(review): the reason for the lower bound of 10 is not visible here — confirm.
#[arg(
long = "hot-tier-per-tick-cap",
env = "P_HISTORIC_PER_TICK_CAP",
value_parser = clap::value_parser!(u32).range(10..),
default_value = "100",
help = "Maximum files to download per historic tick."
)]
pub historic_per_tick_cap: u32,

// Interval, in minutes, at which the historic hot tier sync runs.
// Set with `--hot-tier-historic-sync-minutes` or `P_HOT_TIER_HISTORIC_SYNC_MINUTES`.
// Must be at least 1; defaults to 5.
#[arg(
long = "hot-tier-historic-sync-minutes",
env = "P_HOT_TIER_HISTORIC_SYNC_MINUTES",
value_parser = clap::value_parser!(u32).range(1..),
default_value = "5",
help = "Interval (minutes) at which the historic hot-tier sync runs."
)]
pub hot_tier_historic_sync_minutes: u32,
Comment thread
coderabbitai[bot] marked this conversation as resolved.

//TODO: remove this when smart cache is implemented
#[arg(
long = "index-storage-path",
Expand Down
54 changes: 43 additions & 11 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use self::error::StreamError;
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
use super::query::update_schema_when_distributed;
use crate::event::format::override_data_type;
use crate::hottier::{CURRENT_HOT_TIER_VERSION, HotTierManager, StreamHotTier};
use crate::hottier::{CURRENT_HOT_TIER_VERSION, GLOBAL_HOTTIER, StreamHotTier};
use crate::metadata::SchemaVersion;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::parseable::{DEFAULT_TENANT, PARSEABLE, StreamNotFound};
Expand All @@ -47,7 +47,7 @@ use itertools::Itertools;
use serde_json::{Value, json};
use std::fs;
use std::sync::Arc;
use tracing::warn;
use tracing::{Instrument, warn};

pub async fn delete(
req: HttpRequest,
Expand Down Expand Up @@ -77,7 +77,7 @@ pub async fn delete(
)
}

if let Some(hot_tier_manager) = HotTierManager::global()
if let Some(hot_tier_manager) = GLOBAL_HOTTIER.get()
&& hot_tier_manager.check_stream_hot_tier_exists(&stream_name, &tenant_id)
{
hot_tier_manager
Expand Down Expand Up @@ -413,16 +413,25 @@ pub async fn get_stream_info(
Ok((web::Json(stream_info), StatusCode::OK))
}

#[tracing::instrument(
name = "http.put_stream_hot_tier",
skip(req, logstream, hottier),
fields(stream = tracing::field::Empty, tenant = tracing::field::Empty, size = hottier.size)
)]
pub async fn put_stream_hot_tier(
req: HttpRequest,
logstream: Path<String>,
Json(mut hottier): Json<StreamHotTier>,
) -> Result<impl Responder, StreamError> {
let stream_name = logstream.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
let current_span = tracing::Span::current();
current_span
.record("stream", tracing::field::display(&stream_name))
.record("tenant", tracing::field::debug(&tenant_id));
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
// check if it exists in the storage
// create stream and schema from storage
if !PARSEABLE
.check_or_load_stream(&stream_name, &tenant_id)
.await
Expand All @@ -441,16 +450,14 @@ pub async fn put_stream_hot_tier(

validator::hot_tier(&hottier.size.to_string())?;

// TODO tenants
stream.set_hot_tier(Some(hottier.clone()));
let Some(hot_tier_manager) = HotTierManager::global() else {
let Some(hot_tier_manager) = GLOBAL_HOTTIER.get() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
};
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, hottier.size, &tenant_id)
.await?;
hottier.used_size = existing_hot_tier_used_size;
hottier.available_size = hottier.size;
hottier.available_size = hottier.size.saturating_sub(existing_hot_tier_used_size);
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier, &tenant_id)
Expand All @@ -469,19 +476,34 @@ pub async fn put_stream_hot_tier(
.metastore
.put_stream_json(&stream_metadata, &stream_name, &tenant_id)
.await?;
stream.set_hot_tier(Some(hottier.clone()));
let stream = stream_name.clone();
let tenant = tenant_id.clone();
hot_tier_manager
.spawn_stream_task(stream, tenant)
.instrument(current_span)
.await;

Ok((
format!("hot tier set for stream {stream_name}"),
StatusCode::OK,
))
}

#[tracing::instrument(
name = "http.get_stream_hot_tier",
skip(req, logstream),
fields(stream = tracing::field::Empty, tenant = tracing::field::Empty)
)]
pub async fn get_stream_hot_tier(
req: HttpRequest,
logstream: Path<String>,
) -> Result<impl Responder, StreamError> {
let stream_name = logstream.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current()
.record("stream", tracing::field::display(&stream_name))
.record("tenant", tracing::field::debug(&tenant_id));
// For query mode, if the stream is not found in the in-memory map,
// check whether it exists in storage and, if it does,
// create the stream and schema from storage
Expand All @@ -492,7 +514,7 @@ pub async fn get_stream_hot_tier(
return Err(StreamNotFound(stream_name.clone()).into());
}

let Some(hot_tier_manager) = HotTierManager::global() else {
let Some(hot_tier_manager) = GLOBAL_HOTTIER.get() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
};
let meta = hot_tier_manager
Expand All @@ -502,12 +524,21 @@ pub async fn get_stream_hot_tier(
Ok((web::Json(meta), StatusCode::OK))
}

#[tracing::instrument(
name = "http.delete_stream_hot_tier",
skip(req, logstream),
fields(stream = tracing::field::Empty, tenant = tracing::field::Empty)
)]
pub async fn delete_stream_hot_tier(
req: HttpRequest,
logstream: Path<String>,
) -> Result<impl Responder, StreamError> {
let stream_name = logstream.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
let current_span = tracing::Span::current();
current_span
.record("stream", tracing::field::display(&stream_name))
.record("tenant", tracing::field::debug(&tenant_id));
// For query mode, if the stream is not found in the in-memory map,
// check whether it exists in storage and, if it does,
// create the stream and schema from storage
Expand All @@ -529,12 +560,13 @@ pub async fn delete_stream_hot_tier(
});
}

let Some(hot_tier_manager) = HotTierManager::global() else {
let Some(hot_tier_manager) = GLOBAL_HOTTIER.get() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
};

hot_tier_manager
.delete_hot_tier(&stream_name, &tenant_id)
.instrument(tracing::Span::current())
.await?;

let mut stream_metadata: ObjectStoreFormat = serde_json::from_slice(
Expand Down
24 changes: 7 additions & 17 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
alerts::{ALERTS, get_alert_manager, target::TARGETS},
cli::Options,
correlation::CORRELATIONS,
hottier::{HotTierManager, StreamHotTier},
hottier::{GLOBAL_HOTTIER, HotTierManager, StreamHotTier},
metastore::metastore_traits::MetastoreObject,
oauth::{OAuthProvider, connect_oidc},
option::Mode,
Expand Down Expand Up @@ -160,6 +160,11 @@ pub trait ParseableServer {
// Shutdown resource monitor
let _ = resource_shutdown_tx.send(());

// Shutdown hottier
if let Some(htm) = GLOBAL_HOTTIER.get() {
htm.abort_all().await;
}

// Initiate graceful shutdown
info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
Expand Down Expand Up @@ -627,7 +632,7 @@ pub type PrismMetadata = NodeMetadata;
/// in their stream metadata but don't have local hot tier metadata files yet.
/// This function is called once during query server startup.
pub async fn initialize_hot_tier_metadata_on_startup(
hot_tier_manager: &HotTierManager,
hot_tier_manager: &'static HotTierManager,
) -> anyhow::Result<()> {
// Collect hot tier configurations from streams before doing async operations
let hot_tier_configs: Vec<(String, Option<String>, StreamHotTier)> = {
Expand All @@ -653,21 +658,6 @@ pub async fn initialize_hot_tier_metadata_on_startup(
})
})
.collect()
// let streams_guard = PARSEABLE.streams.read().unwrap();
// streams_guard
// .iter()
// .filter_map(|(stream_name, stream)| {
// // Skip if hot tier metadata file already exists for this stream
// if hot_tier_manager.check_stream_hot_tier_exists(stream_name) {
// return None;
// }

// // Get the hot tier configuration from the in-memory stream metadata
// stream
// .get_hot_tier()
// .map(|config| (stream_name.clone(), config))
// })
// .collect()
};

for (stream_name, tenant_id, hot_tier_config) in hot_tier_configs {
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tracing::{error, warn};
pub static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());

use crate::handlers::http::middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER};
use crate::hottier::GLOBAL_HOTTIER;
use crate::parseable::DEFAULT_TENANT;
use crate::utils::get_user_from_request;
use crate::{
Expand All @@ -47,7 +48,6 @@ use crate::{
modal::{NodeMetadata, NodeType},
},
},
hottier::HotTierManager,
parseable::{PARSEABLE, StreamNotFound},
stats,
storage::{ObjectStoreFormat, StreamType},
Expand Down Expand Up @@ -85,7 +85,7 @@ pub async fn delete(
)
}

if let Some(hot_tier_manager) = HotTierManager::global()
if let Some(hot_tier_manager) = GLOBAL_HOTTIER.get()
&& hot_tier_manager.check_stream_hot_tier_exists(&stream_name, &tenant_id)
{
hot_tier_manager
Expand Down
34 changes: 27 additions & 7 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup;
use crate::handlers::http::{base_path, prism_base_path};
use crate::handlers::http::{rbac, role};
use crate::hottier::GLOBAL_HOTTIER;
use crate::hottier::HotTierManager;
use crate::hottier::HotTierMessage;
use crate::hottier::hottier_runtime;
use crate::rbac::role::Action;
use crate::{analytics, migration, storage, sync};
use actix_web::web::{ServiceConfig, resource};
use actix_web::{Scope, web};
use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::mpsc;
use tokio::sync::{OnceCell, oneshot};

use crate::Server;
Expand Down Expand Up @@ -126,13 +130,29 @@ impl ParseableServer for QueryServer {
analytics::init_analytics_scheduler()?;
}

if let Some(hot_tier_manager) = HotTierManager::global() {
// Initialize hot tier metadata files for streams that have hot tier configuration
// but don't have local hot tier metadata files yet
if let Err(e) = initialize_hot_tier_metadata_on_startup(hot_tier_manager).await {
tracing::warn!("Failed to initialize hot tier metadata on startup: {}", e);
}
hot_tier_manager.download_from_s3()?;
// Initialize hot tier metadata files for streams that have hot tier configuration
// but don't have local hot tier metadata files yet
if let Some(htm) = PARSEABLE
.options
.hot_tier_storage_path
.as_ref()
.map(|hot_tier_path| {
// start hottier runtime
let (sender, receiver): (
mpsc::UnboundedSender<HotTierMessage>,
mpsc::UnboundedReceiver<HotTierMessage>,
) = mpsc::unbounded_channel();
std::thread::spawn(|| hottier_runtime(receiver));

// set global hottier
GLOBAL_HOTTIER.get_or_init(|| HotTierManager::new(hot_tier_path, sender))
})
{
// init hottier meta
if let Err(e) = initialize_hot_tier_metadata_on_startup(htm).await {
tracing::error!("Unable to init hottier meta- {e}");
};
htm.start_all_tasks().await;
};

// Run sync on a background thread
Expand Down
Loading
Loading