Description
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
2.10.2
What happened?
Summary
The behavior of TaskInstance.xcom_pull() when task_ids=None has changed between Airflow 2.10.2 and 3.0.2, leading to potentially unexpected results and backward compatibility issues.
airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py
Lines 302 to 303 in 79838ba
:param task_ids: Only XComs from tasks with matching ids will be
    pulled. Pass *None* to remove the filter.
Behavior in Airflow 2.10.2
In 2.10.2, calling:
ti.xcom_pull(key="some_key")
would internally delegate to _xcom_pull() without applying any task_ids filter, effectively pulling the XCom with the given key from any available task and often returning the latest match. This was useful for DAG authors who just wanted to pull a known key from anywhere upstream, especially when only one upstream task pushed the value.
airflow/airflow/models/taskinstance.py
Lines 610 to 617 in 35087d7
query = XCom.get_many(
    key=key,
    run_id=ti.run_id,
    dag_ids=dag_id,
    task_ids=task_ids,
    map_indexes=map_indexes,
    include_prior_dates=include_prior_dates,
    session=session,
Behavior in Airflow 3.0.2
In 3.0.2, the implementation changed, and this call:
ti.xcom_pull(key="some_key")
now defaults to:
task_ids = [self.task_id]
This means it only pulls the XCom written by the current task itself, not from any upstream task. This is a significant change in default behavior and can silently break DAGs or plugins written for older versions.
airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py
Lines 340 to 342 in 79838ba
if task_ids is None:
    # default to the current task if not provided
    task_ids = [self.task_id]
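The difference between the two defaults can be modeled outside Airflow. The sketch below is a simplified illustration, not Airflow's code: `XComStore`, `pull_2x_style`, and `pull_3x_style` are hypothetical names, the store is a plain dict, and details such as mapped tasks or list-shaped return values are ignored.

```python
from typing import Any, Optional

# Hypothetical stand-in for the XCom table: (task_id, key) -> value.
# Dict insertion order approximates "latest match"; this is a model only.
XComStore = dict[tuple[str, str], Any]


def pull_2x_style(
    store: XComStore, key: str, task_ids: Optional[list[str]] = None
) -> Any:
    """Model of the 2.10.2 default: task_ids=None removes the task filter."""
    matches = [
        value
        for (task_id, stored_key), value in store.items()
        if stored_key == key and (task_ids is None or task_id in task_ids)
    ]
    # Return the most recently stored match, or None if nothing matched.
    return matches[-1] if matches else None


def pull_3x_style(
    store: XComStore,
    current_task_id: str,
    key: str,
    task_ids: Optional[list[str]] = None,
) -> Any:
    """Model of the 3.0.2 default: task_ids=None means the current task only."""
    if task_ids is None:
        task_ids = [current_task_id]
    return pull_2x_style(store, key, task_ids)


# task_a pushed the value; task_b is the task doing the pull.
store: XComStore = {("task_a", "some_key"): "some_value"}
old_result = pull_2x_style(store, "some_key")
new_result = pull_3x_style(store, "task_b", "some_key")
```

Under this model `old_result` holds the upstream value while `new_result` is `None`, which matches the behavior reported for the two versions.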
Example Impact
In 2.10.2:
# Pulls 'result' from any upstream task that pushed it
ti.xcom_pull(key="result")
In 3.0.2:
# Only pulls 'result' from the current task (likely returns None)
ti.xcom_pull(key="result")  # Will NOT find upstream result unless task_ids is specified
What you think should happen instead?
The default behavior should remain consistent across versions to avoid unexpected regressions.
How to reproduce
It's straightforward to reproduce the issue. Define a DAG with two tasks where TaskA runs before TaskB (TaskA >> TaskB), and then:
Have TaskA push an XCom value with key="some_key" and value="some_value".
In TaskB, call xcom_pull(key="some_key") without specifying task_ids.
In Airflow 2.10.2, this correctly retrieves "some_value" from TaskA.
In Airflow 3.0.2, the same call returns None, since it defaults to pulling from TaskB itself.
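Naming the upstream task explicitly in TaskB avoids depending on either default. This is a minimal sketch, assuming the task ids are `task_a` and `task_b`. `pull_some_key` and `FakeTaskInstance` are hypothetical: the stub only mimics the 3.0.2 default described above so the snippet runs without Airflow, and only the `xcom_pull(task_ids=..., key=...)` keyword arguments are taken from the real API.

```python
from typing import Any, Optional


def pull_some_key(ti: Any, upstream_task_id: str = "task_a") -> Any:
    """Pull 'some_key' from a named upstream task rather than the default."""
    return ti.xcom_pull(task_ids=upstream_task_id, key="some_key")


class FakeTaskInstance:
    """Hypothetical stub that imitates the 3.0.2 default for task_ids=None."""

    def __init__(self, task_id: str, store: dict[tuple[str, str], Any]) -> None:
        self.task_id = task_id
        self.store = store

    def xcom_pull(self, task_ids: Optional[str] = None, key: str = "return_value") -> Any:
        if task_ids is None:
            # Mirrors the reported 3.0.2 behavior: fall back to the current task.
            task_ids = self.task_id
        return self.store.get((task_ids, key))


ti = FakeTaskInstance("task_b", {("task_a", "some_key"): "some_value"})
implicit = ti.xcom_pull(key="some_key")  # relies on the default
explicit = pull_some_key(ti)             # names the upstream task
```

With the stub, `implicit` is `None` and `explicit` is the value pushed by `task_a`.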
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!