diff --git a/rust_snuba/src/accepted_outcomes_consumer.rs b/rust_snuba/src/accepted_outcomes_consumer.rs index 756835e0f83..5d914f18752 100644 --- a/rust_snuba/src/accepted_outcomes_consumer.rs +++ b/rust_snuba/src/accepted_outcomes_consumer.rs @@ -125,6 +125,12 @@ pub fn accepted_outcomes_consumer_impl( let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); + tracing::info!( + consumer_group, + "Consumer config: {}", + consumer_config.redacted_for_logging() + ); + assert_eq!(consumer_config.storages.len(), 1); let mut _sentry_guard = None; diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs index 8754d412723..8f0570b545b 100644 --- a/rust_snuba/src/config.rs +++ b/rust_snuba/src/config.rs @@ -80,6 +80,88 @@ impl ConsumerConfig { let d: Self = serde_json::from_str(payload)?; Ok(d) } + + /// Returns a JSON representation of the consumer config suitable for + /// logging, with secrets (ClickHouse password, Sentry DSN, Kafka + /// credentials in broker configs) redacted. + pub fn redacted_for_logging(&self) -> Value { + let storages: Vec = self + .storages + .iter() + .map(|storage| { + serde_json::json!({ + "name": storage.name, + "clickhouse_table_name": storage.clickhouse_table_name, + "clickhouse_cluster": { + "host": storage.clickhouse_cluster.host, + "port": storage.clickhouse_cluster.port, + "http_port": storage.clickhouse_cluster.http_port, + "user": storage.clickhouse_cluster.user, + "password": REDACTED, + "database": storage.clickhouse_cluster.database, + "secure": storage.clickhouse_cluster.secure, + }, + "message_processor": { + "python_class_name": storage.message_processor.python_class_name, + "python_module": storage.message_processor.python_module, + }, + }) + }) + .collect(); + + serde_json::json!({ + "storages": storages, + "raw_topic": redacted_topic_config(&self.raw_topic), + "commit_log_topic": self.commit_log_topic.as_ref().map(redacted_topic_config), + "replacements_topic": self.replacements_topic.as_ref().map(redacted_topic_config), + "accepted_outcomes_topic": self.accepted_outcomes_topic.as_ref().map(redacted_topic_config), + "dlq_topic": self.dlq_topic.as_ref().map(redacted_topic_config), + "accountant_topic": redacted_topic_config(&self.accountant_topic), + "max_batch_size": self.max_batch_size, + "max_batch_time_ms": self.max_batch_time_ms, + "max_batch_size_calculation": format!("{:?}", self.max_batch_size_calculation), + "env": { + "sentry_dsn": self.env.sentry_dsn.as_ref().map(|_| REDACTED), + "dogstatsd_host": self.env.dogstatsd_host, + "dogstatsd_port": self.env.dogstatsd_port, + "default_retention_days": self.env.default_retention_days, + "lower_retention_days": self.env.lower_retention_days, + "valid_retention_days": self.env.valid_retention_days, + "record_cogs": self.env.record_cogs, + "project_stacktrace_blacklist": self.env.project_stacktrace_blacklist, + }, + }) + } +} + +const REDACTED: &str = "[REDACTED]"; + +/// Redacts values of broker config keys that may contain credentials +/// (e.g. `sasl.password`). +fn redacted_broker_config(broker_config: &BrokerConfig) -> Value { + let redacted = broker_config + .iter() + .map(|(key, value)| { + let lower = key.to_lowercase(); + let value = if lower.contains("password") || lower.contains("secret") { + Value::String(REDACTED.to_string()) + } else { + Value::String(value.clone()) + }; + (key.clone(), value) + }) + .collect(); + + Value::Object(redacted) +} + +fn redacted_topic_config(topic_config: &TopicConfig) -> Value { + serde_json::json!({ + "physical_topic_name": topic_config.physical_topic_name, + "logical_topic_name": topic_config.logical_topic_name, + "broker_config": redacted_broker_config(&topic_config.broker_config), + "quantized_rebalance_consumer_group_delay_secs": topic_config.quantized_rebalance_consumer_group_delay_secs, + }) } #[derive(Deserialize, Clone, Debug)] @@ -154,4 +236,91 @@ mod tests { "10000" ); } + + #[test] + fn test_redacted_for_logging() { + let raw = r#"{ + "storages": [{ + "name": "errors", + "clickhouse_table_name": "errors_local", + "clickhouse_cluster": { + "host": "localhost", + "port": 9000, + "secure": false, + "http_port": 8123, + "user": "default", + "password": "super-secret", + "database": "default" + }, + "message_processor": { + "python_class_name": "ErrorsProcessor", + "python_module": "snuba.datasets.processors.errors_processor" + } + }], + "raw_topic": { + "physical_topic_name": "events", + "logical_topic_name": "events", + "broker_config": { + "bootstrap.servers": "127.0.0.1:9092", + "sasl.username": "user", + "sasl.password": "kafka-secret" + }, + "quantized_rebalance_consumer_group_delay_secs": null + }, + "commit_log_topic": null, + "replacements_topic": null, + "accepted_outcomes_topic": null, + "dlq_topic": null, + "accountant_topic": { + "physical_topic_name": "snuba-shared-resources-usage", + "logical_topic_name": "snuba-shared-resources-usage", + "broker_config": {}, + "quantized_rebalance_consumer_group_delay_secs": null + }, + "max_batch_size": 1, + "max_batch_time_ms": 1000, + "env": { + "sentry_dsn": "https://secret@sentry.io/1", + "dogstatsd_host": null, + "dogstatsd_port": null, + "default_retention_days": 90, + "lower_retention_days": 30, + "valid_retention_days": [30, 60, 90], + "record_cogs": false, + "project_stacktrace_blacklist": [] + } + }"#; + + let consumer_config = ConsumerConfig::load_from_str(raw).unwrap(); + let redacted = consumer_config.redacted_for_logging(); + let serialized = serde_json::to_string(&redacted).unwrap(); + + // Secrets must not leak into logs. + assert!(!serialized.contains("super-secret")); + assert!(!serialized.contains("kafka-secret")); + assert!(!serialized.contains("https://secret@sentry.io/1")); + + // Non-sensitive values are preserved. + assert_eq!( + redacted["storages"][0]["clickhouse_cluster"]["host"], + "localhost" + ); + assert_eq!( + redacted["storages"][0]["clickhouse_cluster"]["password"], + REDACTED + ); + assert_eq!( + redacted["raw_topic"]["broker_config"]["bootstrap.servers"], + "127.0.0.1:9092" + ); + assert_eq!( + redacted["raw_topic"]["broker_config"]["sasl.username"], + "user" + ); + assert_eq!( + redacted["raw_topic"]["broker_config"]["sasl.password"], + REDACTED + ); + assert_eq!(redacted["env"]["sentry_dsn"], REDACTED); + } } diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ff3e33b52d0..5c632e0df40 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -100,6 +100,13 @@ pub fn consumer_impl( .expect("failed to initialize sentry-options"); let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); + + tracing::info!( + consumer_group, + "Consumer config: {}", + consumer_config.redacted_for_logging() + ); + let max_batch_size = consumer_config.max_batch_size; let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); let max_batch_size_calculation = consumer_config.max_batch_size_calculation;