Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Non-sensitive info
RUST_LOG="info" # For application-level tracing - Consistent log level throughout application

DTIM__DEFAULT__ADDRESS="0.0.0.0"
DTIM__DEFAULT__PORT=3030
DTIM__DEFAULT__LOG_LEVEL="info"
DTIM__DEFAULT__LOG_LEVEL=${RUST_LOG} # Uses same LevelFilter as `RUST_LOG`

# Sensitive info (.env only)
DATABASE_URL="postgres://postgres:@localhost:5432/postgres" # For Diesel CLI
DATABASE_URL="postgres://postgres:@localhost:5432/postgres" # For Diesel CLI - Add password after 'postgres:' for production use

DTIM__DEFAULT__STORAGE__DATABASE_URL=${DATABASE_URL}
DTIM__DEFAULT__STORAGE__DATABASE_URL=${DATABASE_URL} # Uses same connection as Diesel CLI
DTIM__DEFAULT__WATCHERS__VIRUSTOTAL_API_KEY="your_api_key"
2 changes: 1 addition & 1 deletion diesel.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ file = "src/db/schema.rs"
custom_type_derives = ["diesel::query_builder::QueryId", "Clone"]

[migrations_directory]
dir = "/Users/caden/Developer/dtim/migrations"
dir = "migrations"
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP TABLE encrypted_indicators;
DROP TABLE IF EXISTS encrypted_indicators CASCADE;
13 changes: 8 additions & 5 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,16 @@ async fn gossip_indicators_handler(
State(state): State<Arc<AppState>>,
Json(indicators): Json<Vec<EncryptedIndicator>>,
) -> ApiResponse<GossipIndicatorsResponse> {
let mut decrypted: Vec<ThreatIndicator> = indicators
.into_iter()
.filter_map(|enc| ThreatIndicator::decrypt(&enc, &state.key_mgr).ok())
.collect();
let mut node = state.node.lock().await;
let mut count = 0;
for encrypted in indicators {
if let Ok(indicator) = ThreatIndicator::decrypt(&encrypted, &state.key_mgr) {
node.add_or_increment_indicator(indicator)
.map_err(|_| ApiError::INTERNAL_SERVER_ERROR)?;
count += 1;
for indicator in decrypted.drain(..) {
match node.add_or_increment_indicator(indicator) {
Ok(_) => count += 1,
Err(_) => log::error!("DB insert failed"),
}
}
Ok((
Expand Down
19 changes: 16 additions & 3 deletions src/crypto/mesh_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use aes_gcm::aead::OsRng;
use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine as _};
use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
use sha2::{Digest as _, Sha256};
use std::fs::{self, File};
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::fs::OpenOptionsExt as _;
use std::path::Path;

pub const PRIVATE_KEY_PATH: &str = "data/keys/mesh.key";
Expand Down Expand Up @@ -48,8 +49,20 @@ impl MeshIdentity {
let mut csprng = OsRng;
let signing_key = SigningKey::generate(&mut csprng);
let verifying_key = signing_key.verifying_key();
File::create(PRIVATE_KEY_PATH)?.write_all(&signing_key.to_bytes())?;
File::create(PUBLIC_KEY_PATH)?.write_all(&verifying_key.to_bytes())?;
let mut priv_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(PRIVATE_KEY_PATH)?;
let mut pub_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(PUBLIC_KEY_PATH)?;
priv_file.write_all(&signing_key.to_bytes())?;
pub_file.write_all(&verifying_key.to_bytes())?;
Ok(())
}

Expand Down
59 changes: 34 additions & 25 deletions src/crypto/symmetric_key_manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use aes_gcm::aead::rand_core::RngCore;
use aes_gcm::aead::{Aead, KeyInit, OsRng};
use aes_gcm::{Aes256Gcm, Key, Nonce};
use std::fs::{self, File};
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::fs::OpenOptionsExt as _;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};

Expand All @@ -26,14 +27,15 @@ impl SymmetricKeyManager {
let mut current_bytes = [0u8; 32];
let mut prev_bytes = [0u8; 32];

let current_key = if Path::new(SYMM_KEY_PATH).exists() {
File::open(SYMM_KEY_PATH)?.read_exact(&mut current_bytes)?;
let current_key = if Path::new(SYMM_KEY_PATH).exists()
&& File::open(SYMM_KEY_PATH)
.and_then(|mut f| f.read_exact(&mut current_bytes))
.is_ok()
{
Key::<Aes256Gcm>::from_slice(&current_bytes).to_owned()
} else {
let key = Self::generate_key();
let mut file = File::create(SYMM_KEY_PATH)?;
file.write_all(key.as_slice())?;
key
log::warn!("symm.key missing or invalid – generating new key");
Self::generate_key()?
};

let previous_key = if Path::new(SYMM_PREV_KEY_PATH).exists() {
Expand All @@ -56,24 +58,18 @@ impl SymmetricKeyManager {
})
}

fn generate_key() -> Key<Aes256Gcm> {
fn generate_key() -> std::io::Result<Key<Aes256Gcm>> {
let mut key_bytes = [0u8; 32];
OsRng.fill_bytes(&mut key_bytes);
Key::<Aes256Gcm>::from_slice(&key_bytes).to_owned()
}

pub fn save_keys(&self) -> std::io::Result<()> {
fs::create_dir_all("data/keys")?;
let mut file = File::create(SYMM_KEY_PATH)?;
file.write_all(self.current_key.as_slice())?;

if let Some(prev) = &self.previous_key {
let mut prev_file = File::create(SYMM_PREV_KEY_PATH)?;
prev_file.write_all(prev.as_slice())?;
} else if Path::new(SYMM_PREV_KEY_PATH).exists() {
fs::remove_file(SYMM_PREV_KEY_PATH)?;
}
Ok(())
let key = Key::<Aes256Gcm>::from_slice(&key_bytes).to_owned();
let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(SYMM_KEY_PATH)?;
file.write_all(key.as_slice())?;
Ok(key)
}

pub fn rotate_key(&mut self) -> std::io::Result<()> {
Expand All @@ -83,9 +79,19 @@ impl SymmetricKeyManager {
.as_secs();
if now - self.key_rotation_time >= self.rotation_interval {
self.previous_key = Some(self.current_key);
self.current_key = Self::generate_key();
self.current_key = Self::generate_key()?;
self.key_rotation_time = now;
self.save_keys()?;
if let Some(prev) = &self.previous_key {
let mut prev_file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(SYMM_PREV_KEY_PATH)?;
prev_file.write_all(prev.as_slice())?;
} else if Path::new(SYMM_PREV_KEY_PATH).exists() {
fs::remove_file(SYMM_PREV_KEY_PATH)?;
}
}
Ok(())
}
Expand All @@ -112,6 +118,9 @@ impl SymmetricKeyManager {
tag: Vec<u8>,
) -> Result<Vec<u8>, String> {
ciphertext.extend_from_slice(&tag);
if nonce.len() != 12 {
return Err("Invalid nonce length".into());
}
let nonce = Nonce::from_slice(&nonce);
let try_decrypt = |key: &Key<Aes256Gcm>| {
let cipher = Aes256Gcm::new(key);
Expand Down
4 changes: 2 additions & 2 deletions src/logging.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use base64::prelude::BASE64_STANDARD;
use base64::Engine as _;
use chrono::Utc;
use log::{error, Level, LevelFilter, Metadata, Record};
use log::{error, Level, LevelFilter, Log, Metadata, Record};
use std::fs::{self, OpenOptions};
use std::io::Write as _;
use std::path::PathBuf;
Expand Down Expand Up @@ -106,7 +106,7 @@ impl EncryptedLogger {
}
}

impl log::Log for EncryptedLogger {
impl Log for EncryptedLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= self.level
}
Expand Down
16 changes: 10 additions & 6 deletions src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ impl ThreatIndicator {
hasher.update(indicator_type.to_string().as_bytes());
hasher.update(value.as_bytes());
hasher.update(tlp.to_string().as_bytes());
for tag in tags {
let mut sorted_tags: Vec<&String> = tags.iter().collect();
sorted_tags.sort();
for tag in sorted_tags {
hasher.update(tag.as_bytes());
}
let hash = hasher.finalize();
Expand All @@ -83,7 +85,8 @@ impl ThreatIndicator {
&self,
key_mgr: &mut SymmetricKeyManager,
) -> Result<EncryptedIndicator, std::io::Error> {
let serialized = serde_json::to_vec(self).expect("Failed to serialize ThreatIndicator");
let serialized = serde_json::to_vec(self)
.map_err(|e| std::io::Error::other(format!("Serialization failed: {e}")))?;
let (ciphertext, nonce, mac) = key_mgr
.encrypt(&serialized)
.map_err(|e| std::io::Error::other(format!("Encryption failed: {}", e)))?;
Expand All @@ -100,17 +103,18 @@ impl ThreatIndicator {
pub fn decrypt(
encrypted: &EncryptedIndicator,
key_mgr: &SymmetricKeyManager,
) -> Result<Self, String> {
) -> Result<Self, std::io::Error> {
let decrypted = key_mgr
.decrypt(
encrypted.ciphertext.clone(),
encrypted.nonce.clone(),
encrypted.mac.clone(),
)
.map_err(|e| format!("Decryption failed: {}", e))?;
.map_err(|e| std::io::Error::other(format!("Decryption failed: {}", e)))?;

serde_json::from_slice(&decrypted)
.map_err(|e| format!("Failed to deserialize ThreatIndicator: {}", e))
serde_json::from_slice(&decrypted).map_err(|e| {
std::io::Error::other(format!("Failed to deserialize ThreatIndicator: {}", e))
})
}

#[allow(unused)] // TODO: implement in watchers
Expand Down
22 changes: 15 additions & 7 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,34 @@ impl Node {
&mut self,
indicator: ThreatIndicator,
) -> Result<EncryptedIndicator, Box<dyn std::error::Error + Send + Sync>> {
use self::db::schema::encrypted_indicators;
use self::db::schema::encrypted_indicators::dsl::*;

let encrypted_indicator = indicator.encrypt(&mut self.key_mgr)?;

let mut conn = self.db_pool.get()?;
let res = diesel::insert_into(encrypted_indicators::table)
let res = diesel::insert_into(encrypted_indicators)
.values(&encrypted_indicator)
.on_conflict(id)
.do_nothing()
.returning(EncryptedIndicator::as_returning())
.get_result(&mut conn)?;

let _ = self
.logger
.write_log(log::Level::Info, &format!("Added indicator: {:?}", res));
let _ = self.logger.write_log(
log::Level::Info,
&format!("Added indicator with id: {}", res.id),
);
Ok(res)
}

pub fn add_or_increment_indicator(
&mut self,
new_indicator: ThreatIndicator,
) -> Result<EncryptedIndicator, Box<dyn std::error::Error + Send + Sync>> {
/// FIXME: `add_indicator` inserts unconditionally – handle primary-key clashes.
/// If an indicator with the same deterministic UUID already exists, this insertion will violate the primary-key constraint and bubble an error.
/// Possible solution:
/// Use ON CONFLICT (id) DO UPDATE (diesel::upsert) to increment sightings. (TODO: possibly make sightings unencrypted metadata, TBD)
/// Failing to do so exposes the API to 500s on legitimate duplicate submissions.
Comment on lines +107 to +111

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Documented known issue with primary key conflicts

The detailed FIXME comment clearly explains the current limitation with indicator insertion and proposes a solution using Diesel's upsert functionality. This is good documentation for future improvements.

Consider implementing the suggested fix using Diesel's upsert to increment sightings on conflict, as this would further improve robustness:

- let res = diesel::insert_into(encrypted_indicators)
-     .values(&encrypted)
-     .returning(EncryptedIndicator::as_returning())
-     .get_result(&mut conn)?;
+ let res = diesel::insert_into(encrypted_indicators)
+     .values(&encrypted)
+     .on_conflict(id)
+     .do_update()
+     .set(ciphertext.eq(excluded(ciphertext))) // Update with the new values
+     .returning(EncryptedIndicator::as_returning())
+     .get_result(&mut conn)?;

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/node.rs around lines 107 to 111, the current add_indicator function
inserts indicators unconditionally, causing primary-key conflicts and errors on
duplicates. To fix this, refactor the insertion logic to use Diesel's upsert
feature with ON CONFLICT (id) DO UPDATE, incrementing the sightings count
instead of failing. This will prevent 500 errors on duplicate submissions and
improve API robustness.

use self::db::schema::encrypted_indicators::dsl::*;

let indicator_id = new_indicator.get_id();
Expand Down Expand Up @@ -128,7 +136,7 @@ impl Node {

let _ = self.logger.write_log(
log::Level::Info,
&format!("Incrementing indicator: {:?}", res),
&format!("Incrementing indicator with id: {}", res.id),
);
Ok(res)
} else {
Expand Down Expand Up @@ -178,7 +186,7 @@ impl Node {
let indicators = indicators
.iter()
.map(|i| ThreatIndicator::decrypt(i, &self.key_mgr))
.collect::<Result<Vec<ThreatIndicator>, std::string::String>>()?;
.collect::<Result<Vec<ThreatIndicator>, std::io::Error>>()?;

Ok(indicators)
}
Expand Down
Loading