
@kaxil
Member

@kaxil kaxil commented Jul 17, 2025

In Airflow 3.x, sensors inherit from airflow.sdk.BaseOperator instead of airflow.models.BaseOperator. The _ensure_tasks function in SkipMixin was only checking for the models BaseOperator, causing sensors to be filtered out and not properly skipped by branching operators like BranchSQLOperator.

Updated the import logic to use the correct SDK BaseOperator for Airflow 3.x and added comprehensive tests to verify sensors are properly included in branching skip operations.

Closes #52219
Closes #53444
Closes #52869
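
To illustrate the failure mode described above, here is a minimal sketch of an `isinstance`-based filter that silently drops objects built on a different base class. The class names `ModelsBaseOperator`, `SdkBaseOperator`, `Sensor`, and the helper `ensure_tasks` are hypothetical stand-ins written for this example; they are not Airflow's actual classes or the real `_ensure_tasks` implementation.

```python
class ModelsBaseOperator:
    """Hypothetical stand-in for airflow.models.BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id


class SdkBaseOperator:
    """Hypothetical stand-in for airflow.sdk.BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id


class Sensor(SdkBaseOperator):
    """A sensor that, as in Airflow 3.x, derives from the SDK base class."""


def ensure_tasks(nodes, base_cls):
    """Keep only the nodes that are instances of base_cls (assumed filter shape)."""
    return [node for node in nodes if isinstance(node, base_cls)]


# One real task (a sensor) and one non-task node that should be filtered out.
downstream = [Sensor("wait_for_file"), "not_a_task"]

# Checking against the wrong base class drops the sensor, so it is never skipped.
kept_before = [t.task_id for t in ensure_tasks(downstream, ModelsBaseOperator)]

# Checking against the base class the sensor actually inherits from keeps it.
kept_after = [t.task_id for t in ensure_tasks(downstream, SdkBaseOperator)]
```

In this toy model the sensor only reaches the skip logic when the filter checks the base class it actually inherits from, which is the shape of the fix described in this PR.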



@kaxil
Member Author

kaxil commented Jul 17, 2025

cc @fweilun @kandharvishnu @Ferdinanddb @dominikhei @AZSJTinus @kyungjunleeme @vatsrahul1001 @atul-astronomer - Could you guys help test it too, in case I missed any other case?

@kaxil kaxil added the "full tests needed" label (We need to run full set of tests for this PR to merge) Jul 17, 2025
@kyungjunleeme
Contributor

kyungjunleeme commented Jul 17, 2025

hello :)

In airflow.providers.standard.utils.skipmixin.SkipMixin.skip_all_except, I suggest enhancing the downstream task resolution logic by replacing:

 downstream_tasks = _ensure_tasks(task.downstream_list) # before

with

 downstream_tasks = _ensure_tasks(task.get_flat_relatives(upstream=False)) # after

This change can also be helpful as a protective measure to ensure all downstream tasks are correctly identified and skipped when necessary.

@kaxil kaxil force-pushed the use-correct-baseop branch from ef2fdd6 to 2bc6a9a Compare July 17, 2025 14:06
@kaxil
Member Author

kaxil commented Jul 17, 2025

@kyungjunleeme No, that would cause a bug. Whether tasks that aren't directly downstream of the branch task run should be left to their trigger_rule.


Let me explain why:

The Key Difference

  • downstream_list: Returns only immediate/direct downstream tasks
  • get_flat_relatives(upstream=False): Returns ALL downstream tasks recursively (the entire downstream subgraph)
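
A minimal sketch of the difference, using a plain dictionary as the graph rather than real Airflow objects. The mapping `DOWNSTREAM` and the helpers `direct_downstream` and `flat_downstream` are hypothetical names for this illustration; they only mimic what `downstream_list` and `get_flat_relatives(upstream=False)` are described as returning above.

```python
# Hypothetical diamond-shaped graph: task id -> ids of its direct downstream tasks.
DOWNSTREAM = {
    "branch_task": ["branch_a", "branch_b"],
    "branch_a": ["join_task"],
    "branch_b": ["join_task"],
    "join_task": [],
}


def direct_downstream(task_id):
    """Immediate children only (analogous to downstream_list)."""
    return list(DOWNSTREAM[task_id])


def flat_downstream(task_id):
    """All descendants, visited breadth-first (analogous to the flat relatives)."""
    seen = []
    queue = list(DOWNSTREAM[task_id])
    while queue:
        current = queue.pop(0)
        if current not in seen:
            seen.append(current)
            queue.extend(DOWNSTREAM[current])
    return seen


direct = direct_downstream("branch_task")
flat = flat_downstream("branch_task")
```

Here `direct` contains only the two branches, while `flat` also contains `join_task`, so the two calls hand different candidate lists to the skip logic.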

Why This Would Break Diamond/Join Patterns

Consider this common DAG pattern:

      branch_task
       /       \
  branch_a   branch_b
       \       /
       join_task

Current behavior (correct):

  • downstream_list = ["branch_a", "branch_b"]
  • When choosing branch_a: Skip only branch_b
  • join_task runs because it's downstream of the chosen branch_a

Proposed change (incorrect):

  • get_flat_relatives(upstream=False) = ["branch_a", "branch_b", "join_task"]
  • When choosing branch_a: Skip branch_b AND join_task
  • join_task incorrectly skipped even though it should run!

Why Join Tasks Should Run

  1. Join tasks are designed to run when at least one branch completes successfully
  2. Trigger rules handle this: Join tasks typically use none_failed_min_one_success or similar trigger rules
  3. This is standard DAG design: The diamond pattern is fundamental in workflow orchestration
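
As a rough sketch of how such a trigger rule decides, the function below models `none_failed_min_one_success` as documented by Airflow: no upstream task is in a failed or upstream_failed state, and at least one upstream task succeeded. The function and its plain-string states are a simplification for illustration, not Airflow's scheduler code.

```python
def none_failed_min_one_success(upstream_states):
    """Return True when the join task would be allowed to run under this rule."""
    no_failures = all(
        state not in ("failed", "upstream_failed") for state in upstream_states
    )
    at_least_one_success = any(state == "success" for state in upstream_states)
    return no_failures and at_least_one_success


# branch_a succeeded and branch_b was skipped by the branch operator: join runs.
runs_after_branch = none_failed_min_one_success(["success", "skipped"])

# Every upstream task was skipped: nothing succeeded, so join does not run.
runs_when_all_skipped = none_failed_min_one_success(["skipped", "skipped"])

# One upstream task failed: join does not run.
runs_after_failure = none_failed_min_one_success(["success", "failed"])
```

Under this rule a skipped branch does not block the join, which is why the join can be left to its trigger rule instead of being skipped directly by the branch operator.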

Evidence from Airflow Documentation

The official Airflow docs explicitly mention this behavior:

When a Task is downstream of both the branching operator and downstream of one or more of the selected tasks, it will not be skipped

Current Code Already Handles This

The existing code even has a comment explaining this exact scenario:

 # For a branching workflow that looks like this, when "branch" does skip_all_except("task1"),
 # we intuitively expect both "task1" and "join" to execute even though strictly speaking,
 # "join" is also immediately downstream of "branch" and should have been skipped.

The code correctly handles join patterns by using:

 for branch_task_id in list(branch_task_id_set):
     branch_task_id_set.update(dag.get_task(branch_task_id).get_flat_relative_ids(upstream=False))

This ensures that tasks downstream of the chosen branch are also followed (not skipped).
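
The expansion step quoted above can be modeled in a few lines of plain Python. This is a simplified sketch, not the provider's implementation: the graph `DOWNSTREAM` (which adds a hypothetical `task2` to the scenario from the code comment) and the helpers `flat_downstream_ids` and `tasks_to_skip` are assumptions made for this example.

```python
# Hypothetical graph where "join" is directly downstream of "branch"
# as well as downstream of each branch target.
DOWNSTREAM = {
    "branch": ["task1", "task2", "join"],
    "task1": ["join"],
    "task2": ["join"],
    "join": [],
}


def flat_downstream_ids(task_id):
    """Collect the ids of every descendant of task_id."""
    found = set()
    pending = list(DOWNSTREAM[task_id])
    while pending:
        current = pending.pop()
        if current not in found:
            found.add(current)
            pending.extend(DOWNSTREAM[current])
    return found


def tasks_to_skip(branch_id, chosen_ids):
    """Direct downstream tasks of the branch that are not in the followed set."""
    follow = set(chosen_ids)
    # Mirror the quoted loop: everything downstream of a chosen task is followed too.
    for chosen_id in list(follow):
        follow.update(flat_downstream_ids(chosen_id))
    return [task_id for task_id in DOWNSTREAM[branch_id] if task_id not in follow]


skipped = tasks_to_skip("branch", ["task1"])
```

In this model only `task2` ends up in `skipped`; `join` is kept because it is downstream of the chosen `task1`, even though it is also a direct child of `branch`.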

Conclusion

The current implementation correctly delegates the decision of whether downstream tasks should run to their trigger rules, which is the proper architectural approach in Airflow. Changing to get_flat_relatives would:

  1. ❌ Break diamond/join patterns - a fundamental DAG design
  2. ❌ Ignore trigger rules - which are designed to handle downstream task execution
  3. ❌ Skip tasks that should run - violating expected workflow behavior

The suggestion, while well-intentioned, would introduce regressions in existing DAGs that rely on proper join task behavior. The current approach is architecturally sound and follows Airflow's design principles.

@kyungjunleeme
Contributor

@kaxil
Thank you so much for the detailed explanation. I really appreciate it, and I’ll make sure to fully understand the reasoning behind it.

@fweilun
Contributor

fweilun commented Jul 17, 2025

cc @fweilun @kandharvishnu @Ferdinanddb @dominikhei @AZSJTinus @kyungjunleeme @vatsrahul1001 @atul-astronomer - Could you guys help test it too in case I missed any other case.

It works fine in 3.1.0.

@kaxil kaxil force-pushed the use-correct-baseop branch 2 times, most recently from d5aa29f to 5aaf154 Compare July 17, 2025 14:57
@Ferdinanddb
Contributor

Ferdinanddb commented Jul 17, 2025

Hi, unfortunately I won't have the capacity to test this PR in the coming days, except maybe during the weekend. From what I can see, some unit tests have been added, which is great, but could you also add a unit test to verify the behavior when using the ShortCircuitOperator, please?

In my case (#52869), I discovered the behavior while using the ShortCircuitOperator: my direct child task, which was a sensor, was not skipped when it should have been, although all the other child tasks (after the sensor) were skipped.

My workaround was to add an EmptyOperator between my ShortCircuitOperator and my sensor, and then it worked as expected (meaning the sensor was skipped, in addition to all the other operator tasks). Given this, do you think you could reproduce the bug? I am using a custom S3 sensor in my case, but I believe it can be reproduced with any kind of sensor, including a basic one. I hope that is clear; otherwise, I tried to document everything at the time in my issue (#52869).

Thank you for taking care of this :).

@kaxil
Member Author

kaxil commented Jul 17, 2025

Yup, will do

@kaxil kaxil force-pushed the use-correct-baseop branch from 5aaf154 to 07e3fa1 Compare July 17, 2025 16:56
@kaxil
Member Author

kaxil commented Jul 17, 2025

Test added for ShortCircuitOperator too: 07e3fa1

Was able to reproduce, same underlying issue.

@kaxil kaxil merged commit fc5410c into apache:main Jul 17, 2025
102 checks passed
@kaxil kaxil deleted the use-correct-baseop branch July 17, 2025 18:33
kaxil added a commit to astronomer/airflow that referenced this pull request Jul 17, 2025
Fixes apache#52219 (cherry picked from commit fc5410c)

Labels

area:providers · full tests needed (We need to run full set of tests for this PR to merge) · provider:standard

8 participants