@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Jun 10, 2025

closes: #50686

Problem

A downstream task that pulls XComs from a mapped task should receive all the XComs for that mapped task, i.e. one per map index. That does not happen today because of how our current logic treats defaults.

When map_indexes isn't specified, the current logic pulls only from the upstream task's map index that corresponds to the pulling task's own map index. This comes from the way defaults are handled as of now:

https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L349-L360

Scenarios that show how things work:

image

This is wrong. If map_indexes is not specified, we should pull from all map indexes of the upstream task.

Approach

This builds on #50117. If start, end, and step are all passed as None, the API is designed to return ALL the XComs for the task, which is exactly what we want.
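As a plain-Python analogy only (this is not the Airflow API), a slice whose start, stop, and step are all None selects every element of a sequence, which mirrors the "return everything" behaviour described above. The list `xcoms_by_map_index` is made up for illustration.

```python
# Hypothetical values, one per map index of an upstream mapped task.
xcoms_by_map_index = [[1, 1], [2, 2], [3, 3], [4, 4]]

# A slice with start, stop and step all set to None covers the whole sequence.
everything = slice(None, None, None)
all_values = xcoms_by_map_index[everything]

# Supplying a bound narrows the selection, as in ordinary Python slicing.
first_two = xcoms_by_map_index[slice(None, 2, None)]

print(all_values)
print(first_two)
```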

So I am changing the logic to: "if map_indexes aren't provided, fetch all the map_indexes available".
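The change in defaults can be sketched in isolation. This is a toy model under stated assumptions, not the code in task_runner.py: the helper names `old_default` and `new_default` are hypothetical, and the sketch assumes the previous fallback was the pulling task's own map index, as described in the Problem section.

```python
from typing import List, Optional, Sequence, Union

MapIndexes = Optional[Union[int, Sequence[int]]]


def _normalize(map_indexes: Union[int, Sequence[int]]) -> List[int]:
    """Turn a single index or a sequence of indexes into a list."""
    return [map_indexes] if isinstance(map_indexes, int) else list(map_indexes)


def old_default(map_indexes: MapIndexes, own_map_index: int) -> List[int]:
    """Hypothetical model of the previous behaviour: fall back to the puller's own index."""
    if map_indexes is None:
        return [own_map_index]
    return _normalize(map_indexes)


def new_default(map_indexes: MapIndexes, upstream_indexes: Sequence[int]) -> List[int]:
    """Hypothetical model of the new behaviour: fall back to every upstream index."""
    if map_indexes is None:
        return list(upstream_indexes)
    return _normalize(map_indexes)


upstream = [0, 1, 2, 3]  # map indexes of an upstream task expanded four times

# An unmapped puller is modelled here with map index -1 (an assumption of this sketch).
print(old_default(None, own_map_index=-1))
print(new_default(None, upstream))
print(new_default([0, 2], upstream))
```

In this model an explicit map_indexes argument behaves the same before and after; only the fallback used when nothing is passed differs.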

Testing

Test 1: Checking with DAG provided by @TJaniF

DAG:

from airflow.sdk import dag, task, chain


@dag
def custom_xcom_backend_test():
    @task
    def provide_map():
        return [1, 2, 3, 4]

    @task
    def push_xcom(map_num):
        print(map_num)
        return [map_num]*2

    _push_xcom = push_xcom.expand(map_num=provide_map())

    @task
    def pull_xcom_explicit(**context):
        my_xcom_unmapped = context["ti"].xcom_pull(
            dag_id="custom_xcom_backend_test",
            task_ids=["provide_map"],
            key="return_value",
        )
        print("XCom from unmapped task:", my_xcom_unmapped)

        my_xcom = context["ti"].xcom_pull(
            dag_id="custom_xcom_backend_test",
            task_ids=["push_xcom"],
            key="return_value",
        )
        print("XCom from mapped task:", my_xcom)

    chain(
        _push_xcom,
        pull_xcom_explicit(),
    )


custom_xcom_backend_test()

image

Test 2: DAG reported by issue reporter

DAG:

from datetime import datetime
from textwrap import dedent

from airflow import DAG
from airflow.decorators import task
from airflow.providers.standard.operators.bash import BashOperator


@task
def build_something():
    return "Show this"


@task
def build_multiple():
    return [{"message": "Also this"}, {"message": "Also that"}]


with DAG(
    "mve-xcom-jinja",
    default_args={
        "depends_on_past": False,
    },
    schedule=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    something = build_something()
    multiple = something >> build_multiple()
    _ = multiple >> BashOperator.partial(
        task_id="show",
        bash_command=dedent(
            """
            echo "{{ task_instance.xcom_pull(task_ids='build_something') }}"
            echo "$message"
            """
        ),
    ).expand(env=multiple)

image

Test 3: Created a comprehensive DAG to cover all use cases

from airflow.sdk import dag, task, chain


@dag
def xcom_pull_test():
    """Focused test DAG for key xcom_pull scenarios."""

    @task
    def provide_map():
        return [1, 2, 3, 4]

    @task
    def unmapped_task():
        return "unmapped_value"

    @task
    def another_unmapped_task():
        return {"key": "another_value"}

    @task
    def mapped_task(map_num):
        return [map_num] * 2  # [1,1], [2,2], [3,3], [4,4]

    # Create mapped task
    _mapped_task = mapped_task.expand(map_num=provide_map())

    @task
    def test_xcom_pull(**context):
        """Test key xcom_pull scenarios."""
        ti = context["ti"]

        print("=== XCOM PULL TESTS ===")

        # 1. Single task ID as string vs list
        print("\n1. SINGLE TASK ID:")
        result1 = ti.xcom_pull(task_ids="unmapped_task")
        result2 = ti.xcom_pull(task_ids=["unmapped_task"])
        print(f" String: {result1}")
        print(f" List: {result2}")

        # 2. Multiple task IDs (unmapped)
        print("\n2. MULTIPLE UNMAPPED TASKS:")
        result = ti.xcom_pull(task_ids=["unmapped_task", "another_unmapped_task"])
        print(f" Result: {result}")

        # 3. Mapped task WITHOUT map_indexes (NEW: should get all)
        print("\n3. MAPPED TASK - NO MAP_INDEXES:")
        result = ti.xcom_pull(task_ids="mapped_task")
        print(f" All values: {result}")

        # 4. Mapped task WITH specific map_indexes
        print("\n4. MAPPED TASK - WITH MAP_INDEXES:")
        result1 = ti.xcom_pull(task_ids="mapped_task", map_indexes=1)
        result2 = ti.xcom_pull(task_ids="mapped_task", map_indexes=[0, 2])
        print(f" Single index (1): {result1}")
        print(f" Multiple indexes [0,2]: {result2}")

        # 5. Multiple task IDs with mapped task (mixed)
        print("\n5. MULTIPLE TASKS (MIXED):")
        result = ti.xcom_pull(task_ids=["unmapped_task", "mapped_task"])
        print(f" Mixed: {result}")

        # 6. Multiple mapped tasks
        print("\n6. MULTIPLE MAPPED TASKS:")
        result = ti.xcom_pull(task_ids=["mapped_task"])
        print(f" Result: {result}")

    chain(
        [provide_map(), unmapped_task(), another_unmapped_task(), _mapped_task],
        test_xcom_pull(),
    )


xcom_pull_test()

This DAG covers all of the following scenarios:

  1. Single task ID as string: xcom value must be returned
  2. Single task ID in a list: xcom must be in a list
  3. Multiple unmapped task ids: result must be in a list
  4. Mapped task without map indexes: fetch all the xcoms for all map indexes
  5. Mapped task but with map indexes provided: fetch only for those map indexes
  6. Multiple task ids with mapped tasks: get the mixed result in a list
  7. Multiple mapped tasks: get the combination.
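The return shapes listed above can be reproduced with a small in-memory model. This is only a sketch: `toy_xcom_pull`, `STORE`, and the `UNMAPPED = -1` convention are hypothetical stand-ins written for illustration, not Airflow's implementation, and the shape rule is inferred solely from the results logged for this test DAG.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

UNMAPPED = -1  # toy convention for a task that was not expanded

# Hypothetical XCom store keyed by (task_id, map_index).
STORE: Dict[Tuple[str, int], Any] = {
    ("unmapped_task", UNMAPPED): "unmapped_value",
    ("another_unmapped_task", UNMAPPED): {"key": "another_value"},
    ("mapped_task", 0): [1, 1],
    ("mapped_task", 1): [2, 2],
    ("mapped_task", 2): [3, 3],
    ("mapped_task", 3): [4, 4],
}


def toy_xcom_pull(
    task_ids: Union[str, Sequence[str]],
    map_indexes: Optional[Union[int, Sequence[int]]] = None,
) -> Any:
    """Toy model: with no map_indexes, collect every map index of each task."""
    single_task = isinstance(task_ids, str)
    single_index = isinstance(map_indexes, int)
    ids = [task_ids] if single_task else list(task_ids)

    values: List[Any] = []
    scalar = False
    for task_id in ids:
        if map_indexes is None:
            # New default: every map index stored for this task.
            wanted = sorted(i for (t, i) in STORE if t == task_id)
        elif single_index:
            wanted = [map_indexes]
        else:
            wanted = list(map_indexes)
        values.extend(STORE[(task_id, i)] for i in wanted)
        # A bare value is returned only for one task id given as a string
        # that resolves to one explicit index or to an unmapped task.
        scalar = single_task and (single_index or wanted == [UNMAPPED])
    return values[0] if scalar else values


print(toy_xcom_pull("unmapped_task"))
print(toy_xcom_pull(["unmapped_task"]))
print(toy_xcom_pull("mapped_task"))
print(toy_xcom_pull("mapped_task", map_indexes=[0, 2]))
print(toy_xcom_pull(["unmapped_task", "mapped_task"]))
```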

image

Logs:

[2025-06-11, 12:59:23] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-11, 12:59:23] INFO - Filling up the DagBag from /files/dags/comprehensive-xcom-test.py: source="airflow.models.dagbag.DagBag"
[2025-06-11, 12:59:23] INFO - === XCOM PULL TESTS ===: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 1. SINGLE TASK ID:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - String: unmapped_value: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - List: ['unmapped_value']: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 2. MULTIPLE UNMAPPED TASKS:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - Result: ['unmapped_value', {'key': 'another_value'}]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 3. MAPPED TASK - NO MAP_INDEXES:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - All values: [[1, 1], [2, 2], [3, 3], [4, 4]]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 4. MAPPED TASK - WITH MAP_INDEXES:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - Single index (1): [2, 2]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - Multiple indexes [0,2]: [[1, 1], [3, 3]]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - 5. MULTIPLE TASKS (MIXED):: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - Mixed: ['unmapped_value', [1, 1], [2, 2], [3, 3], [4, 4]]: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - : chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
[2025-06-11, 12:59:23] INFO - 6. MULTIPLE MAPPED TASKS:: chan="stdout": source="task"
[2025-06-11, 12:59:23] INFO - Result: [[1, 1], [2, 2], [3, 3], [4, 4]]: chan="stdout": source="task"


@amoghrajesh amoghrajesh requested a review from uranusjr June 10, 2025 11:03
@amoghrajesh amoghrajesh self-assigned this Jun 11, 2025
@amoghrajesh amoghrajesh changed the title from "Improve xcom_pull to reflect reality for mapped tasks" to "Improve xcom_pull to cover different scenarios for mapped tasks" Jun 11, 2025
@amoghrajesh amoghrajesh marked this pull request as ready for review June 11, 2025 07:30
@amoghrajesh amoghrajesh requested review from ashb and kaxil as code owners June 11, 2025 07:30
@amoghrajesh amoghrajesh requested a review from uranusjr June 11, 2025 08:16
amoghrajesh and others added 2 commits June 11, 2025 15:42
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
@amoghrajesh amoghrajesh added this to the Airflow 3.1.0 milestone Jun 11, 2025
@amoghrajesh
Contributor Author

Thanks for the review, merging it.

@amoghrajesh amoghrajesh merged commit 25f7fe3 into apache:main Jun 12, 2025
70 checks passed
@amoghrajesh amoghrajesh deleted the pull-xcom-mapped-task-from-jinja branch June 12, 2025 03:34
@boring-cyborg

boring-cyborg bot commented Jun 12, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

@sdg9670f

sdg9670f commented Jun 27, 2025

Why are the return values of xcom.get_all and xcom.get_one different? When using the S3 XCom backend, get_all gives S3 URLs as values, but get_one gives the actual values. Is it because of deserialize_value or LazyXComSequence?

@ashb
Member

ashb commented Jul 1, 2025

@sdg9670f Historical compat. Most of the time you shouldn't be using methods on XCom directly anyway
