Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab/datasources/test-suites.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ test_suites:
- name: snapstart
- name: lmi
- name: auth
- name: oom
38 changes: 38 additions & 0 deletions .gitlab/templates/pipeline.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,40 @@ build node lambdas:
- cd integration-tests
- ./scripts/build-node.sh

# CI job for the Ruby integration-test lambdas: runs ./scripts/build-ruby.sh
# from integration-tests/ on an arm64 docker-in-docker runner.
build ruby lambdas:
stage: integration-tests
image: registry.ddbuild.io/images/docker:27.3.1
tags: ["docker-in-docker:arm64"]
rules:
- when: on_success
# Empty `needs`: the job does not wait on any other job before starting.
needs: []
artifacts:
# Short-lived artifacts; `integration-suite` lists this job under both
# `needs` and `dependencies` to pick them up.
expire_in: 1 hour
paths:
- integration-tests/lambda/*/*.rb
script:
- cd integration-tests
- ./scripts/build-ruby.sh

# CI job for the Go integration-test lambdas: runs ./scripts/build-go.sh
# from integration-tests/ on an arm64 docker-in-docker runner.
build go lambdas:
stage: integration-tests
image: registry.ddbuild.io/images/docker:27.3.1
tags: ["docker-in-docker:arm64"]
rules:
- when: on_success
# Empty `needs`: the job does not wait on any other job before starting.
needs: []
cache:
# Cache key includes the ref slug, so each branch/tag gets its own Go module cache.
key: go-mod-cache-${CI_COMMIT_REF_SLUG}
paths:
- integration-tests/.cache/go-mod/
artifacts:
# Short-lived artifacts; `integration-suite` lists this job under both
# `needs` and `dependencies` to pick them up.
expire_in: 1 hour
paths:
- integration-tests/lambda/*/bin/bootstrap
script:
- cd integration-tests
- ./scripts/build-go.sh

# Integration Tests - Publish arm64 layer with integration test prefix
publish integration layer (arm64):
stage: integration-tests
Expand Down Expand Up @@ -581,12 +615,16 @@ integration-suite:
- build dotnet lambdas
- build python lambdas
- build node lambdas
- build ruby lambdas
- build go lambdas
dependencies:
- publish integration layer (arm64)
- build java lambdas
- build dotnet lambdas
- build python lambdas
- build node lambdas
- build ruby lambdas
- build go lambdas
variables:
IDENTIFIER: ${CI_COMMIT_SHORT_SHA}
AWS_DEFAULT_REGION: us-east-1
Expand Down
7 changes: 5 additions & 2 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,9 +841,12 @@ async fn handle_event_bus_event(
stats_concentrator: StatsConcentratorHandle,
) -> Option<TelemetryEvent> {
match event {
Event::OutOfMemory(event_timestamp) => {
Event::OutOfMemory {
request_id,
timestamp,
} => {
if let Err(e) = invocation_processor_handle
.on_out_of_memory_error(event_timestamp)
.on_out_of_memory_error(request_id, timestamp)
.await
{
error!("Failed to send out of memory error to processor: {}", e);
Expand Down
8 changes: 7 additions & 1 deletion bottlecap/src/event_bus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ mod constants;
#[derive(Debug)]
pub enum Event {
Telemetry(TelemetryEvent),
OutOfMemory(i64),
OutOfMemory {
/// Lambda `request_id` of the invocation the OOM belongs to, when known.
/// Used by the invocation processor to dedupe against other OOM detection
/// paths (`PlatformRuntimeDone` `error_type`, `PlatformReport` memory equality).
request_id: Option<String>,
timestamp: i64,
},
Tombstone,
}

Expand Down
7 changes: 7 additions & 0 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub struct Context {
/// tracing.
///
pub extracted_span_context: Option<SpanContext>,
/// Whether the `aws.lambda.enhanced.out_of_memory` metric has already been
/// emitted for this invocation. Multiple detection paths can fire for the
/// same OOM (runtime log, `Runtime.OutOfMemory` `error_type` in
/// `PlatformRuntimeDone`, `max_memory_used == memory_size` in `PlatformReport`);
/// this flag dedupes them.
pub oom_emitted: bool,
}

/// Struct containing the information needed to reparent a span.
Expand Down Expand Up @@ -94,6 +100,7 @@ impl Default for Context {
snapstart_restore_span: None,
tracer_span: None,
extracted_span_context: None,
oom_emitted: false,
}
}
}
Expand Down
185 changes: 173 additions & 12 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ impl Processor {
debug!(
"Invocation Processor | PlatformRuntimeDone | Got Runtime.OutOfMemory. Incrementing OOM metric."
);
self.enhanced_metrics.increment_oom_metric(timestamp);
self.try_increment_oom_metric(Some(request_id), timestamp);
}
}

Expand Down Expand Up @@ -909,25 +909,25 @@ impl Processor {

/// Handles `OnDemand` mode platform report processing.
///
/// Processes OnDemand-specific metrics including OOM detection for provided.al runtimes
/// and post-runtime duration calculation.
/// Processes OnDemand-specific metrics including OOM detection by memory-size
/// equality and post-runtime duration calculation.
fn handle_ondemand_report(
&mut self,
request_id: &String,
metrics: OnDemandReportMetrics,
timestamp: i64,
) {
// For provided.al runtimes, if the last invocation hit the memory limit, increment the OOM metric.
// We do this for provided.al runtimes because we didn't find another way to detect this under provided.al.
// We don't do this for other runtimes to avoid double counting.
if let Some(runtime) = &self.runtime
&& runtime.starts_with("provided.al")
&& metrics.max_memory_used_mb == metrics.memory_size_mb
{
// If the invocation hit the memory limit, increment the OOM metric. This catches
// OOM-induced failures that don't surface through a runtime-specific log line or a
// `Runtime.OutOfMemory` error_type — most notably the suppressed-init / timeout-at-cap
// pattern reported in datadog-lambda-extension#1237 (Node) and the historical
// provided.al case. Dedup against the other two detection paths is handled by
// `Context::oom_emitted`, which `try_increment_oom_metric` checks and sets.
if metrics.max_memory_used_mb == metrics.memory_size_mb {
debug!(
"Invocation Processor | PlatformReport | Last invocation hit memory limit. Incrementing OOM metric."
);
self.enhanced_metrics.increment_oom_metric(timestamp);
self.try_increment_oom_metric(Some(request_id), timestamp);
}

// Calculate and set post-runtime duration if context is available
Expand Down Expand Up @@ -1395,7 +1395,34 @@ impl Processor {
Some(error_tags)
}

pub fn on_out_of_memory_error(&mut self, timestamp: i64) {
/// Records an out-of-memory signal for an invocation.
///
/// Forwards `request_id` and `timestamp` unchanged to
/// `try_increment_oom_metric`, which decides whether the OOM enhanced metric is
/// emitted or skipped as a duplicate for that `request_id`. Pass `None` when the
/// originating invocation is not known.
pub fn on_out_of_memory_error(&mut self, request_id: Option<&String>, timestamp: i64) {
self.try_increment_oom_metric(request_id, timestamp);
}

/// Emits the OOM enhanced metric at most once for a tracked `request_id`.
///
/// The same out-of-memory condition can be reported through more than one path:
/// 1. a runtime-specific OOM log line (logs processor → `Event::OutOfMemory`),
/// 2. `error_type == "Runtime.OutOfMemory"` in `PlatformRuntimeDone`,
/// 3. `max_memory_used_mb == memory_size_mb` in `PlatformReport`.
///
/// The first report for an invocation flips `Context::oom_emitted`; any later
/// report for that `request_id` is logged and dropped. When there is nothing to
/// dedupe against — `request_id` is `None`, or the context buffer holds no entry
/// for it — the metric is still incremented on a best-effort basis.
fn try_increment_oom_metric(&mut self, request_id: Option<&String>, timestamp: i64) {
    // Resolve the invocation context (if any) together with the id used to find it.
    let tracked = match request_id {
        Some(rid) => self.context_buffer.get_mut(rid).map(|ctx| (rid, ctx)),
        None => None,
    };

    match tracked {
        // Another detection path already counted this invocation.
        Some((rid, ctx)) if ctx.oom_emitted => {
            debug!(
                "Invocation Processor | OOM metric already emitted for request_id {}, skipping",
                rid
            );
            return;
        }
        // First signal for this invocation: remember it before emitting.
        Some((_, ctx)) => ctx.oom_emitted = true,
        // Untracked signal: nothing to mark, fall through to emit.
        None => {}
    }

    self.enhanced_metrics.increment_oom_metric(timestamp);
}

Expand Down Expand Up @@ -2445,4 +2472,138 @@ mod tests {
"pre-existing _dd.appsec.enabled value must not be overwritten"
);
}

/// Reporting an OOM twice for one `request_id` must count as a single
/// emission; the second report is expected to be absorbed by the
/// `Context::oom_emitted` flag.
#[tokio::test]
async fn test_try_increment_oom_metric_dedupes_same_request_id() {
    let mut processor = setup();
    // Seed the context by hand instead of calling `on_invoke_event`: that path
    // adds dynamic tags (`cold_start:true`), which would complicate the lookup below.
    let rid = String::from("req-dedup");
    processor
        .context_buffer
        .start_context(&rid, Span::default());

    let elapsed_secs = std::time::UNIX_EPOCH.elapsed().expect("clock").as_secs();
    let now = i64::try_from(elapsed_secs).unwrap_or_default();

    // Same request, same timestamp, reported twice.
    for _ in 0..2 {
        processor.on_out_of_memory_error(Some(&rid), now);
    }

    // Look the metric up under the timestamp rounded down to a multiple of 10.
    let bucket = (now / 10) * 10;
    let metric_key = crate::metrics::enhanced::constants::OUT_OF_MEMORY_METRIC.into();
    let entry = processor
        .enhanced_metrics
        .aggr_handle
        .get_entry_by_id(metric_key, None, bucket)
        .await
        .unwrap()
        .expect("OOM metric must be emitted at least once");

    let sum = entry
        .value
        .get_sketch()
        .expect("distribution sketch")
        .sum()
        .expect("sketch sum");
    let deviation = (sum - 1.0).abs();
    assert!(
        deviation < f64::EPSILON,
        "OOM sum must be 1.0 (deduped), got {sum}"
    );

    // The per-invocation flag must record that an emission happened.
    let ctx = processor.context_buffer.get(&rid).expect("context");
    assert!(
        ctx.oom_emitted,
        "oom_emitted flag must be set after the first emission"
    );
}

/// Dedup is scoped to a single request: one OOM report for each of two
/// different `request_id`s must yield two emissions.
#[tokio::test]
async fn test_try_increment_oom_metric_distinct_request_ids_emit_separately() {
    let mut processor = setup();
    let first = String::from("req-a");
    let second = String::from("req-b");
    for rid in [&first, &second] {
        processor.context_buffer.start_context(rid, Span::default());
    }

    let elapsed_secs = std::time::UNIX_EPOCH.elapsed().expect("clock").as_secs();
    let now = i64::try_from(elapsed_secs).unwrap_or_default();

    // One report per request, both at the same timestamp.
    for rid in [&first, &second] {
        processor.on_out_of_memory_error(Some(rid), now);
    }

    // Look the metric up under the timestamp rounded down to a multiple of 10.
    let bucket = (now / 10) * 10;
    let metric_key = crate::metrics::enhanced::constants::OUT_OF_MEMORY_METRIC.into();
    let entry = processor
        .enhanced_metrics
        .aggr_handle
        .get_entry_by_id(metric_key, None, bucket)
        .await
        .unwrap()
        .expect("OOM metric must be emitted");

    let sum = entry
        .value
        .get_sketch()
        .expect("distribution sketch")
        .sum()
        .expect("sketch sum");
    let deviation = (sum - 2.0).abs();
    assert!(
        deviation < f64::EPSILON,
        "OOM sum must be 2.0 (one per request_id), got {sum}"
    );
}

/// Regression guard for the memory-equality OOM path. The
/// `max_memory_used_mb == memory_size_mb` check was once restricted to
/// runtimes starting with `provided.al`; now that it applies to every runtime
/// (deduped through `Context::oom_emitted`), a report at the memory cap must
/// still produce the OOM metric.
#[tokio::test]
async fn test_handle_ondemand_report_emits_oom_on_memory_equality() {
    let mut processor = setup();
    let rid = String::from("req-eq");
    processor
        .context_buffer
        .start_context(&rid, Span::default());

    let elapsed_secs = std::time::UNIX_EPOCH.elapsed().expect("clock").as_secs();
    let now = i64::try_from(elapsed_secs).unwrap_or_default();

    // Report whose peak memory equals the configured memory size.
    let report = OnDemandReportMetrics {
        duration_ms: 100.0,
        billed_duration_ms: 100,
        memory_size_mb: 1024,
        max_memory_used_mb: 1024,
        init_duration_ms: None,
        restore_duration_ms: None,
    };
    processor.handle_ondemand_report(&rid, report, now);

    // Look the metric up under the timestamp rounded down to a multiple of 10.
    let bucket = (now / 10) * 10;
    let metric_key = crate::metrics::enhanced::constants::OUT_OF_MEMORY_METRIC.into();
    let found = processor
        .enhanced_metrics
        .aggr_handle
        .get_entry_by_id(metric_key, None, bucket)
        .await
        .unwrap();
    assert!(
        found.is_some(),
        "OOM must be emitted when max_memory_used_mb == memory_size_mb"
    );
}
}
15 changes: 12 additions & 3 deletions bottlecap/src/lifecycle/invocation/processor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub enum ProcessorCommand {
execution_status: Option<String>,
},
OnOutOfMemoryError {
request_id: Option<String>,
timestamp: i64,
},
OnShutdownEvent,
Expand Down Expand Up @@ -407,10 +408,14 @@ impl InvocationProcessorHandle {

pub async fn on_out_of_memory_error(
&self,
request_id: Option<String>,
timestamp: i64,
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
self.sender
.send(ProcessorCommand::OnOutOfMemoryError { timestamp })
.send(ProcessorCommand::OnOutOfMemoryError {
request_id,
timestamp,
})
.await
}

Expand Down Expand Up @@ -632,8 +637,12 @@ impl InvocationProcessorService {
)
.await;
}
ProcessorCommand::OnOutOfMemoryError { timestamp } => {
self.processor.on_out_of_memory_error(timestamp);
ProcessorCommand::OnOutOfMemoryError {
request_id,
timestamp,
} => {
self.processor
.on_out_of_memory_error(request_id.as_ref(), timestamp);
}
ProcessorCommand::OnShutdownEvent => {
self.processor.on_shutdown_event();
Expand Down
Loading
Loading