Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions airflow-core/newsfragments/63491.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Deprecate ``BaseExecutor.queued_tasks``, ``queued_callbacks``, ``supports_callbacks``, ``trigger_tasks``, and ``order_queued_tasks_by_priority``

Executor workload state is now stored on the unified ``BaseExecutor.executor_queues`` mapping
keyed by ``WorkloadType``, and scheduling is driven by ``trigger_workloads`` /
``_get_workloads_to_schedule``. The previous per-type attributes and entrypoints are kept as
backward-compatible shims that emit ``RemovedInAirflow4Warning`` and will be removed in Airflow 4.0.

**Migration:**

- Replace ``executor.queued_tasks`` with ``executor.executor_queues[WorkloadType.EXECUTE_TASK]``.
- Replace ``executor.queued_callbacks`` with ``executor.executor_queues[WorkloadType.EXECUTE_CALLBACK]``.
- Replace ``supports_callbacks = True`` class declarations with
``supported_workload_types = frozenset({WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK})``.
- Replace ``executor.trigger_tasks(open_slots)`` with ``executor.trigger_workloads(open_slots)``.
- Replace ``executor.order_queued_tasks_by_priority()`` with
``executor._get_workloads_to_schedule(open_slots)``.

Legacy ``supports_callbacks = True`` class attributes on out-of-tree executors are still honored:
``BaseExecutor.__init_subclass__`` detects them and synthesizes the corresponding
``supported_workload_types`` entry while emitting a deprecation warning.

Deprecation warnings for these compat entrypoints are emitted at most once per executor class to
avoid flooding scheduler heartbeat logs.
215 changes: 138 additions & 77 deletions airflow-core/src/airflow/executors/base_executor.py

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions airflow-core/src/airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import structlog

from airflow.executors.base_executor import BaseExecutor, get_execution_api_server_url
from airflow.executors.workloads import WorkloadType

# add logger to parameter of setproctitle to support logging
if sys.platform == "darwin":
Expand Down Expand Up @@ -125,7 +126,9 @@ class LocalExecutor(BaseExecutor):

supports_multi_team: bool = True
serve_logs: bool = True
supports_callbacks: bool = True
supported_workload_types: frozenset[WorkloadType] = frozenset(
{WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK}
)

activity_queue: SimpleQueue[ExecutorWorkload | None]
result_queue: SimpleQueue[WorkloadResultType]
Expand Down Expand Up @@ -275,11 +278,7 @@ def terminate(self):
def _process_workloads(self, workload_list):
for workload in workload_list:
self.activity_queue.put(workload)
# A valid workload will exist in exactly one of these dicts.
# One pop will succeed, the other will return None gracefully.
removed = self.queued_tasks.pop(workload.key, None) or self.queued_callbacks.pop(
workload.key, None
)
removed = self.executor_queues[workload.type].pop(workload.key, None)
if not removed:
raise KeyError(f"Workload {workload.key} was not found in any queue")
with self._unread_messages:
Expand Down
4 changes: 3 additions & 1 deletion airflow-core/src/airflow/executors/workloads/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from pydantic import Field

from airflow.executors.workloads.base import BaseWorkload, BundleInfo
from airflow.executors.workloads.base import WORKLOAD_TYPE_PRIORITY, BaseWorkload, BundleInfo, WorkloadType
from airflow.executors.workloads.callback import CallbackFetchMethod, ExecuteCallback
from airflow.executors.workloads.task import ExecuteTask, TaskInstanceDTO
from airflow.executors.workloads.trigger import RunTrigger
Expand Down Expand Up @@ -50,4 +50,6 @@
"ExecutorWorkload",
"TaskInstance",
"TaskInstanceDTO",
"WORKLOAD_TYPE_PRIORITY",
"WorkloadType",
]
37 changes: 37 additions & 0 deletions airflow-core/src/airflow/executors/workloads/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
from abc import ABC, abstractmethod
from collections.abc import Hashable
from enum import Enum
from typing import TYPE_CHECKING

from pydantic import BaseModel, ConfigDict, Field
Expand All @@ -32,6 +33,30 @@
from airflow.executors.workloads.types import WorkloadState


class WorkloadType(str, Enum):
Comment thread
anishgirianish marked this conversation as resolved.
"""Central registry of executor workload types."""

EXECUTE_TASK = "ExecuteTask"
EXECUTE_CALLBACK = "ExecuteCallback"


# Central executor priority registry: tuple is ordered from highest priority to lowest.
#
# Adding a new workload type is a three-place change that must stay in sync:
# 1. ``WorkloadType`` — declare the enum member.
# 2. ``_workload_type_priority_order`` — insert it at the right priority slot.
# 3. ``airflow.executors.workloads.QueueableWorkload`` — extend the discriminated union
# so ``queue_workload`` can accept the new schema.
_workload_type_priority_order = (
WorkloadType.EXECUTE_CALLBACK,
WorkloadType.EXECUTE_TASK,
)

WORKLOAD_TYPE_PRIORITY: dict[WorkloadType, int] = {
name: idx for idx, name in enumerate(_workload_type_priority_order)
}


class BaseWorkload:
"""
Mixin for ORM models that can be scheduled as workloads.
Expand Down Expand Up @@ -161,3 +186,15 @@ def running_state(self) -> WorkloadState | None:
no intermediate state is emitted.
"""
return None

@property
def sort_key(self) -> int:
"""
Return the sort key for ordering workloads within the same priority.

The default of ``0`` gives FIFO behaviour (Python's stable sort preserves
insertion order among equal keys). Override in subclasses that need
priority ordering within their priority group — for example, ``ExecuteTask`` returns
``self.ti.priority_weight`` so that lower-weight tasks are scheduled first.
"""
return 0
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/executors/workloads/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import structlog
from pydantic import BaseModel, Field, field_validator

from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo
from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo, WorkloadType
from airflow.utils.state import CallbackState

if TYPE_CHECKING:
Expand Down Expand Up @@ -75,7 +75,7 @@ class ExecuteCallback(BaseDagBundleWorkload):

callback: CallbackDTO

type: Literal["ExecuteCallback"] = Field(init=False, default="ExecuteCallback")
type: Literal[WorkloadType.EXECUTE_CALLBACK] = Field(init=False, default=WorkloadType.EXECUTE_CALLBACK)

@property
def key(self) -> CallbackKey:
Expand Down
9 changes: 7 additions & 2 deletions airflow-core/src/airflow/executors/workloads/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from pydantic import BaseModel, Field

from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo
from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo, WorkloadType
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
Expand Down Expand Up @@ -84,13 +84,18 @@ class ExecuteTask(BaseDagBundleWorkload):
ti: TaskInstanceDTO
sentry_integration: str = ""

type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")
type: Literal[WorkloadType.EXECUTE_TASK] = Field(init=False, default=WorkloadType.EXECUTE_TASK)

@property
def key(self) -> TaskInstanceKey:
"""Return the TaskInstanceKey for this workload."""
return self.ti.key

@property
def sort_key(self) -> int:
"""Return the task priority weight for sorting (lower = higher priority)."""
return self.ti.priority_weight
Comment thread
anishgirianish marked this conversation as resolved.

@property
def display_name(self) -> str:
"""Return the task instance ID as a display name."""
Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/executors/workloads/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,20 @@
from airflow.utils.state import CallbackState, TaskInstanceState

if TYPE_CHECKING:
from airflow.executors.workloads.callback import ExecuteCallback
from airflow.executors.workloads.task import ExecuteTask

# Type aliases for workload keys and states (used by executor layer)
WorkloadKey: TypeAlias = TaskInstanceKey | CallbackKey
WorkloadState: TypeAlias = TaskInstanceState | CallbackState

# Type alias for executor workload results (used by executor implementations)
WorkloadResultType: TypeAlias = tuple[WorkloadKey, WorkloadState, Exception | None]

# Workload types that flow through executor queues (have key and sort_key).
# Update this union when adding a new queueable workload type.
QueueableWorkload: TypeAlias = ExecuteTask | ExecuteCallback

# Type alias for scheduler workloads (ORM models that can be routed to executors)
# Must be outside TYPE_CHECKING for use in function signatures
SchedulerWorkload: TypeAlias = TaskInstance | ExecutorCallback
Expand Down
Loading
Loading