From ba9f9710774c83348b499f78045115736c3757da Mon Sep 17 00:00:00 2001 From: Casey Hartman Date: Fri, 21 Nov 2025 17:25:56 -0500 Subject: [PATCH] implement log_group_class config for cw sink --- src/sinks/aws_cloudwatch_logs/config.rs | 12 +++++++++++ src/sinks/aws_cloudwatch_logs/request.rs | 27 +++++++++++++++++++++++- src/sinks/aws_cloudwatch_logs/service.rs | 8 +++++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index db7e1d5d4da7a..72d7c119d817f 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -183,6 +183,17 @@ pub struct CloudwatchLogsSinkConfig { docs::additional_props_description = "A tag represented as a key-value pair" ))] pub tags: Option>, + + /// The [log group class][log_group_class] of the target log group. + /// + /// Defaults to `standard` when creating the group without this optional configuration. + /// + /// Allowed values are `standard` and `infrequent_access`. + /// + /// [log_group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/working-with-log-groups-and-streams.html + #[configurable(derived)] + #[serde(default)] + pub log_group_class: Option, } impl CloudwatchLogsSinkConfig { @@ -269,6 +280,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig { acknowledgements: Default::default(), kms_key: Default::default(), tags: Default::default(), + log_group_class: Default::default(), } } diff --git a/src/sinks/aws_cloudwatch_logs/request.rs b/src/sinks/aws_cloudwatch_logs/request.rs index e052c05d38578..26ca3408c2a5a 100644 --- a/src/sinks/aws_cloudwatch_logs/request.rs +++ b/src/sinks/aws_cloudwatch_logs/request.rs @@ -6,7 +6,10 @@ use aws_sdk_cloudwatchlogs::{ put_log_events::{PutLogEventsError, PutLogEventsOutput}, put_retention_policy::PutRetentionPolicyError, }, - types::InputLogEvent, + types::{ + InputLogEvent, + LogGroupClass, + }, Client as CloudwatchLogsClient, }; use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError}; @@ -42,6 +45,7 @@ struct Client { retention_days: u32, kms_key: Option, tags: Option>, + log_group_class: Option, } type ClientResult = BoxFuture<'static, Result>>; @@ -67,6 +71,7 @@ impl CloudwatchFuture { retention: Retention, kms_key: Option, tags: Option>, + log_group_class: Option, mut events: Vec>, token: Option, token_tx: oneshot::Sender>, @@ -80,6 +85,7 @@ impl CloudwatchFuture { retention_days, kms_key, tags, + log_group_class, }; let state = if let Some(token) = token { @@ -174,6 +180,20 @@ impl Future for CloudwatchFuture { info!(message = "Group created.", name = %self.client.group_name); + // MIGHT NOT BE NECESSARY???? just let the putRetention error be returned? + // Local validation for the DELIVERY class constraint + /* + if let Some(class) = &self.client.log_group_class { + if class.to_uppercase() == "DELIVERY" && self.retention_enabled && self.client.retention_days != 1 { + let msg = format!( + "The log_group_class 'DELIVERY' requires retention to be exactly 1 day. Configured retention: {} days.", + self.client.retention_days + ); + return Poll::Ready(Err(CloudwatchError::Config(msg))); + } + } + */ + if self.retention_enabled { self.state = State::PutRetentionPolicy(self.client.put_retention_policy()); continue; @@ -296,12 +316,17 @@ impl Client { let group_name = self.group_name.clone(); let kms_key = self.kms_key.clone(); let tags = self.tags.clone(); + let log_group_class = self.log_group_class.clone(); Box::pin(async move { + let log_group_class_enum = log_group_class.and_then(|s| { + s.to_uppercase().parse::().ok() + }); client .create_log_group() .log_group_name(group_name) .set_kms_key_id(kms_key) .set_tags(tags) + .set_log_group_class(log_group_class_enum) .send() .await?; Ok(()) diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index f914029dbc35f..5baa664dbaaad 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -65,6 +65,7 @@ pub enum CloudwatchError { CreateGroup(SdkError), PutRetentionPolicy(SdkError), NoStreamsFound, + Config(String), } impl fmt::Display for CloudwatchError { @@ -84,6 +85,9 @@ impl fmt::Display for CloudwatchError { CloudwatchError::PutRetentionPolicy(error) => { write!(f, "CloudwatchError::PutRetentionPolicy: {}", error) } + CloudwatchError::Config(msg) => { + write!(f, "CloudwatchError: Configuration error: {}", msg) + } } } } @@ -242,6 +246,7 @@ impl CloudwatchLogsSvc { let kms_key = config.kms_key.clone(); let tags = config.tags.clone(); + let log_group_class = config.log_group_class.clone(); CloudwatchLogsSvc { headers, @@ -253,6 +258,7 @@ impl CloudwatchLogsSvc { retention, kms_key, tags, + log_group_class, token: None, token_rx: None, } @@ -329,6 +335,7 @@ impl Service> for CloudwatchLogsSvc { self.retention.clone(), self.kms_key.clone(), self.tags.clone(), + self.log_group_class.clone(), event_batches, self.token.take(), tx, @@ -349,6 +356,7 @@ pub struct CloudwatchLogsSvc { retention: Retention, kms_key: Option, tags: Option>, + log_group_class: Option, token: Option, token_rx: Option>>, }