Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ templates/
.env.registry
docs/guides/miden-dashboard/operators.json
docs/guides/miden-dashboard/guardian-dashboard/
docs/guides/horizontal-scaling/operators.json
docs/guides/horizontal-scaling/docker-compose.override.yml

.cursor/
.claude/
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ url = "2.5"
zeroize = { version = "1.7", features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { version = "0.7", features = ["rt"] }
tonic = { workspace = true }
tonic-prost = { workspace = true }
tonic-reflection = "0.14"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Rollback for the auth_sessions migration: removes the two secondary indexes
-- and then the table. Every statement uses IF EXISTS, so it is a no-op when
-- the object is already gone and the rollback can be re-run safely.
DROP INDEX IF EXISTS auth_sessions_realm_expires_idx;
DROP INDEX IF EXISTS auth_sessions_expires_idx;
DROP TABLE IF EXISTS auth_sessions;
21 changes: 21 additions & 0 deletions crates/server/migrations/2026-06-23-000001_auth_sessions/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Shared operator/EVM session store for horizontal scaling (issue #242).
-- Sessions move out of per-process memory so a session issued on one replica
-- is honored on every replica. Keyed by the SHA-256 digest of the session
-- token (the plaintext token is never stored). The primary key is composite on
-- (realm, token_digest) so operator and EVM sessions share one table with the
-- realm boundary enforced by the database, not merely by token randomness.

-- One row per issued session, shared by all replicas.
CREATE TABLE auth_sessions (
-- Realm the session belongs to. It is the leading primary-key column, so the
-- same digest under two realms is stored as two distinct rows.
realm TEXT NOT NULL,
-- Digest of the session token (SHA-256 per the migration header); the
-- plaintext token itself is never stored.
token_digest BYTEA NOT NULL,
-- Presumably identifies who the session was issued to. NOTE(review): the JSON
-- shape is not defined in this migration; confirm against the session store.
subject JSONB NOT NULL,
-- When the session was issued.
issued_at TIMESTAMPTZ NOT NULL,
-- When the session stops being valid.
expires_at TIMESTAMPTZ NOT NULL,
-- Set on logout; the row is kept until natural expiry so the revocation
-- is honored fleet-wide for as long as the token would have been valid.
-- NULL means the session has not been revoked.
revoked_at TIMESTAMPTZ NULL,
PRIMARY KEY (realm, token_digest)
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.

-- Secondary indexes on the expiry timestamp, one across all realms and one
-- scoped per realm. NOTE(review): presumably these serve the periodic
-- expired-row cleanup and per-realm expiry lookups; confirm against the
-- store's queries.
CREATE INDEX auth_sessions_expires_idx ON auth_sessions (expires_at);
CREATE INDEX auth_sessions_realm_expires_idx ON auth_sessions (realm, expires_at);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Rollback for the auth_challenges migration: removes the two secondary
-- indexes and then the table. Every statement uses IF EXISTS, so it is a no-op
-- when the object is already gone and the rollback can be re-run safely.
DROP INDEX IF EXISTS auth_challenges_expires_idx;
DROP INDEX IF EXISTS auth_challenges_realm_principal_idx;
DROP TABLE IF EXISTS auth_challenges;
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Shared operator/EVM login-challenge store for horizontal scaling (issue #242).
-- A challenge issued on one replica must be verifiable on another. Realm-aware
-- so the two verification models coexist: `challenge_key` is the operator
-- signing-digest hex or the EVM nonce, and `payload` carries the realm-specific
-- fields needed to match/recover at verify time. Matching runs in Rust (Falcon
-- verify / ECDSA recover); the store provides the candidates and the single-use
-- claim.

-- One row per outstanding login challenge, shared by all replicas.
CREATE TABLE auth_challenges (
-- Realm the challenge belongs to; leading primary-key column.
realm TEXT NOT NULL,
-- Operator signing-digest hex or EVM nonce, depending on the realm (see the
-- migration header). Unique within a realm via the primary key.
challenge_key TEXT NOT NULL,
-- Presumably the identity the challenge was issued for. NOTE(review): the
-- exact format per realm is not defined in this migration; confirm in the
-- challenge store.
principal TEXT NOT NULL,
-- Realm-specific fields needed to match/recover at verify time.
payload JSONB NOT NULL,
-- When the challenge was issued.
issued_at TIMESTAMPTZ NOT NULL,
-- When the challenge stops being valid.
expires_at TIMESTAMPTZ NOT NULL,
-- NULL until the challenge is used. NOTE(review): presumably set by the
-- single-use claim mentioned in the header; confirm in the store.
consumed_at TIMESTAMPTZ NULL,
PRIMARY KEY (realm, challenge_key)
);

-- Secondary indexes: one for looking up challenges by (realm, principal) and
-- one on the expiry timestamp. NOTE(review): presumably these back the
-- candidate lookup at verify time and the expired-row cleanup respectively;
-- confirm against the store's queries.
CREATE INDEX auth_challenges_realm_principal_idx ON auth_challenges (realm, principal);
CREATE INDEX auth_challenges_expires_idx ON auth_challenges (expires_at);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Rollback for the worker_leases migration. The table has no separately
-- created indexes, so dropping it is the whole revert; IF EXISTS makes the
-- statement a no-op when the table is already gone.
DROP TABLE IF EXISTS worker_leases;
14 changes: 14 additions & 0 deletions crates/server/migrations/2026-06-24-000001_worker_leases/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Single-owner coordination for background workers under horizontal scaling
-- (issue #242, subsumes #190). At most one replica holds a named lease at a
-- time; the holder renews on a heartbeat and a stale lease can be reclaimed by
-- another replica once it expires. `fence_token` increments only on a change of
-- holder (steal), so a superseded holder can be detected at its write boundary.

-- One row per named lease, recording its current holder.
CREATE TABLE worker_leases (
-- Name of the lease. As the primary key it allows a single row, and therefore
-- a single recorded holder, per lease name.
lease_name TEXT PRIMARY KEY,
-- Identifier of the replica currently holding the lease. NOTE(review): how
-- the id is generated is not defined in this migration.
holder_id TEXT NOT NULL,
-- Presumably when the current holder first took the lease; confirm whether
-- renewals leave it untouched.
acquired_at TIMESTAMPTZ NOT NULL,
-- Presumably refreshed on each heartbeat renewal by the holder; confirm in
-- the lease store.
renewed_at TIMESTAMPTZ NOT NULL,
-- Once this instant has passed the lease is stale and another replica may
-- reclaim it (see the migration header).
expires_at TIMESTAMPTZ NOT NULL,
-- Increments only on a change of holder (steal), so a superseded holder can
-- be detected at its write boundary. Starts at 0.
fence_token BIGINT NOT NULL DEFAULT 0
);
18 changes: 3 additions & 15 deletions crates/server/src/ack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod miden_falcon_rpo;
mod secrets_manager;

use crate::delta_object::DeltaObject;
use crate::error::{GuardianError, Result};
use crate::error::Result;
use guardian_shared::SignatureScheme;
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SigningKey as EcdsaSecretKey;
use std::path::{Path, PathBuf};
Expand All @@ -25,9 +25,6 @@ pub(crate) use miden_ecdsa::{
};
pub use miden_falcon_rpo::MidenFalconRpoSigner;

const ENV_GUARDIAN_ENV: &str = "GUARDIAN_ENV";
const PROD_ENV: &str = "prod";

/// The ECDSA signer is abstracted over [`EcdsaSignerBackend`] so its key can live
/// in a hosted backend (e.g. AWS KMS); Falcon stays concrete because hosted
/// backends only support the secp256k1 ECDSA scheme.
Expand All @@ -40,7 +37,7 @@ pub struct AckRegistry {
impl AckRegistry {
pub async fn new(keystore_path: PathBuf) -> Result<Self> {
let ecdsa_backend = EcdsaBackendKind::from_env()?;
if is_prod_environment()? {
if crate::config::stage::is_prod()? {
let provider = AwsSecretsManagerProvider::from_env().await?;
Self::from_provider(keystore_path, ecdsa_backend, Some(&provider)).await
} else {
Expand Down Expand Up @@ -135,19 +132,10 @@ async fn build_ecdsa_signer<P: AckSecretProvider>(
Ok(MidenEcdsaSigner::new(backend))
}

/// Reports whether the process is configured for the production environment.
///
/// Reads the variable named by `ENV_GUARDIAN_ENV` and compares it to
/// `PROD_ENV` ignoring ASCII case. An unset variable means "not production".
///
/// # Errors
///
/// Returns `GuardianError::ConfigurationError` when the variable is set but
/// is not valid UTF-8.
fn is_prod_environment() -> Result<bool> {
    use std::env::VarError;

    let configured = match std::env::var(ENV_GUARDIAN_ENV) {
        Ok(configured) => configured,
        // Unset is a valid configuration, not an error.
        Err(VarError::NotPresent) => return Ok(false),
        Err(VarError::NotUnicode(_)) => {
            return Err(GuardianError::ConfigurationError(format!(
                "{ENV_GUARDIAN_ENV} must contain valid UTF-8"
            )));
        }
    };

    Ok(configured.eq_ignore_ascii_case(PROD_ENV))
}

#[cfg(all(test, not(any(feature = "integration", feature = "e2e"))))]
mod tests {
use super::*;
use crate::error::GuardianError;
use async_trait::async_trait;
use miden_keystore::{EcdsaKeyStore, FilesystemEcdsaKeyStore, FilesystemKeyStore, KeyStore};
use miden_protocol::crypto::dsa::falcon512_poseidon2::SecretKey as FalconSecretKey;
Expand Down
16 changes: 12 additions & 4 deletions crates/server/src/api/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,31 @@ pub async fn verify_operator_login(
security(("operator_session" = [])),
responses(
(status = 200, description = "Session invalidated", body = LogoutOperatorResponse),
(status = 500, description = "Session revocation failed", body = crate::openapi::ApiErrorResponse),
)
)]
pub async fn logout_operator(
State(state): State<AppState>,
headers: HeaderMap,
) -> impl IntoResponse {
) -> Result<(
StatusCode,
[(header::HeaderName, String); 1],
Json<LogoutOperatorResponse>,
)> {
let token = extract_cookie(&headers, state.dashboard.cookie_name());
// Fail closed: if the shared session store cannot revoke (e.g. Postgres is
// unavailable), surface the error so the caller can retry instead of being
// told a logout succeeded that did not take effect fleet-wide.
state
.dashboard
.logout(token.as_deref(), state.clock.now())
.await;
.await?;

(
Ok((
StatusCode::OK,
[(header::SET_COOKIE, state.dashboard.clear_cookie_header())],
Json(LogoutOperatorResponse { success: true }),
)
))
}

/// Paginated list of accounts visible to the operator. Requires the
Expand Down
5 changes: 4 additions & 1 deletion crates/server/src/api/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,21 @@ pub async fn verify_evm_session(
security(("evm_session" = [])),
responses(
(status = 200, description = "Session invalidated", body = LogoutResponse),
(status = 500, description = "Session revocation failed", body = crate::openapi::ApiErrorResponse),
)
)]
pub async fn logout_evm_session(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<([(header::HeaderName, String); 1], Json<LogoutResponse>)> {
let token = extract_cookie(&headers, state.evm.sessions.cookie_name());
// Fail closed: a revoke failure (e.g. shared store outage) is surfaced so the
// caller can retry rather than believing the session was invalidated.
state
.evm
.sessions
.logout(token.as_deref(), state.clock.now())
.await;
.await?;
Ok((
[(header::SET_COOKIE, state.evm.sessions.clear_cookie_header())],
Json(LogoutResponse { success: true }),
Expand Down
35 changes: 34 additions & 1 deletion crates/server/src/builder/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::state::AppState;
/// Provides methods to run the server with the configured settings.
pub struct ServerHandle {
pub(crate) app_state: AppState,
pub(crate) leader: std::sync::Arc<dyn crate::coordination::LeaderElector>,
pub(crate) startup_info: StartupInfo,
pub(crate) cors_layer: Option<CorsLayer>,
pub(crate) rate_limit_config: Option<RateLimitConfig>,
Expand Down Expand Up @@ -127,13 +128,15 @@ impl ServerHandle {
// Start background jobs based on canonicalization config
if self.app_state.canonicalization.is_some() {
tracing::info!("Starting canonicalization worker");
start_canonicalization_worker(self.app_state.clone());
start_canonicalization_worker(self.app_state.clone(), self.leader.clone());
} else {
tracing::info!(
"Running in optimistic mode - deltas accepted without on-chain verification"
);
}

start_session_sweep_worker(self.app_state.clone());

// Start HTTP server if enabled
if self.http_enabled {
let state = self.app_state.clone();
Expand Down Expand Up @@ -368,3 +371,33 @@ impl ServerHandle {
}
}
}

/// Seconds between two sweeps of expired sessions/challenges.
const SESSION_SWEEP_INTERVAL_SECS: u64 = 60;

/// Spawns a detached background task that periodically reclaims expired
/// operator sessions/challenges (and, with the `evm` feature, EVM
/// sessions/challenges) from the coordination store.
///
/// Expiry is enforced on read regardless; this only frees rows (Postgres) or
/// memory (in-memory). A failed sweep is logged at `warn` and the loop simply
/// continues, so the next tick retries it.
fn start_session_sweep_worker(state: AppState) {
    tokio::spawn(async move {
        let period = std::time::Duration::from_secs(SESSION_SWEEP_INTERVAL_SECS);
        let mut ticker = tokio::time::interval(period);
        // The default `Burst` policy fires every missed tick back-to-back once
        // an overlong sweep (e.g. a stalled store) finally returns. Sweeping is
        // pure housekeeping, so catching up is wasted load; `Delay` keeps a
        // full period between consecutive sweeps instead.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            // The first tick completes immediately, so one sweep runs at startup.
            ticker.tick().await;
            if let Err(error) = state.dashboard.sweep_expired(state.clock.now()).await {
                tracing::warn!(
                    target: "dashboard.session_sweep",
                    %error,
                    "operator session/challenge sweep failed",
                );
            }
            #[cfg(feature = "evm")]
            if let Err(error) = state.evm.sessions.sweep_expired(state.clock.now()).await {
                tracing::warn!(
                    target: "evm.session_sweep",
                    %error,
                    "EVM session/challenge sweep failed",
                );
            }
        }
    });
}
Loading
Loading