From 22c78a79ad6667da7e5f2cc7f4f5a9eed56316cd Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Fri, 8 May 2026 22:36:21 +0000 Subject: [PATCH 1/5] Add XCom read access to callback supervisor comms channel Callbacks running inside the CallbackSubprocess can now read XCom values via the same supervisor IPC mechanism used for Connections and Variables. The caller must supply explicit dag_id, run_id, and task_id since callbacks have no implicit task context. This adds GetXCom to the CallbackToSupervisor union and routes it through a new shared handle_get_xcom handler in request_handlers.py. Only read access (GetXCom) is exposed; SetXCom and DeleteXCom remain out of scope for callbacks. --- .../sdk/execution_time/callback_supervisor.py | 17 +++++++---- .../test_callback_supervisor.py | 30 +++++++++++++++++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py index 579833f413df0..72e50b54b18bf 100644 --- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py @@ -35,12 +35,14 @@ GetConnection, GetVariable, GetVariableKeys, + GetXCom, MaskSecret, ) from airflow.sdk.execution_time.request_handlers import ( handle_get_connection, handle_get_variable, handle_get_variable_keys, + handle_get_xcom, handle_mask_secret, ) from airflow.sdk.execution_time.supervisor import ( @@ -71,10 +73,10 @@ class _BundleInfoLike(Protocol): # The set of messages that a callback subprocess can send to the supervisor. -# This is a minimal subset of ToSupervisor: read-only access to Connections -# and Variables, plus MaskSecret for the secrets masker. +# This is a minimal subset of ToSupervisor: read-only access to Connections, +# Variables, and XCom values, plus MaskSecret for the secrets masker. CallbackToSupervisor = Annotated[ - GetConnection | GetVariable | GetVariableKeys | MaskSecret, + GetConnection | GetVariable | GetVariableKeys | GetXCom | MaskSecret, Field(discriminator="type"), ] @@ -158,9 +160,10 @@ class CallbackSubprocess(WatchedSubprocess): Uses the WatchedSubprocess infrastructure for fork/monitor/signal handling while keeping a simple lifecycle: start, run callback, exit. - Provides a limited set of comms channels (Connections and Variables) so - that callback code can access runtime services like - ``Connection.get()`` and ``Variable.get()`` via the supervisor's API client. + Provides a limited set of comms channels (Connections, Variables, and XCom) + so that callback code can access runtime services like + ``Connection.get()``, ``Variable.get()``, and ``XCom.get()`` via the + supervisor's API client. """ client: Client # The HTTP client to use for communication with the API server. @@ -288,6 +291,8 @@ def _handle_request(self, msg: CallbackToSupervisor, log: FilteringBoundLogger, resp, dump_opts = handle_get_variable(self.client, msg) elif isinstance(msg, GetVariableKeys): resp, dump_opts = handle_get_variable_keys(self.client, msg) + elif isinstance(msg, GetXCom): + resp, dump_opts = handle_get_xcom(self.client, msg) elif isinstance(msg, MaskSecret): handle_mask_secret(msg) else: diff --git a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py index 523a7d0c60413..9a740d5ad0afa 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py @@ -28,12 +28,14 @@ import pytest import structlog +from airflow.sdk.api.datamodels._generated import XComResponse from airflow.sdk.execution_time.callback_supervisor import CallbackSubprocess, execute_callback from airflow.sdk.execution_time.comms import ( ConnectionResult, GetConnection, GetVariable, GetVariableKeys, + GetXCom, MaskSecret, VariableKeysResult, VariableResult, @@ -189,6 +191,34 @@ class RequestCase: method_path="variables.keys", kwargs={"prefix": "test_", "limit": 1000, "offset": 0}, response=VariableKeysResult(keys=["test_key"], total_entries=1), + message=GetXCom( + key="return_value", + dag_id="test_dag", + run_id="test_run_1", + task_id="upstream_task", + map_index=None, + ), + test_id="get_xcom", + client_mock=ClientMock( + method_path="xcoms.get", + args=("test_dag", "test_run_1", "upstream_task", "return_value", None, False), + response=XComResponse(key="return_value", value="xcom_payload"), + ), + ), + RequestCase( + message=GetXCom( + key="custom_key", + dag_id="dag_a", + run_id="run_42", + task_id="task_b", + map_index=3, + include_prior_dates=True, + ), + test_id="get_xcom_with_map_index", + client_mock=ClientMock( + method_path="xcoms.get", + args=("dag_a", "run_42", "task_b", "custom_key", 3, True), + response=XComResponse(key="custom_key", value={"nested": "data"}), ), ), RequestCase( From 1d10446233a24881b171e29589d4657bc32ae97f Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Thu, 28 May 2026 00:54:55 +0000 Subject: [PATCH 2/5] Fix syntax error in test file from merge conflict resolution --- .../tests/task_sdk/execution_time/test_callback_supervisor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py index 9a740d5ad0afa..e5db8f18070b9 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py @@ -191,6 +191,9 @@ class RequestCase: method_path="variables.keys", kwargs={"prefix": "test_", "limit": 1000, "offset": 0}, response=VariableKeysResult(keys=["test_key"], total_entries=1), + ), + ), + RequestCase( message=GetXCom( key="return_value", dag_id="test_dag", From 7994b0e8057e215b97e028d53f224ef4b9157e55 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Thu, 28 May 2026 01:00:06 +0000 Subject: [PATCH 3/5] Fix GetXCom 403: accept workload tokens on get_xcom endpoint --- .../src/airflow/api_fastapi/execution_api/routes/xcoms.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py index a7592e4f0f2ec..ab787755d8589 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py @@ -20,7 +20,7 @@ import logging from typing import Annotated -from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, Request, Response, status +from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, Request, Response, Security, status from pydantic import JsonValue from sqlalchemy import delete from sqlalchemy.sql.selectable import Select @@ -32,7 +32,7 @@ XComSequenceIndexResponse, XComSequenceSliceResponse, ) -from airflow.api_fastapi.execution_api.security import CurrentTIToken +from airflow.api_fastapi.execution_api.security import CurrentTIToken, require_auth from airflow.models.taskmap import TaskMap from airflow.models.xcom import XComModel from airflow.utils.db import get_query_count @@ -266,6 +266,7 @@ class GetXcomFilterParams(BaseModel): @router.get( "/{dag_id}/{run_id}/{task_id}/{key:path}", description="Get a single XCom Value", + dependencies=[Security(require_auth, scopes=["token:execution", "token:workload"])], ) def get_xcom( dag_id: str, From 5ea5442be0357a8ad3c05b4bd4569742a1a1d396 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Thu, 28 May 2026 02:28:01 +0000 Subject: [PATCH 4/5] Fix: accept workload tokens on connections and variables execution API routes Callback subprocesses use workload-scoped JWT tokens, but the connections and variables endpoints defaulted to execution-only token acceptance. Any callback using Connection.get() or Variable.get() would receive a 403. Add Security(require_auth, scopes=[...workload]) at the router level to allow both token types, matching the fix applied to dag_runs in #66608. --- .../api_fastapi/execution_api/routes/connections.py | 9 ++++++--- .../api_fastapi/execution_api/routes/variables.py | 8 +++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py index 8289dcf97fb8a..2166df5d7f283 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py @@ -20,10 +20,10 @@ import logging from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, Path, status +from fastapi import APIRouter, Depends, HTTPException, Path, Security, status from airflow.api_fastapi.execution_api.datamodels.connection import ConnectionResponse -from airflow.api_fastapi.execution_api.security import CurrentTIToken, get_team_name_dep +from airflow.api_fastapi.execution_api.security import CurrentTIToken, get_team_name_dep, require_auth from airflow.exceptions import AirflowNotFoundException from airflow.models.connection import Connection @@ -50,7 +50,10 @@ async def has_connection_access( router = APIRouter( responses={status.HTTP_404_NOT_FOUND: {"description": "Connection not found"}}, - dependencies=[Depends(has_connection_access)], + dependencies=[ + Security(require_auth, scopes=["token:execution", "token:workload"]), + Depends(has_connection_access), + ], ) log = logging.getLogger(__name__) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py index 631ad35ded17f..cf79aaac7a663 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py @@ -20,7 +20,7 @@ import logging from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request, status +from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request, Security, status from sqlalchemy import func, select from airflow.api_fastapi.common.db.common import SessionDep @@ -29,7 +29,7 @@ VariablePostBody, VariableResponse, ) -from airflow.api_fastapi.execution_api.security import CurrentTIToken, get_team_name_dep +from airflow.api_fastapi.execution_api.security import CurrentTIToken, get_team_name_dep, require_auth from airflow.models.variable import Variable @@ -57,7 +57,9 @@ async def has_variable_access( return True -router = APIRouter() +router = APIRouter( + dependencies=[Security(require_auth, scopes=["token:execution", "token:workload"])], +) log = logging.getLogger(__name__) From 454244a87ea243695d4f20cab0a51870d7197731 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Thu, 28 May 2026 02:28:46 +0000 Subject: [PATCH 5/5] Retrigger CI: provider compat test flake