diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py index a9f8d26..f739e59 100644 --- a/cloud_pipelines_backend/instrumentation/execution_tracing.py +++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py @@ -137,6 +137,13 @@ def _cache_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: return attrs +def _pipeline_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: + """Parent execution context for the root execution span.""" + if execution.parent_execution_id is None: + return {} + return {"execution.parent_id": execution.parent_execution_id} + + def _ns(*, dt: datetime.datetime) -> int: """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK).""" if dt.tzinfo is None: @@ -164,6 +171,7 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None: **_launcher_type_attrs(execution=execution), **_cloud_provider_attrs(execution=execution), **_cache_attrs(execution=execution), + **_pipeline_attrs(execution=execution), }, start_time=_ns(dt=first_time), ) diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py index fffe3b7..debd4e2 100644 --- a/tests/instrumentation/test_execution_tracing.py +++ b/tests/instrumentation/test_execution_tracing.py @@ -348,7 +348,7 @@ def test_cache_miss_sets_hit_false( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -362,7 +362,7 @@ def test_cache_hit_sets_hit_true_and_reused_from_id( statuses=["QUEUED", "SUCCEEDED"], extra={"reused_from_execution_node_id": "source-execution-id"}, ) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -371,3 +371,28 @@ def test_cache_hit_sets_hit_true_and_reused_from_id( assert ( root.attributes["execution.cache.reused_from_id"] == "source-execution-id" ) + + +class TestPipelineAttrs: + def test_root_span_carries_parent_id( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution.parent_execution_id = "parent-exec-id" + execution_tracing.emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert root.attributes["execution.parent_id"] == "parent-exec-id" + + def test_root_execution_omits_parent_id_when_absent( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert "execution.parent_id" not in (root.attributes or {})