Fix sensor skipping in Airflow 3.x branching operators #53455
cc @fweilun @kandharvishnu @Ferdinanddb @dominikhei @AZSJTinus @kyungjunleeme @vatsrahul1001 @atul-astronomer - Could you help test this too, in case I missed any other cases?
hello :) In airflow.providers.standard.utils.skipmixin.SkipMixin.skip_all_except, I suggest enhancing the downstream task resolution logic by replacing:
    downstream_tasks = _ensure_tasks(task.downstream_list)  # before
with
    downstream_tasks = _ensure_tasks(task.get_flat_relatives(upstream=False))  # after
This change can also be helpful as a protective measure to ensure all downstream tasks are correctly identified and skipped when necessary.
ef2fdd6 to 2bc6a9a Compare
@kyungjunleeme No, that will cause a bug. It should be left to the
Let me explain why:
The Key Difference
Why This Would Break Diamond/Join Patterns
Consider this common DAG pattern:
Current behavior (correct):
Proposed change (incorrect):
Why Join Tasks Should Run
Evidence from Airflow Documentation
The official Airflow docs explicitly mention this behavior:
Current Code Already Handles This
The existing code even has a comment explaining this exact scenario:
    # For a branching workflow that looks like this, when "branch" does skip_all_except("task1"),
    # we intuitively expect both "task1" and "join" to execute even though strictly speaking,
    # "join" is also immediately downstream of "branch" and should have been skipped.
The code correctly handles join patterns by using:
    for branch_task_id in list(branch_task_id_set):
        branch_task_id_set.update(dag.get_task(branch_task_id).get_flat_relative_ids(upstream=False))
This ensures that tasks downstream of the chosen branch are also followed (not skipped).
Conclusion
The current implementation correctly delegates the decision of whether downstream tasks should run to their trigger rules, which is the proper architectural approach in Airflow. Changing to
The suggestion, while well-intentioned, would introduce regressions in existing DAGs that rely on proper join task behavior. The current approach is architecturally sound and follows Airflow's design principles.
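To make the difference between the two candidate sets concrete, here is a minimal sketch on a toy graph. It is not Airflow's implementation: `GRAPH`, `direct_downstream`, `flat_downstream`, and `tasks_to_skip` are hypothetical names, and the skip rule is a simplified reading of the logic quoted above (candidates minus the chosen branch and everything downstream of it).

```python
# Toy DAG (hypothetical): branch -> task1 -> join
#                         branch -> task2 -> task2b -> join
GRAPH = {
    "branch": ["task1", "task2"],
    "task1": ["join"],
    "task2": ["task2b"],
    "task2b": ["join"],
    "join": [],
}


def direct_downstream(graph, task_id):
    """Immediate children only, in the spirit of task.downstream_list."""
    return set(graph[task_id])


def flat_downstream(graph, task_id):
    """All transitive descendants, in the spirit of get_flat_relatives(upstream=False)."""
    seen = set()
    stack = list(graph[task_id])
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph[node])
    return seen


def tasks_to_skip(graph, follow_ids, candidates):
    """Skip every candidate that is neither a followed task nor downstream of one."""
    followed = set(follow_ids)
    for task_id in follow_ids:
        followed |= flat_downstream(graph, task_id)
    return candidates - followed


skip_direct = tasks_to_skip(GRAPH, ["task1"], direct_downstream(GRAPH, "branch"))
skip_flat = tasks_to_skip(GRAPH, ["task1"], flat_downstream(GRAPH, "branch"))

print(sorted(skip_direct))
print(sorted(skip_flat))
```

In this toy model the direct variant only marks `task2` as skipped and leaves `task2b` to whatever its trigger rule decides, while the transitive variant marks `task2b` as skipped outright.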
@kaxil
It works fine in 3.1.0.
d5aa29f to 5aaf154 Compare
Hi, I won't have the capacity to test this PR in the coming days unfortunately, or maybe during the weekend, but from what I can see: some unit tests have been added and that's great, but could you also add a unit test to verify the behavior when using the
In my case (#52869), I discovered the behavior while using the
My workaround was to add an
Thank you for taking care of this :).
Yup, will do
In Airflow 3.x, sensors inherit from airflow.sdk.BaseOperator instead of airflow.models.BaseOperator. The _ensure_tasks function in SkipMixin was only checking for the models BaseOperator, causing sensors to be filtered out and not properly skipped by branching operators like BranchSQLOperator. Updated the import logic to use the correct SDK BaseOperator for Airflow 3.x and added comprehensive tests to verify sensors are properly included in branching skip operations. Fixes apache#52219
5aaf154 to 07e3fa1 Compare
Test added for
Was able to reproduce, same underlying issue.
In Airflow 3.x, sensors inherit from airflow.sdk.BaseOperator instead of airflow.models.BaseOperator. The _ensure_tasks function in SkipMixin was only checking for the models BaseOperator, causing sensors to be filtered out and not properly skipped by branching operators like BranchSQLOperator. Updated the import logic to use the correct SDK BaseOperator for Airflow 3.x and added comprehensive tests to verify sensors are properly included in branching skip operations. Fixes apache#52219 (cherry picked from commit fc5410c)
This has a single bug fix - apache#53455
In Airflow 3.x, sensors inherit from airflow.sdk.BaseOperator instead of airflow.models.BaseOperator. The _ensure_tasks function in SkipMixin was only checking for the models BaseOperator, causing sensors to be filtered out and not properly skipped by branching operators like BranchSQLOperator.
Updated the import logic to use the correct SDK BaseOperator for Airflow 3.x and added comprehensive tests to verify sensors are properly included in branching skip operations.
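The filtering problem described above can be illustrated with a minimal, self-contained sketch. The classes below are stand-ins, not Airflow's real ones, and the toy hierarchy (the models class subclassing the SDK class) is an assumption made only for this illustration; `ensure_tasks` is a hypothetical simplification of `_ensure_tasks`.

```python
class SdkBaseOperator:
    """Stand-in for airflow.sdk.BaseOperator (simplified, hypothetical)."""

    def __init__(self, task_id):
        self.task_id = task_id


class ModelsBaseOperator(SdkBaseOperator):
    """Stand-in for airflow.models.BaseOperator (hierarchy assumed for this sketch)."""


class Sensor(SdkBaseOperator):
    """A sensor that derives from the SDK base class only."""


class LegacyTask(ModelsBaseOperator):
    """A task that derives from the models base class."""


def ensure_tasks(nodes, operator_type):
    """Keep only nodes that are instances of the given operator type."""
    return [node for node in nodes if isinstance(node, operator_type)]


downstream = [LegacyTask("load"), Sensor("wait_for_file")]

# Checking against the models class drops the sensor, so it is never skipped.
before = ensure_tasks(downstream, ModelsBaseOperator)

# Checking against the SDK class keeps both tasks.
after = ensure_tasks(downstream, SdkBaseOperator)

print([task.task_id for task in before])
print([task.task_id for task in after])
```

Under these assumptions the sensor only appears in the `after` list, which mirrors why branching operators did not skip sensors before the import logic was changed.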
Closes #52219
Closes #53444
Closes #52869
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.