From 854777ab71ebe6a69a9db9d1b0fc0b2302c2cbe5 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Jun 2026 18:32:43 +0000 Subject: [PATCH 1/2] feat(consumer): log resolved consumer config on every start The Rust consumers (rust_consumer and accepted_outcomes_consumer) resolve their full config from CLI args + storage definitions but never logged it, making it hard to confirm what a running consumer was actually configured with (topics, brokers, batch settings, retention, etc). Log the resolved ConsumerConfig as JSON at startup, right after it's parsed. Secrets (ClickHouse password, Sentry DSN, and Kafka credentials in broker configs such as sasl.password) are redacted so they never reach the logs. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01Cfqpz6SMijmjzTCbLosEvG --- rust_snuba/src/accepted_outcomes_consumer.rs | 6 + rust_snuba/src/config.rs | 166 +++++++++++++++++++ rust_snuba/src/consumer.rs | 7 + 3 files changed, 179 insertions(+) diff --git a/rust_snuba/src/accepted_outcomes_consumer.rs b/rust_snuba/src/accepted_outcomes_consumer.rs index 756835e0f83..5d914f18752 100644 --- a/rust_snuba/src/accepted_outcomes_consumer.rs +++ b/rust_snuba/src/accepted_outcomes_consumer.rs @@ -125,6 +125,12 @@ pub fn accepted_outcomes_consumer_impl( let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); + tracing::info!( + consumer_group, + "Consumer config: {}", + consumer_config.redacted_for_logging() + ); + assert_eq!(consumer_config.storages.len(), 1); let mut _sentry_guard = None; diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs index 8754d412723..c41cff504eb 100644 --- a/rust_snuba/src/config.rs +++ b/rust_snuba/src/config.rs @@ -80,6 +80,88 @@ impl ConsumerConfig { let d: Self = serde_json::from_str(payload)?; Ok(d) } + + /// Returns a JSON representation of the consumer config suitable for + /// logging, with secrets (ClickHouse password, Sentry DSN, Kafka + /// credentials in broker configs) redacted. + pub fn redacted_for_logging(&self) -> Value { + let storages: Vec = self + .storages + .iter() + .map(|storage| { + serde_json::json!({ + "name": storage.name, + "clickhouse_table_name": storage.clickhouse_table_name, + "clickhouse_cluster": { + "host": storage.clickhouse_cluster.host, + "port": storage.clickhouse_cluster.port, + "http_port": storage.clickhouse_cluster.http_port, + "user": storage.clickhouse_cluster.user, + "password": REDACTED, + "database": storage.clickhouse_cluster.database, + "secure": storage.clickhouse_cluster.secure, + }, + "message_processor": { + "python_class_name": storage.message_processor.python_class_name, + "python_module": storage.message_processor.python_module, + }, + }) + }) + .collect(); + + serde_json::json!({ + "storages": storages, + "raw_topic": redacted_topic_config(&self.raw_topic), + "commit_log_topic": self.commit_log_topic.as_ref().map(redacted_topic_config), + "replacements_topic": self.replacements_topic.as_ref().map(redacted_topic_config), + "accepted_outcomes_topic": self.accepted_outcomes_topic.as_ref().map(redacted_topic_config), + "dlq_topic": self.dlq_topic.as_ref().map(redacted_topic_config), + "accountant_topic": redacted_topic_config(&self.accountant_topic), + "max_batch_size": self.max_batch_size, + "max_batch_time_ms": self.max_batch_time_ms, + "max_batch_size_calculation": format!("{:?}", self.max_batch_size_calculation), + "env": { + "sentry_dsn": self.env.sentry_dsn.as_ref().map(|_| REDACTED), + "dogstatsd_host": self.env.dogstatsd_host, + "dogstatsd_port": self.env.dogstatsd_port, + "default_retention_days": self.env.default_retention_days, + "lower_retention_days": self.env.lower_retention_days, + "valid_retention_days": self.env.valid_retention_days, + "record_cogs": self.env.record_cogs, + "project_stacktrace_blacklist": self.env.project_stacktrace_blacklist, + }, + }) + } +} + +const REDACTED: &str = "[REDACTED]"; + +/// Redacts values of broker config keys that may contain credentials +/// (e.g. `sasl.password`). +fn redacted_broker_config(broker_config: &BrokerConfig) -> Value { + let redacted = broker_config + .iter() + .map(|(key, value)| { + let lower = key.to_lowercase(); + let value = if lower.contains("password") || lower.contains("secret") { + Value::String(REDACTED.to_string()) + } else { + Value::String(value.clone()) + }; + (key.clone(), value) + }) + .collect(); + + Value::Object(redacted) +} + +fn redacted_topic_config(topic_config: &TopicConfig) -> Value { + serde_json::json!({ + "physical_topic_name": topic_config.physical_topic_name, + "logical_topic_name": topic_config.logical_topic_name, + "broker_config": redacted_broker_config(&topic_config.broker_config), + "quantized_rebalance_consumer_group_delay_secs": topic_config.quantized_rebalance_consumer_group_delay_secs, + }) } #[derive(Deserialize, Clone, Debug)] @@ -154,4 +236,88 @@ mod tests { "10000" ); } + + #[test] + fn test_redacted_for_logging() { + let raw = r#"{ + "storages": [{ + "name": "errors", + "clickhouse_table_name": "errors_local", + "clickhouse_cluster": { + "host": "localhost", + "port": 9000, + "secure": false, + "http_port": 8123, + "user": "default", + "password": "super-secret", + "database": "default" + }, + "message_processor": { + "python_class_name": "ErrorsProcessor", + "python_module": "snuba.datasets.processors.errors_processor" + } + }], + "raw_topic": { + "physical_topic_name": "events", + "logical_topic_name": "events", + "broker_config": { + "bootstrap.servers": "127.0.0.1:9092", + "sasl.username": "user", + "sasl.password": "kafka-secret" + }, + "quantized_rebalance_consumer_group_delay_secs": null + }, + "commit_log_topic": null, + "replacements_topic": null, + "accepted_outcomes_topic": null, + "dlq_topic": null, + "accountant_topic": { + "physical_topic_name": "snuba-shared-resources-usage", + "logical_topic_name": "snuba-shared-resources-usage", + "broker_config": {}, + "quantized_rebalance_consumer_group_delay_secs": null + }, + "max_batch_size": 1, + "max_batch_time_ms": 1000, + "env": { + "sentry_dsn": "https://secret@sentry.io/1", + "dogstatsd_host": null, + "dogstatsd_port": null, + "default_retention_days": 90, + "lower_retention_days": 30, + "valid_retention_days": [30, 60, 90], + "record_cogs": false, + "project_stacktrace_blacklist": [] + } + }"#; + + let consumer_config = ConsumerConfig::load_from_str(raw).unwrap(); + let redacted = consumer_config.redacted_for_logging(); + let serialized = serde_json::to_string(&redacted).unwrap(); + + // Secrets must not leak into logs. + assert!(!serialized.contains("super-secret")); + assert!(!serialized.contains("kafka-secret")); + assert!(!serialized.contains("https://secret@sentry.io/1")); + + // Non-sensitive values are preserved. + assert_eq!(redacted["storages"][0]["clickhouse_cluster"]["host"], "localhost"); + assert_eq!( + redacted["storages"][0]["clickhouse_cluster"]["password"], + REDACTED + ); + assert_eq!( + redacted["raw_topic"]["broker_config"]["bootstrap.servers"], + "127.0.0.1:9092" + ); + assert_eq!( + redacted["raw_topic"]["broker_config"]["sasl.username"], + "user" + ); + assert_eq!( + redacted["raw_topic"]["broker_config"]["sasl.password"], + REDACTED + ); + assert_eq!(redacted["env"]["sentry_dsn"], REDACTED); + } } diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ff3e33b52d0..5c632e0df40 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -100,6 +100,13 @@ pub fn consumer_impl( .expect("failed to initialize sentry-options"); let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); + + tracing::info!( + consumer_group, + "Consumer config: {}", + consumer_config.redacted_for_logging() + ); + let max_batch_size = consumer_config.max_batch_size; let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); let max_batch_size_calculation = consumer_config.max_batch_size_calculation; From 4ecf5d18720bddd9c49106beafe9ba2ad796d692 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Jun 2026 18:34:57 +0000 Subject: [PATCH 2/2] style: rustfmt formatting Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01Cfqpz6SMijmjzTCbLosEvG --- rust_snuba/src/config.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs index c41cff504eb..8f0570b545b 100644 --- a/rust_snuba/src/config.rs +++ b/rust_snuba/src/config.rs @@ -301,7 +301,10 @@ mod tests { assert!(!serialized.contains("https://secret@sentry.io/1")); // Non-sensitive values are preserved. - assert_eq!(redacted["storages"][0]["clickhouse_cluster"]["host"], "localhost"); + assert_eq!( + redacted["storages"][0]["clickhouse_cluster"]["host"], + "localhost" + ); assert_eq!( redacted["storages"][0]["clickhouse_cluster"]["password"], REDACTED