Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ pub struct CloudwatchLogsSinkConfig {
docs::additional_props_description = "A tag represented as a key-value pair"
))]
pub tags: Option<HashMap<String, String>>,

/// The [log group class][log_group_class] of the target log group.
///
/// Defaults to `standard` when creating the group without this optional configuration.
///
/// Allowed values are `standard` and `infrequent_access`.
///
/// [log_group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/working-with-log-groups-and-streams.html
#[configurable(derived)]
#[serde(default)]
pub log_group_class: Option<String>,
}

impl CloudwatchLogsSinkConfig {
Expand Down Expand Up @@ -269,6 +280,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
acknowledgements: Default::default(),
kms_key: Default::default(),
tags: Default::default(),
log_group_class: Default::default(),
}
}

Expand Down
27 changes: 26 additions & 1 deletion src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use aws_sdk_cloudwatchlogs::{
put_log_events::{PutLogEventsError, PutLogEventsOutput},
put_retention_policy::PutRetentionPolicyError,
},
types::InputLogEvent,
types::{
InputLogEvent,
LogGroupClass,
},
Client as CloudwatchLogsClient,
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
Expand Down Expand Up @@ -42,6 +45,7 @@ struct Client {
retention_days: u32,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
log_group_class: Option<String>,
}

type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E, HttpResponse>>>;
Expand All @@ -67,6 +71,7 @@ impl CloudwatchFuture {
retention: Retention,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
log_group_class: Option<String>,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
Expand All @@ -80,6 +85,7 @@ impl CloudwatchFuture {
retention_days,
kms_key,
tags,
log_group_class,
};

let state = if let Some(token) = token {
Expand Down Expand Up @@ -174,6 +180,20 @@ impl Future for CloudwatchFuture {

info!(message = "Group created.", name = %self.client.group_name);

// MIGHT NOT BE NECESSARY???? just let the putRetention error be returned?
// Local validation for the DELIVERY class constraint
/*
if let Some(class) = &self.client.log_group_class {
if class.to_uppercase() == "DELIVERY" && self.retention_enabled && self.client.retention_days != 1 {
let msg = format!(
"The log_group_class 'DELIVERY' requires retention to be exactly 1 day. Configured retention: {} days.",
self.client.retention_days
);
return Poll::Ready(Err(CloudwatchError::Config(msg)));
}
}
*/

if self.retention_enabled {
self.state = State::PutRetentionPolicy(self.client.put_retention_policy());
continue;
Expand Down Expand Up @@ -296,12 +316,17 @@ impl Client {
let group_name = self.group_name.clone();
let kms_key = self.kms_key.clone();
let tags = self.tags.clone();
let log_group_class = self.log_group_class.clone();
Box::pin(async move {
let log_group_class_enum = log_group_class.and_then(|s| {
s.to_uppercase().parse::<LogGroupClass>().ok()
});
client
.create_log_group()
.log_group_name(group_name)
.set_kms_key_id(kms_key)
.set_tags(tags)
.set_log_group_class(log_group_class_enum)
.send()
.await?;
Ok(())
Expand Down
8 changes: 8 additions & 0 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub enum CloudwatchError {
CreateGroup(SdkError<CreateLogGroupError, HttpResponse>),
PutRetentionPolicy(SdkError<PutRetentionPolicyError, HttpResponse>),
NoStreamsFound,
Config(String),
}

impl fmt::Display for CloudwatchError {
Expand All @@ -84,6 +85,9 @@ impl fmt::Display for CloudwatchError {
CloudwatchError::PutRetentionPolicy(error) => {
write!(f, "CloudwatchError::PutRetentionPolicy: {}", error)
}
CloudwatchError::Config(msg) => {
write!(f, "CloudwatchError: Configuration error: {}", msg)
}
}
}
}
Expand Down Expand Up @@ -242,6 +246,7 @@ impl CloudwatchLogsSvc {

let kms_key = config.kms_key.clone();
let tags = config.tags.clone();
let log_group_class = config.log_group_class.clone();

CloudwatchLogsSvc {
headers,
Expand All @@ -253,6 +258,7 @@ impl CloudwatchLogsSvc {
retention,
kms_key,
tags,
log_group_class,
token: None,
token_rx: None,
}
Expand Down Expand Up @@ -329,6 +335,7 @@ impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
self.retention.clone(),
self.kms_key.clone(),
self.tags.clone(),
self.log_group_class.clone(),
event_batches,
self.token.take(),
tx,
Expand All @@ -349,6 +356,7 @@ pub struct CloudwatchLogsSvc {
retention: Retention,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
log_group_class: Option<String>,
token: Option<String>,
token_rx: Option<oneshot::Receiver<Option<String>>>,
}
Expand Down