Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rust_snuba/src/accepted_outcomes_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ pub fn accepted_outcomes_consumer_impl(

let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap();

tracing::info!(
consumer_group,
"Consumer config: {}",
consumer_config.redacted_for_logging()
);

assert_eq!(consumer_config.storages.len(), 1);

let mut _sentry_guard = None;
Expand Down
169 changes: 169 additions & 0 deletions rust_snuba/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,88 @@ impl ConsumerConfig {
let d: Self = serde_json::from_str(payload)?;
Ok(d)
}

/// Returns a JSON representation of the consumer config suitable for
/// logging, with secrets (ClickHouse password, Sentry DSN, Kafka
/// credentials in broker configs) redacted.
pub fn redacted_for_logging(&self) -> Value {
let storages: Vec<Value> = self
.storages
.iter()
.map(|storage| {
serde_json::json!({
"name": storage.name,
"clickhouse_table_name": storage.clickhouse_table_name,
"clickhouse_cluster": {
"host": storage.clickhouse_cluster.host,
"port": storage.clickhouse_cluster.port,
"http_port": storage.clickhouse_cluster.http_port,
"user": storage.clickhouse_cluster.user,
"password": REDACTED,
"database": storage.clickhouse_cluster.database,
"secure": storage.clickhouse_cluster.secure,
},
"message_processor": {
"python_class_name": storage.message_processor.python_class_name,
"python_module": storage.message_processor.python_module,
},
})
})
.collect();

serde_json::json!({
"storages": storages,
"raw_topic": redacted_topic_config(&self.raw_topic),
"commit_log_topic": self.commit_log_topic.as_ref().map(redacted_topic_config),
"replacements_topic": self.replacements_topic.as_ref().map(redacted_topic_config),
"accepted_outcomes_topic": self.accepted_outcomes_topic.as_ref().map(redacted_topic_config),
"dlq_topic": self.dlq_topic.as_ref().map(redacted_topic_config),
"accountant_topic": redacted_topic_config(&self.accountant_topic),
"max_batch_size": self.max_batch_size,
"max_batch_time_ms": self.max_batch_time_ms,
"max_batch_size_calculation": format!("{:?}", self.max_batch_size_calculation),
"env": {
"sentry_dsn": self.env.sentry_dsn.as_ref().map(|_| REDACTED),
"dogstatsd_host": self.env.dogstatsd_host,
"dogstatsd_port": self.env.dogstatsd_port,
"default_retention_days": self.env.default_retention_days,
"lower_retention_days": self.env.lower_retention_days,
"valid_retention_days": self.env.valid_retention_days,
"record_cogs": self.env.record_cogs,
"project_stacktrace_blacklist": self.env.project_stacktrace_blacklist,
},
})
}
}

const REDACTED: &str = "[REDACTED]";

/// Redacts values of broker config keys that may contain credentials
/// (e.g. `sasl.password`).
fn redacted_broker_config(broker_config: &BrokerConfig) -> Value {
let redacted = broker_config
.iter()
.map(|(key, value)| {
let lower = key.to_lowercase();
let value = if lower.contains("password") || lower.contains("secret") {
Value::String(REDACTED.to_string())
} else {
Value::String(value.clone())
};
(key.clone(), value)
})
.collect();

Value::Object(redacted)
}

fn redacted_topic_config(topic_config: &TopicConfig) -> Value {
serde_json::json!({
"physical_topic_name": topic_config.physical_topic_name,
"logical_topic_name": topic_config.logical_topic_name,
"broker_config": redacted_broker_config(&topic_config.broker_config),
"quantized_rebalance_consumer_group_delay_secs": topic_config.quantized_rebalance_consumer_group_delay_secs,
})
}

#[derive(Deserialize, Clone, Debug)]
Expand Down Expand Up @@ -154,4 +236,91 @@ mod tests {
"10000"
);
}

#[test]
fn test_redacted_for_logging() {
let raw = r#"{
"storages": [{
"name": "errors",
"clickhouse_table_name": "errors_local",
"clickhouse_cluster": {
"host": "localhost",
"port": 9000,
"secure": false,
"http_port": 8123,
"user": "default",
"password": "super-secret",
"database": "default"
},
"message_processor": {
"python_class_name": "ErrorsProcessor",
"python_module": "snuba.datasets.processors.errors_processor"
}
}],
"raw_topic": {
"physical_topic_name": "events",
"logical_topic_name": "events",
"broker_config": {
"bootstrap.servers": "127.0.0.1:9092",
"sasl.username": "user",
"sasl.password": "kafka-secret"
},
"quantized_rebalance_consumer_group_delay_secs": null
},
"commit_log_topic": null,
"replacements_topic": null,
"accepted_outcomes_topic": null,
"dlq_topic": null,
"accountant_topic": {
"physical_topic_name": "snuba-shared-resources-usage",
"logical_topic_name": "snuba-shared-resources-usage",
"broker_config": {},
"quantized_rebalance_consumer_group_delay_secs": null
},
"max_batch_size": 1,
"max_batch_time_ms": 1000,
"env": {
"sentry_dsn": "https://secret@sentry.io/1",
"dogstatsd_host": null,
"dogstatsd_port": null,
"default_retention_days": 90,
"lower_retention_days": 30,
"valid_retention_days": [30, 60, 90],
"record_cogs": false,
"project_stacktrace_blacklist": []
}
}"#;

let consumer_config = ConsumerConfig::load_from_str(raw).unwrap();
let redacted = consumer_config.redacted_for_logging();
let serialized = serde_json::to_string(&redacted).unwrap();

// Secrets must not leak into logs.
assert!(!serialized.contains("super-secret"));
assert!(!serialized.contains("kafka-secret"));
assert!(!serialized.contains("https://secret@sentry.io/1"));

// Non-sensitive values are preserved.
assert_eq!(
redacted["storages"][0]["clickhouse_cluster"]["host"],
"localhost"
);
assert_eq!(
redacted["storages"][0]["clickhouse_cluster"]["password"],
REDACTED
);
assert_eq!(
redacted["raw_topic"]["broker_config"]["bootstrap.servers"],
"127.0.0.1:9092"
);
assert_eq!(
redacted["raw_topic"]["broker_config"]["sasl.username"],
"user"
);
assert_eq!(
redacted["raw_topic"]["broker_config"]["sasl.password"],
REDACTED
);
assert_eq!(redacted["env"]["sentry_dsn"], REDACTED);
}
}
7 changes: 7 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ pub fn consumer_impl(
.expect("failed to initialize sentry-options");

let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap();

tracing::info!(
consumer_group,
"Consumer config: {}",
consumer_config.redacted_for_logging()
);

let max_batch_size = consumer_config.max_batch_size;
let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms);
let max_batch_size_calculation = consumer_config.max_batch_size_calculation;
Expand Down
Loading