Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bottlecap/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,15 @@ pub struct EnvConfig {
/// The Datadog organization UUID. When set, delegated auth is auto-enabled.
#[serde(deserialize_with = "deserialize_string_or_int")]
pub org_uuid: Option<String>,

/// @env `DD_DURABLE_FUNCTION_LOG_BUFFER_SIZE`
///
/// Maximum number of request IDs whose logs are held waiting for durable execution
/// context. Set to 0 to disable log holding; logs will be sent immediately without
/// durable execution context enrichment. Useful when the tracer is not installed.
/// Default is `0`.
#[serde(deserialize_with = "deserialize_option_lossless")]
pub durable_function_log_buffer_size: Option<usize>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if this should include the `LAMBDA` prefix, since we're going to migrate to the config package from serverless-components eventually.

That way the name makes it clearer that it's for the Lambda product.

}

#[allow(clippy::too_many_lines)]
Expand Down Expand Up @@ -692,6 +701,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
merge_option_to_value!(config, env_config, api_security_sample_delay);

merge_string!(config, dd_org_uuid, env_config, org_uuid);
merge_option_to_value!(config, env_config, durable_function_log_buffer_size);
}

#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -1054,6 +1064,7 @@ mod tests {
api_security_sample_delay: Duration::from_secs(60),

dd_org_uuid: String::default(),
durable_function_log_buffer_size: 0,
};

assert_eq!(config, expected_config);
Expand Down
8 changes: 8 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ pub struct Config {
pub appsec_waf_timeout: Duration,
pub api_security_enabled: bool,
pub api_security_sample_delay: Duration,

/// Maximum number of request IDs whose logs are held in `held_logs` waiting for durable
/// execution context. Set to 0 to disable log holding; logs will be flushed immediately
/// without durable execution context enrichment. Defaults to 0 until the tracer-side
/// durable execution support is released; set to 50 to re-enable enrichment.
pub durable_function_log_buffer_size: usize,
}

impl Default for Config {
Expand Down Expand Up @@ -488,6 +494,8 @@ impl Default for Config {
appsec_waf_timeout: Duration::from_millis(5),
api_security_enabled: true,
api_security_sample_delay: Duration::from_secs(30),

durable_function_log_buffer_size: 0,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ api_security_sample_delay: 60 # Seconds
dogstatsd_queue_size: Some(2048),

dd_org_uuid: String::default(),
durable_function_log_buffer_size: 0,
};

// Assert that
Expand Down
22 changes: 17 additions & 5 deletions bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,13 @@ pub struct LambdaProcessor {
durable_context_map: HashMap<String, DurableExecutionContext>,
// Insertion order for FIFO eviction when map reaches capacity
durable_context_order: VecDeque<String>,
// Max number of request ID keys in held_logs. 0 disables holding entirely.
held_logs_max_keys: usize,
}

// Matches `lifecycle::invocation::ContextBuffer` default capacity: sized to absorb async
// event backlog where invocation contexts may arrive out of order.
const DURABLE_CONTEXT_MAP_CAPACITY: usize = 500;
// Kept intentionally small: at shutdown, all held logs are flushed without durable context.
// A large cap would mean a large batch sent in one shot, increasing the risk of the final
// flush timing out when the tracer is not installed.
const HELD_LOGS_MAX_KEYS: usize = 50;

const OOM_ERRORS: [&str; 7] = [
"fatal error: runtime: out of memory", // Go
Expand Down Expand Up @@ -143,6 +141,7 @@ impl LambdaProcessor {

let processing_rules = &datadog_config.logs_config_processing_rules;
let logs_enabled = datadog_config.serverless_logs_enabled;
let held_logs_max_keys = datadog_config.durable_function_log_buffer_size;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename `held_logs_max_keys` to `durable_function_log_buffer_size` or something similar for better readability.

let rules = LambdaProcessor::compile_rules(processing_rules);
LambdaProcessor {
function_arn,
Expand All @@ -160,6 +159,7 @@ impl LambdaProcessor {
held_logs_order: VecDeque::new(),
durable_context_map: HashMap::with_capacity(DURABLE_CONTEXT_MAP_CAPACITY),
durable_context_order: VecDeque::with_capacity(DURABLE_CONTEXT_MAP_CAPACITY),
held_logs_max_keys,
}
}

Expand Down Expand Up @@ -684,7 +684,7 @@ impl LambdaProcessor {
/// arrives.
fn hold_log(&mut self, request_id: String, log: IntakeLog) {
if !self.held_logs.contains_key(&request_id) {
while self.held_logs.len() >= HELD_LOGS_MAX_KEYS {
while self.held_logs.len() >= self.held_logs_max_keys {
// Evict the oldest key to ready_logs (without durable context tags).
if let Some(oldest) = self.held_logs_order.pop_front()
&& let Some(evicted) = self.held_logs.remove(&oldest)
Expand Down Expand Up @@ -723,6 +723,16 @@ impl LambdaProcessor {
return;
}

// When the buffer is disabled, skip holding and send logs immediately without
// durable execution context enrichment.
if self.held_logs_max_keys == 0 {
if let Ok(serialized_log) = serde_json::to_string(&log) {
drop(log);
self.ready_logs.push(serialized_log);
}
return;
}

match self.is_durable_function {
// We don't yet know if this is a durable function. Hold the log until we know.
None => {
Expand Down Expand Up @@ -2569,6 +2579,7 @@ mod tests {
#[tokio::test]
async fn test_function_log_without_execution_arn_is_held_in_durable_mode() {
let mut processor = make_processor_for_durable_arn_tests();
processor.held_logs_max_keys = 50;
processor.is_durable_function = Some(true);
// Simulate a known request_id with no durable context yet
processor.invocation_context.request_id = "req-123".to_string();
Expand All @@ -2594,6 +2605,7 @@ mod tests {
(None, serde_json::Value::Null),
] {
let mut processor = make_processor_for_durable_arn_tests();
processor.held_logs_max_keys = 50;
processor.is_durable_function = Some(true);
processor.invocation_context.request_id = "req-end".to_string();
processor.insert_to_durable_context_map(
Expand Down
Loading