Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions backend/src/api_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub enum ApiError {
PayloadTooLarge(String),

/// Client has exceeded the configured rate limit for the endpoint.
#[error("Rate limit exceeded. Please slow down and retry after the indicated period.")]
#[error("Rate limit exceeded: {0}")]
TooManyRequests(String),
}

Expand Down Expand Up @@ -194,10 +194,7 @@ impl IntoResponse for ApiError {
}
Self::Timeout => {
tracing::warn!(error_code = "TIMEOUT", "Request timeout");
crate::error_tracking::capture_message(
"Request timed out",
sentry::Level::Warning,
);
crate::error_tracking::capture_message("Request timed out", sentry::Level::Warning);
(StatusCode::GATEWAY_TIMEOUT, self.to_string())
}
Self::ServiceUnavailable(msg) => {
Expand Down
21 changes: 7 additions & 14 deletions backend/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::validation::Path;
use axum::{
extract::{Query, State},
http::HeaderMap,
Expand All @@ -6,7 +7,6 @@ use axum::{
routing::{delete, get, post, put},
Json, Router,
};
use crate::validation::Path;
use metrics_exporter_prometheus::PrometheusHandle;
use serde_json::{json, Value};
use sqlx::PgPool;
Expand Down Expand Up @@ -2269,12 +2269,9 @@ async fn get_plan_events(
Path(plan_id): Path<Uuid>,
AuthenticatedUser(user): AuthenticatedUser,
) -> Result<Json<Value>, ApiError> {
let events = crate::will_events::WillEventService::get_plan_events(
&state.db,
plan_id,
user.user_id,
)
.await?;
let events =
crate::will_events::WillEventService::get_plan_events(&state.db, plan_id, user.user_id)
.await?;
Ok(Json(
json!({ "status": "success", "data": events, "count": events.len() }),
))
Expand Down Expand Up @@ -2988,13 +2985,9 @@ async fn get_collateral_value(
state.db.clone(),
3600,
));
let info = CollateralManagementService::get_collateral_value(
&state.db,
price_feed,
id,
user.user_id,
)
.await?;
let info =
CollateralManagementService::get_collateral_value(&state.db, price_feed, id, user.user_id)
.await?;
Ok(Json(json!({ "status": "success", "data": info })))
}

Expand Down
52 changes: 27 additions & 25 deletions backend/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{extract::State, Json};
use bcrypt::verify;
use chrono::{DateTime, Duration, Utc};
use hex;
use jsonwebtoken::{encode, decode, EncodingKey, Header, Validation};
use jsonwebtoken::{decode, encode, EncodingKey, Header, Validation};
use ring::signature;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
Expand Down Expand Up @@ -98,9 +98,11 @@ pub async fn web3_login(
let public_key_bytes = {
// Enforce strict Stellar address validation
if !payload.wallet_address.starts_with('G') || payload.wallet_address.len() != 56 {
return Err(ApiError::BadRequest("Invalid Stellar address format".to_string()));
return Err(ApiError::BadRequest(
"Invalid Stellar address format".to_string(),
));
}

let strkey = Strkey::from_string(&payload.wallet_address)
.map_err(|_| ApiError::BadRequest("Invalid Stellar address".to_string()))?;

Expand Down Expand Up @@ -513,17 +515,17 @@ where
}
let token = auth_header.strip_prefix("Bearer ").unwrap();
let mut validation = Validation::default();
// Ensure expiration is always validated
validation.validate_exp = true;
validation.required_spec_claims.insert("exp".to_string());
let claims: UserClaims = decode(
token,
&jsonwebtoken::DecodingKey::from_secret(config.jwt_secret.as_bytes()),
&validation,
)
.map_err(|_| ApiError::Unauthorized)?
.claims;
// Ensure expiration is always validated
validation.validate_exp = true;
validation.required_spec_claims.insert("exp".to_string());

let claims: UserClaims = decode(
token,
&jsonwebtoken::DecodingKey::from_secret(config.jwt_secret.as_bytes()),
&validation,
)
.map_err(|_| ApiError::Unauthorized)?
.claims;
return Ok(AuthenticatedUser(claims));
}

Expand Down Expand Up @@ -570,17 +572,17 @@ where
}
let token = auth_header.strip_prefix("Bearer ").unwrap();
let mut validation = Validation::default();
// Ensure expiration is always validated
validation.validate_exp = true;
validation.required_spec_claims.insert("exp".to_string());
let claims: AdminClaims = decode(
token,
&jsonwebtoken::DecodingKey::from_secret(config.jwt_secret.as_bytes()),
&validation,
)
.map_err(|_| ApiError::Unauthorized)?
.claims;
// Ensure expiration is always validated
validation.validate_exp = true;
validation.required_spec_claims.insert("exp".to_string());

let claims: AdminClaims = decode(
token,
&jsonwebtoken::DecodingKey::from_secret(config.jwt_secret.as_bytes()),
&validation,
)
.map_err(|_| ApiError::Unauthorized)?
.claims;
return Ok(AuthenticatedAdmin(claims));
}

Expand Down
4 changes: 2 additions & 2 deletions backend/src/bin/test_stmt_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set for this test");
let database_url =
std::env::var("DATABASE_URL").expect("DATABASE_URL must be set for this test");

// Create pool using env-configured settings (including DB_POOL_QUERY_TIMEOUT_SECS).
let pool = db::create_pool(&database_url).await?;
Expand Down
6 changes: 4 additions & 2 deletions backend/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ impl CacheService {

/// Invalidate all notification caches for a user.
pub async fn invalidate_notification_caches(&self, user_id: &str) -> Result<u64, ApiError> {
self.invalidate_prefix(&format!("notifications:{user_id}:")).await
self.invalidate_prefix(&format!("notifications:{user_id}:"))
.await
}

/// Invalidate all audit log caches.
Expand All @@ -383,7 +384,8 @@ impl CacheService {

/// Invalidate all collateral-related caches.
pub async fn invalidate_collateral_caches(&self, user_id: &str) -> Result<u64, ApiError> {
self.invalidate_prefix(&format!("collateral:{user_id}:")).await
self.invalidate_prefix(&format!("collateral:{user_id}:"))
.await
}

/// Invalidate price feed caches.
Expand Down
31 changes: 15 additions & 16 deletions backend/src/compliance.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::api_error::ApiError;
use crate::events::{EventType, LendingEvent};
use crate::external_integrations::{
AnchorIntegrationClient, ComplianceApiClient, SanctionsApiClient,
};
use crate::notifications::{
audit_action, entity_type, notif_type, AuditLogService, NotificationService,
};
use crate::events::{EventType, LendingEvent};
use async_trait::async_trait;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use once_cell::sync::OnceCell;
Expand All @@ -23,7 +23,8 @@ const ALERT_COOLDOWN_WINDOW: Duration = Duration::from_secs(300);
const EVENT_COMMIT_POLL_INTERVAL: Duration = Duration::from_millis(200);
const EVENT_COMMIT_MAX_RETRIES: usize = 5;

static REALTIME_COMPLIANCE_LISTENER: OnceCell<Arc<dyn RealtimeComplianceListener>> = OnceCell::new();
static REALTIME_COMPLIANCE_LISTENER: OnceCell<Arc<dyn RealtimeComplianceListener>> =
OnceCell::new();

#[async_trait]
pub trait RealtimeComplianceListener: Send + Sync {
Expand Down Expand Up @@ -158,12 +159,11 @@ impl ComplianceEngine {

async fn wait_for_event_commit(&self, event_id: Uuid) -> Result<bool, ApiError> {
for _ in 0..EVENT_COMMIT_MAX_RETRIES {
let exists: Option<bool> = sqlx::query_scalar(
"SELECT true FROM lending_events WHERE id = $1",
)
.bind(event_id)
.fetch_optional(&self.db)
.await?;
let exists: Option<bool> =
sqlx::query_scalar("SELECT true FROM lending_events WHERE id = $1")
.bind(event_id)
.fetch_optional(&self.db)
.await?;

if exists.is_some() {
return Ok(true);
Expand Down Expand Up @@ -263,12 +263,11 @@ impl ComplianceEngine {
None => return Ok(()),
};

let plan_created_at: Option<DateTime<Utc>> = sqlx::query_scalar(
"SELECT created_at FROM plans WHERE id = $1",
)
.bind(plan_id)
.fetch_optional(&self.db)
.await?;
let plan_created_at: Option<DateTime<Utc>> =
sqlx::query_scalar("SELECT created_at FROM plans WHERE id = $1")
.bind(plan_id)
.fetch_optional(&self.db)
.await?;

let plan_created_at = match plan_created_at {
Some(created_at) => created_at,
Expand Down Expand Up @@ -719,11 +718,11 @@ impl RealtimeComplianceListener for ComplianceEngine {
#[cfg(test)]
mod tests {
use super::*;
use rust_decimal_macros::dec;
use sqlx::PgPool;
use anyhow::anyhow;
use chrono::Utc;
use rust_decimal_macros::dec;
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{oneshot, Mutex};
Expand Down
15 changes: 7 additions & 8 deletions backend/src/csrf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ async fn rotate_csrf_token(
db: &sqlx::PgPool,
old_token: &str,
) -> Result<(String, chrono::DateTime<chrono::Utc>), ()> {
let user_id: Option<Uuid> = sqlx::query_scalar(
"SELECT user_id FROM csrf_tokens WHERE token = $1 AND used = FALSE",
)
.bind(old_token)
.fetch_optional(db)
.await
.map_err(|_| ())?
.ok_or(())?;
let user_id: Option<Uuid> =
sqlx::query_scalar("SELECT user_id FROM csrf_tokens WHERE token = $1 AND used = FALSE")
.bind(old_token)
.fetch_optional(db)
.await
.map_err(|_| ())?
.ok_or(())?;

let new_token = generate_csrf_token();
let expires_at = Utc::now() + Duration::minutes(60);
Expand Down
Loading
Loading