- Notifications
You must be signed in to change notification settings - Fork 16.2k
Description
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
In Airflow 3.x, when a SqlSensor is used as one of the downstream tasks of a BranchSQLOperator, it is not being skipped as expected based on the branch condition. Even when the sensor task is not selected by the branching logic, it still runs instead of being skipped.
This behavior breaks the expected semantics of branching, where only one downstream path should execute and others should be marked as skipped.
Airflow 2.X Log and Graph view
[2025-06-25, 05:14:37 UTC] {sql.py:1223} INFO - Query returns 0, type '<class 'int'>' [2025-06-25, 05:14:37 UTC] {skipmixin.py:233} INFO - Following branch ('task_2',) [2025-06-25, 05:14:37 UTC] {skipmixin.py:281} INFO - Skipping tasks [('task_1', -1)] Airflow 3.X Log and Graph view
[2025-06-25, 10:36:45] INFO - Query returns 0, type '<class 'int'>': source="airflow.task.operators.airflow.providers.common.sql.operators.sql.BranchSQLOperator" [2025-06-25, 10:36:45] INFO - Following branch {'task_2'}: source="airflow.task.operators.airflow.providers.common.sql.operators.sql.BranchSQLOperator" [2025-06-25, 10:36:45] INFO - Skipping tasks []: source="airflow.task.operators.airflow.providers.common.sql.operators.sql.BranchSQLOperator" If we add an EmptyOperator between BranchSQLOperator and Sensor task, the EmptyOperator and sensor tasks are skipped.
What you think should happen instead?
Only the task(s) that are selected by the BranchSQLOperator should run. All other downstream tasks, including sensors, should be skipped automatically. This was the observed and expected behavior in Airflow 2.x.
How to reproduce
Use the following minimal DAG code:
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.common.sql.operators.sql import BranchSQLOperator from airflow.providers.common.sql.sensors.sql import SqlSensor from datetime import datetime CONN_ID = "postgres_conn" def task_2_func(): print("Branch: TASK - 2 – Executing this task.") with DAG( dag_id="example_branch_sql_with_python", start_date=datetime(2024, 1, 1), schedule=None, catchup=False, default_args={"owner": "airflow"}, ) as dag: branch_task = BranchSQLOperator( task_id="branch_sql", sql="SELECT CASE WHEN 1=1 THEN 0 ELSE 1 END", conn_id=CONN_ID, follow_task_ids_if_true=["task_1"], follow_task_ids_if_false=["task_2"], ) task_1 = SqlSensor( task_id="task_1", sql="SELECT CASE WHEN 1=1 THEN 1 ELSE 0 END", conn_id=CONN_ID, ) task_2 = PythonOperator( task_id="task_2", python_callable=task_2_func, ) branch_task >> [task_1, task_2] In this example, task_1 should be skipped since the SQL in BranchSQLOperator returns 0, selecting task_2. But in Airflow 3.x, both tasks are executed.
Operating System
Debian
Versions of Apache Airflow Providers
No response
Deployment
Astronomer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct

