diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py index e92bf72..2afacb9 100644 --- a/cloud_pipelines_backend/instrumentation/execution_tracing.py +++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py @@ -18,12 +18,16 @@ _logger = logging.getLogger(__name__) _tracer = trace.get_tracer("tangle.orchestrator") +_CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider" + _HISTORY_KEY = bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY _TERMINAL_STATUSES = frozenset(s.value for s in bts.CONTAINER_STATUSES_ENDED) -_ERROR_TERMINAL_STATUSES = frozenset({ - bts.ContainerExecutionStatus.FAILED, - bts.ContainerExecutionStatus.SYSTEM_ERROR, -}) +_ERROR_TERMINAL_STATUSES = frozenset( + { + bts.ContainerExecutionStatus.FAILED, + bts.ContainerExecutionStatus.SYSTEM_ERROR, + } +) def _error_attrs(*, execution: bts.ExecutionNode, status: str) -> dict[str, object]: @@ -81,6 +85,46 @@ def _launcher_pod_attrs( return attrs +def _launcher_type_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: + """Launcher type and cluster identity for the root execution span. + + Uses the top-level key of launcher_data as the launcher type — forward-compatible + with any launcher implementation. For k8s-family launchers adds cluster_server + so GKE and Nebius clusters (which share the same launcher class in oasis-backend) + can be distinguished by URL pattern. + """ + if execution.container_execution_id is None: + return {} + ce = execution.container_execution + if ce is None or not ce.launcher_data: + return {} + launcher_key = next(iter(ce.launcher_data)) + attrs: dict[str, object] = {"execution.launcher": launcher_key} + inner = ce.launcher_data[launcher_key] + if isinstance(inner, dict) and (cluster_url := inner.get("cluster_server")): + attrs["k8s.cluster.url"] = cluster_url + return attrs + + +def _cloud_provider_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: + """Cloud provider for the root execution span, read from task_spec annotations. + + Returns ``{"execution.cloud_provider": value}`` when the + ``cloud-pipelines.net/orchestration/cloud_provider`` annotation is present, + otherwise an empty dict. Callers (e.g. oasis-backend's MultiLauncherContainerLauncher) + set this annotation at routing time; tangle launchers with a fixed cloud affinity + can set it too. + """ + provider = ( + (execution.task_spec or {}) + .get("annotations", {}) + .get(_CLOUD_PROVIDER_ANNOTATION_KEY) + ) + if provider is None: + return {} + return {"execution.cloud_provider": provider} + + def _ns(*, dt: datetime.datetime) -> int: """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK).""" if dt.tzinfo is None: @@ -103,7 +147,11 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None: root = _tracer.start_span( "execution", - attributes={"execution.id": execution.id}, + attributes={ + "execution.id": execution.id, + **_launcher_type_attrs(execution=execution), + **_cloud_provider_attrs(execution=execution), + }, start_time=_ns(dt=first_time), ) root_ctx = trace.set_span_in_context(root) diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py index 8b6bf37..9ccef8e 100644 --- a/tests/instrumentation/test_execution_tracing.py +++ b/tests/instrumentation/test_execution_tracing.py @@ -277,3 +277,67 @@ def test_non_pending_span_has_no_k8s_attributes( for span in span_exporter.get_finished_spans(): assert "k8s.pod.name" not in (span.attributes or {}) + + +class TestLauncherTypeAttrs: + def test_root_span_carries_launcher_type_and_cluster_url( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + ce = bts.ContainerExecution( + status=bts.ContainerExecutionStatus.SUCCEEDED, + launcher_data={ + "kubernetes": { + "pod_name": "exec-abc-pod", + "namespace": "tangle", + "cluster_server": "https://gke.example.com", + } + }, + ) + execution.container_execution = ce + execution.container_execution_id = "ce-test-id" + execution_tracing.emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert root.attributes["execution.launcher"] == "kubernetes" + assert root.attributes["k8s.cluster.url"] == "https://gke.example.com" + + def test_root_span_omits_launcher_without_container_execution( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert "execution.launcher" not in (root.attributes or {}) + + def test_root_span_carries_cloud_provider_when_annotation_set( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution.task_spec = { + "annotations": { + "cloud-pipelines.net/orchestration/cloud_provider": "nebius" + } + } + execution_tracing.emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert root.attributes["execution.cloud_provider"] == "nebius" + + def test_root_span_omits_cloud_provider_when_annotation_absent( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.emit_execution_trace(execution=execution) + + root = next( + s for s in span_exporter.get_finished_spans() if s.name == "execution" + ) + assert "execution.cloud_provider" not in (root.attributes or {})