We have four WatchedSubprocess subclasses that each implement _handle_request with large if/elif chains dispatching comms messages. Many of these handlers are duplicated inline across multiple supervisors:
- ActivitySubprocess (task-sdk, supervisor.py)
- CallbackSubprocess (task-sdk, callback_supervisor.py)
- TriggerRunnerSupervisor (airflow-core, triggerer_job_runner.py)
- DagFileProcessorProcess (airflow-core, processor.py)
A request_handlers.py module already exists in task-sdk/src/airflow/sdk/execution_time/ with shared handlers for GetConnection, GetVariable, GetAssetByName, GetAssetByUri, and MaskSecret. The Callback and Activity supervisors use these, but the Triggerer and DFP still have inline duplicates.
The Task:
Extract the remaining duplicated handler logic into shared functions in request_handlers.py and have all four supervisors call them.
Note: The implementations across supervisors are similar but not identical. Some include isinstance() guards on the response; some pass different parameter subsets; some set dump_opts while others don't. When extracting, don't just copy one supervisor's version verbatim. Compare all implementations of each handler and produce a single version that incorporates the best practices from each (e.g. proper response type checking, consistent dump_opts, full parameter forwarding).
Here is my suggestion, but you may come up with a better set of rules once you start working on it:
- Use the existing shared handlers as a guide, but don't be afraid to modify them if you come up with a new standard.
- Each new handler should be a standalone function with this signature:
def handle_<message_type>(client: Client, msg: <MessageType>) -> tuple[BaseModel | None, dict[str, bool]]:
- Always guard the response with isinstance before converting it to a comms result model. The API client can return ErrorResponse on failure; when that happens, pass it through unchanged.
- Always return dump_opts as the second tuple element. Use {"exclude_unset": True} for result models that wrap API responses, to avoid serializing unset (typically None) fields. Add {"by_alias": True} when the result model uses field aliases (currently only ConnectionResult). For simple pass-through responses or error responses, return {}.
- Forward all message fields to the client call. Some supervisors currently pass fewer parameters than others for the same message type (e.g. the Triggerer's GetXCom omits include_prior_dates). The shared handler should forward every field that the message carries and let the client and server handle defaults for optional fields.
- Always mask secrets. If the response contains sensitive data (passwords, tokens, variable values), call mask_secret() before returning. Look at handle_get_connection and handle_get_variable for examples.
- Handlers that don't return a response, such as PutVariable and DeleteXCom, can use a simpler signature: either return (None, {}) or return None directly, as handle_mask_secret does. Pick whichever is cleaner for the specific case, but be consistent within a batch.
- Keep handlers stateless. They should not touch subprocess internals (exit codes, terminal states, etc.). If a message type requires updating subprocess state, it belongs inline in that supervisor's _handle_request, not in a shared handler.
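To make the rules above concrete, here is a rough sketch of what a shared GetXCom handler might look like. It is not the real implementation: every class here (XComResponse, XComResult, ErrorResponse, GetXCom, FakeClient) is a simplified, hypothetical stand-in built on dataclasses so the example runs on its own, and the client method name get_xcom is assumed for illustration only.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


# Hypothetical stand-ins for the real API/comms models.
@dataclass
class ErrorResponse:
    error: str


@dataclass
class XComResponse:
    key: str
    value: Any


@dataclass
class XComResult:
    key: str
    value: Any


@dataclass
class GetXCom:
    dag_id: str
    run_id: str
    task_id: str
    key: str
    map_index: int | None = None
    include_prior_dates: bool = False


@dataclass
class FakeClient:
    """Hypothetical client that records how it was called."""

    response: Any
    calls: list = field(default_factory=list)

    def get_xcom(self, **kwargs: Any) -> Any:
        self.calls.append(kwargs)
        return self.response


def handle_get_xcom(client: FakeClient, msg: GetXCom) -> tuple[Any, dict[str, bool]]:
    # Forward every field the message carries, including optional ones.
    resp = client.get_xcom(
        dag_id=msg.dag_id,
        run_id=msg.run_id,
        task_id=msg.task_id,
        key=msg.key,
        map_index=msg.map_index,
        include_prior_dates=msg.include_prior_dates,
    )
    # Guard with isinstance before converting to the comms result model.
    if isinstance(resp, XComResponse):
        return XComResult(key=resp.key, value=resp.value), {"exclude_unset": True}
    # ErrorResponse passes through unchanged, with empty dump_opts.
    return resp, {}
```

The handler holds no state of its own: it only reads the message, calls the client, and returns the response together with its dump_opts.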
Phase 1: High-overlap handlers (three or more supervisors share this logic)
- GetXCom - Activity, Triggerer, DFP
- PutVariable - Activity, Triggerer, DFP
- DeleteVariable - Activity, Triggerer, DFP
- GetTICount - Activity, Triggerer, DFP
- GetTaskStates - Activity, Triggerer, DFP
- GetPreviousTI - Activity, Triggerer, DFP
Phase 2: Medium-overlap handlers (two supervisors share these)
- SetXCom - Activity, Triggerer
- DeleteXCom - Activity, Triggerer
- GetDRCount - Activity, Triggerer
- GetDagRunState - Activity, Triggerer
- GetPreviousDagRun - Activity, DFP
- GetPrevSuccessfulDagRun - Activity, DFP
- GetXComCount - Activity, DFP
- GetXComSequenceItem - Activity, DFP
- GetXComSequenceSlice - Activity, DFP
Phase 2.5: Callback Supervisor
Comms channels were added to the Callback supervisor in #65269. As of this writing, every message it handles already goes through shared helpers, but have a look and make sure that is still true while you are doing this.
Phase 3: Migrate DagFileProcessorProcess onto shared handlers
The DFP currently has fully inline versions of GetConnection, GetVariable, and MaskSecret even though shared handlers already exist. One thing to note: the DFP's inline GetConnection handler skips the mask_secret() calls on password and extra that the shared handler performs, which I personally feel is a bug but may require discussion. Maybe there's a reason I'm not aware of.
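For discussion, the masking behavior in question can be sketched roughly as below. This is a simplified illustration, not the actual shared handler: mask_secret here is a hypothetical stand-in that just records values in a set, the client is modelled as a plain callable, and the real handler also converts the response into a ConnectionResult, which is omitted.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical stand-in for the secrets masker's registry.
MASKED: set[str] = set()


def mask_secret(value: str | None) -> None:
    """Record a value to be redacted from logs (stand-in behavior only)."""
    if value:
        MASKED.add(value)


@dataclass
class ErrorResponse:
    error: str


@dataclass
class ConnectionResponse:
    conn_id: str
    password: str | None = None
    extra: str | None = None


@dataclass
class GetConnection:
    conn_id: str


def handle_get_connection(
    client: Callable[[str], Any], msg: GetConnection
) -> tuple[Any, dict[str, bool]]:
    conn = client(msg.conn_id)
    if isinstance(conn, ConnectionResponse):
        # These are the two calls the DFP's inline version skips.
        mask_secret(conn.password)
        mask_secret(conn.extra)
        return conn, {"exclude_unset": True, "by_alias": True}
    # Errors pass through unchanged and nothing is masked.
    return conn, {}
```

If the DFP moves onto the shared handler, it would pick up both mask_secret() calls automatically, which is why the question of whether skipping them was intentional matters before migrating.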
Out of Scope:
Supervisor-specific messages (e.g. TaskState, DeferTask, TriggerStateChanges, DagFileParsingResult) should either stay inline or move to a supervisor-specific helper/utils module (I vote leave alone, personally) since they interact with internal subprocess state.
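One possible shape for the resulting split, sketched under heavy assumptions: shared handlers are looked up by message type, while state-touching messages stay inline in the supervisor. The dispatch table, SketchSupervisor, send_msg, and the dict used as a client are all hypothetical and only illustrate the boundary; the real supervisors may well keep their if/elif chains and simply call the shared functions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class GetVariable:  # hypothetical stand-in for a shared message type
    key: str


@dataclass
class TaskState:  # hypothetical stand-in for a supervisor-specific message
    state: str


def handle_get_variable(client: dict, msg: GetVariable) -> tuple[Any, dict[str, bool]]:
    # Stateless: reads the message, asks the client, returns the response.
    return client.get(msg.key), {}


# Shared, stateless handlers keyed by message type.
SHARED_HANDLERS = {GetVariable: handle_get_variable}


class SketchSupervisor:
    def __init__(self, client: dict) -> None:
        self.client = client
        self.terminal_state: str | None = None
        self.sent: list[tuple[Any, dict[str, bool]]] = []

    def send_msg(self, resp: Any, dump_opts: dict[str, bool]) -> None:
        self.sent.append((resp, dump_opts))

    def _handle_request(self, msg: Any) -> None:
        handler = SHARED_HANDLERS.get(type(msg))
        if handler is not None:
            resp, dump_opts = handler(self.client, msg)
            self.send_msg(resp, dump_opts)
        elif isinstance(msg, TaskState):
            # Touches subprocess state, so it stays inline in the supervisor.
            self.terminal_state = msg.state
        else:
            raise ValueError(f"Unhandled message type: {type(msg).__name__}")
```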