@amoghrajesh
Contributor

closes: #53432

Problem

The BaseXCom.get_all() method was directly calling deserialize() from the serialization module instead of using the overridable cls.deserialize_value() method. This broke custom XCom backends that rely on overriding deserialize_value() to implement custom deserialization logic.

Example of broken behavior:

    class CustomXCom(BaseXCom):
        @classmethod
        def deserialize_value(cls, result):
            # Custom logic that was being bypassed
            return f"custom_prefix:{super().deserialize_value(result)}"

    # get_one() worked correctly - called CustomXCom.deserialize_value()
    # get_all() was broken - bypassed CustomXCom.deserialize_value()
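The bypass can be reproduced in miniature without Airflow. The sketch below uses hypothetical stand-ins (`_fake_deserialize`, `Result`, `BaseStore`, `CustomStore`) rather than the real `BaseXCom` classes. It only mirrors the call pattern described above, assuming the bulk getter calls the module-level function directly.

```python
from dataclasses import dataclass
from typing import Any


def _fake_deserialize(raw: Any) -> Any:
    """Hypothetical stand-in for the module-level deserialize() function."""
    return raw


@dataclass
class Result:
    """Hypothetical stand-in for XComResult: an object with a .value attribute."""

    value: Any


class BaseStore:
    """Simplified stand-in for BaseXCom, not the real implementation."""

    @classmethod
    def deserialize_value(cls, result: Result) -> Any:
        return _fake_deserialize(result.value)

    @classmethod
    def get_one(cls, result: Result) -> Any:
        # Goes through the overridable hook, so subclass overrides apply.
        return cls.deserialize_value(result)

    @classmethod
    def get_all_buggy(cls, raw_values: list[Any]) -> list[Any]:
        # Calls the module-level function directly, bypassing the hook.
        return [_fake_deserialize(v) for v in raw_values]


class CustomStore(BaseStore):
    @classmethod
    def deserialize_value(cls, result: Result) -> Any:
        return f"custom_prefix:{super().deserialize_value(result)}"


one = CustomStore.get_one(Result("a"))
many = CustomStore.get_all_buggy(["a", "b"])
print(one)   # custom_prefix:a
print(many)  # ['a', 'b']
```

The single-value path picks up the subclass override, while the bulk path silently returns the base behaviour, which matches the symptom reported for custom backends.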

Root Cause

The issue comes from inconsistent deserialization patterns:

  • get_one() correctly calls cls.deserialize_value(msg) where msg is an XComResult with a value attribute
  • get_all() was calling deserialize(msg.root) directly, where msg.root is a list[JsonValue]

The deserialize_value() method expects an object with a value attribute, but the sequence slice returns a flat list of serialized values.

  • The fix applies the same wrapper pattern already established in lazy_sequence.py.
  • _XComValueWrapper is a lightweight wrapper that exposes each serialized value as .value, so get_all() can pass it to cls.deserialize_value().
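A minimal sketch of the wrapper pattern, using hypothetical stand-ins rather than Airflow's classes: `ValueWrapper` plays the role of `_XComValueWrapper`, and `BaseStore.get_all()` shows how each raw value might be wrapped before it reaches the overridable hook. The actual change in this PR may differ in detail.

```python
from typing import Any


class ValueWrapper:
    """Hypothetical analogue of _XComValueWrapper: exposes a raw value as .value."""

    def __init__(self, value: Any) -> None:
        self.value = value


class BaseStore:
    """Simplified stand-in for BaseXCom, not the real implementation."""

    @classmethod
    def deserialize_value(cls, result: Any) -> Any:
        # Default behaviour in this sketch: return the stored value unchanged.
        return result.value

    @classmethod
    def get_all(cls, raw_values: list[Any]) -> list[Any]:
        # Wrap each raw value so the hook receives an object with .value,
        # then dispatch through cls so subclass overrides apply.
        return [cls.deserialize_value(ValueWrapper(v)) for v in raw_values]


class CustomStore(BaseStore):
    @classmethod
    def deserialize_value(cls, result: Any) -> Any:
        return f"custom_prefix:{super().deserialize_value(result)}"


fixed = CustomStore.get_all(["a", "b"])
print(fixed)  # ['custom_prefix:a', 'custom_prefix:b']
```

Wrapping per element keeps the `deserialize_value()` contract unchanged, so existing custom backends do not need to handle a second input shape.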

Testing

DAG:

    from airflow import DAG
    from airflow.providers.standard.operators.python import PythonOperator


    def push_to_xcom(**kwargs):
        value = "Hello, XCom!"
        return value


    def push_to_xcom2(**kwargs):
        value = "Hello, XCom2!"
        return value


    def pull_from_xcom(**kwargs):
        ti = kwargs['ti']
        xcom_value = ti.xcom_pull(task_ids=["push_xcom_task", "push_xcom_task2"])
        print(f"Retrieved XCom Value: {xcom_value}")


    with DAG(
        'xcom_example',
        schedule=None,
        catchup=False,
    ) as dag:
        push_xcom_task = PythonOperator(
            task_id='push_xcom_task',
            python_callable=push_to_xcom,
        )

        push_xcom_task2 = PythonOperator(
            task_id='push_xcom_task2',
            python_callable=push_to_xcom2,
        )

        pull_xcom_task = PythonOperator(
            task_id='pull_xcom_task',
            python_callable=pull_from_xcom,
        )

        push_xcom_task >> push_xcom_task2 >> pull_xcom_task

Write a custom XCom backend:

    from airflow.sdk.bases.xcom import BaseXCom


    class MyXCom(BaseXCom):
        @classmethod
        def deserialize_value(cls, value):
            # Return a fixed marker so it is obvious when this override runs
            return {"from": "custom xcom"}

Run breeze with:

    export PYTHONPATH="/files/plugins:$PYTHONPATH"
    export AIRFLOW__CORE__XCOM_BACKEND="myxcom.MyXCom"

Validate if the right one got loaded:

    >>> from airflow.sdk.execution_time.xcom import XCom
    >>> XCom
    <class 'myxcom.MyXCom'>
    >>> XCom.__name__
    'MyXCom'

Run the DAG:

Push task: [screenshot not included]

Pull task: [screenshot not included]



@amoghrajesh requested review from ashb and kaxil as code owners July 28, 2025 12:57
@amoghrajesh self-assigned this Jul 28, 2025
@amoghrajesh changed the title from "Fix custom xcom backend serialize when BaseXCom.get_all is used" to "Fix custom xcom backend deserialize when BaseXCom.get_all is used" Jul 28, 2025
@amoghrajesh added the backport-to-v3-1-test label Jul 29, 2025
@amoghrajesh added this to the Airflow 3.0.4 milestone Jul 29, 2025
@amoghrajesh merged commit a8c4ba3 into apache:main Jul 29, 2025
77 checks passed
@amoghrajesh deleted the xcom-rightly-deserialize branch July 29, 2025 09:34
@github-actions

Backport failed to create: v3-0-test. View the failure log in the run details.

Status Branch Result
v3-0-test Commit Link

You can attempt to backport this manually by running:

cherry_picker a8c4ba3 v3-0-test

This should apply the commit to the v3-0-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue
RoyLee1224 pushed a commit to RoyLee1224/airflow that referenced this pull request Jul 31, 2025
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 7, 2025
amoghrajesh added a commit to astronomer/airflow that referenced this pull request Aug 11, 2025
fweilun pushed a commit to fweilun/airflow that referenced this pull request Aug 11, 2025
amoghrajesh added a commit to astronomer/airflow that referenced this pull request Aug 11, 2025
amoghrajesh added a commit to astronomer/airflow that referenced this pull request Aug 11, 2025
kaxil pushed a commit that referenced this pull request Aug 11, 2025

Labels

area:task-sdk, backport-to-v3-1-test

3 participants