diff --git a/nativelink-config/examples/mongo.json5 b/nativelink-config/examples/mongo.json5 index 74d2168f1..98b62965d 100644 --- a/nativelink-config/examples/mongo.json5 +++ b/nativelink-config/examples/mongo.json5 @@ -93,6 +93,23 @@ worker_timeout_s: 300, }, }, + { + // Example of a scheduler using MongoDB backend for state management + name: "MONGO_BACKED_SCHEDULER", + simple: { + supported_platform_properties: { + cpu_arch: "minimum", + OS: "exact", + }, + max_job_retries: 3, + worker_timeout_s: 300, + experimental_backend: { + mongo: { + mongo_store: "MONGO_SCHEDULER", + }, + }, + }, + }, ], servers: [ { diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs index 51fb47c55..56637677a 100644 --- a/nativelink-config/src/schedulers.rs +++ b/nativelink-config/src/schedulers.rs @@ -138,6 +138,8 @@ pub enum ExperimentalSimpleSchedulerBackend { Memory, /// Use a redis store for the scheduler. Redis(ExperimentalRedisSchedulerBackend), + /// Use a mongodb store for the scheduler. + Mongo(ExperimentalMongoSchedulerBackend), } #[derive(Deserialize, Debug, Default)] @@ -148,6 +150,14 @@ pub struct ExperimentalRedisSchedulerBackend { pub redis_store: StoreRefName, } +#[derive(Deserialize, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct ExperimentalMongoSchedulerBackend { + /// A reference to the mongo store to use for the scheduler. + /// Note: This MUST resolve to a `ExperimentalMongoSpec`. + pub mongo_store: StoreRefName, +} + /// A scheduler that simply forwards requests to an upstream scheduler. This /// is useful to use when doing some kind of local action cache or CAS away from /// the main cluster of workers. In general, it's more efficient to point the diff --git a/nativelink-scheduler/src/default_scheduler_factory.rs b/nativelink-scheduler/src/default_scheduler_factory.rs index 3e90b3440..f68c38fea 100644 --- a/nativelink-scheduler/src/default_scheduler_factory.rs +++ b/nativelink-scheduler/src/default_scheduler_factory.rs @@ -21,6 +21,7 @@ use nativelink_config::schedulers::{ use nativelink_config::stores::EvictionPolicy; use nativelink_error::{Error, ResultExt, make_input_err}; use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent; +use nativelink_store::mongo_store::ExperimentalMongoStore; use nativelink_store::redis_store::RedisStore; use nativelink_store::store_manager::StoreManager; use nativelink_util::instant_wrapper::InstantWrapper; @@ -150,6 +151,40 @@ fn simple_scheduler_factory( ); Ok((Some(action_scheduler), Some(worker_scheduler))) } + ExperimentalSimpleSchedulerBackend::Mongo(mongo_config) => { + let store = store_manager + .get_store(mongo_config.mongo_store.as_ref()) + .err_tip(|| { + format!( + "'mongo_store': '{}' does not exist", + mongo_config.mongo_store + ) + })?; + let task_change_notify = Arc::new(Notify::new()); + let store = store + .into_inner() + .as_any_arc() + .downcast::() + .map_err(|_| { + make_input_err!( + "Could not downcast to mongo store in MongoAwaitedActionDb::new" + ) + })?; + let awaited_action_db = StoreAwaitedActionDb::new( + store, + task_change_notify.clone(), + now_fn, + Default::default, + ) + .err_tip(|| "In state_manager_factory::mongo_state_manager")?; + let (action_scheduler, worker_scheduler) = SimpleScheduler::new( + spec, + awaited_action_db, + task_change_notify, + maybe_origin_event_tx.cloned(), + ); + Ok((Some(action_scheduler), Some(worker_scheduler))) + } } }