Relational database columns have strict size limits (e.g., BLOB or TEXT limits), and the exact cap depends on which database backs the Airflow metadata store. Pushing a value larger than the column can hold will hard-fail the task with serialization or database write errors.

3. The TaskFlow API: Modernizing XComs

Introduced in Airflow 2.0, the TaskFlow API hides the XCom plumbing. Returning a value from a @task-decorated function automatically pushes it as an XCom under the default key return_value. Passing that function's output as an argument to another task does two things: it performs the matching xcom_pull at run time, and it sets the dependency between the two tasks.
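The mechanism can be modelled without an Airflow installation. The sketch below is a toy, not Airflow's implementation. XCOM_STORE, XComRef and this task decorator are hypothetical stand-ins, with XComRef playing the role of Airflow's XComArg. One difference matters: real Airflow only records the dependency when the decorated function is called and runs the task later on a worker, whereas this toy executes each task immediately.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical in-memory stand-in for the XCom table: (task_id, key) -> value
XCOM_STORE: dict[tuple[str, str], Any] = {}


@dataclass(frozen=True)
class XComRef:
    """Toy analogue of Airflow's XComArg: points at another task's stored output."""
    task_id: str
    key: str = "return_value"

    def resolve(self) -> Any:
        # This lookup plays the role of the "automatic xcom_pull"
        return XCOM_STORE[(self.task_id, self.key)]


def task(fn: Callable[..., Any]) -> Callable[..., XComRef]:
    """Toy @task decorator (runs eagerly, unlike real Airflow)."""
    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> XComRef:
        # Implicit pull: swap upstream references for their stored values
        args = tuple(a.resolve() if isinstance(a, XComRef) else a for a in args)
        kwargs = {k: v.resolve() if isinstance(v, XComRef) else v for k, v in kwargs.items()}
        result = fn(*args, **kwargs)
        # Implicit push: store the return value under the default key
        XCOM_STORE[(fn.__name__, "return_value")] = result
        return XComRef(task_id=fn.__name__)
    return wrapper


@task
def extract() -> dict:
    return {"records": [10, 20, 30]}


@task
def count_records(payload: dict) -> int:
    return len(payload["records"])


summary = count_records(extract())
print(summary.resolve())  # prints 3
```

The user code never calls a push or pull function. The only visible plumbing is the decorator, which is the point the TaskFlow API makes.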

```python
import json
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "s3://"
    BUCKET_NAME = "my-company-airflow-xcom-bucket"

    @staticmethod
    def serialize_value(value, **kwargs):
        # This example offloads every value to S3. A production backend might
        # offload only large payloads and leave small ones to BaseXCom.
        s3_hook = S3Hook(aws_conn_id="aws_default")
        key = f"xcom/{uuid.uuid4()}.json"

        # Serialize the value to a JSON string (it must be JSON-serializable)
        string_data = json.dumps(value)

        # Upload the payload to S3
        s3_hook.load_string(
            string_data,
            key=key,
            bucket_name=S3XComBackend.BUCKET_NAME,
            replace=True,
        )

        # Only this short URI is written to the Airflow metadata DB
        return BaseXCom.serialize_value(
            f"{S3XComBackend.PREFIX}{S3XComBackend.BUCKET_NAME}/{key}"
        )

    @staticmethod
    def deserialize_value(result):
        # Recover the string stored in the metadata DB
        stored_uri = BaseXCom.deserialize_value(result)

        # Only fetch from S3 if the value is a URI written by this backend
        if isinstance(stored_uri, str) and stored_uri.startswith(S3XComBackend.PREFIX):
            s3_hook = S3Hook(aws_conn_id="aws_default")
            # Split "s3://bucket/key" into bucket and key
            path = stored_uri[len(S3XComBackend.PREFIX):]
            bucket, key = path.split("/", 1)
            # Download the payload and decode it
            file_content = s3_hook.read_key(key, bucket_name=bucket)
            return json.loads(file_content)

        return stored_uri
```

Step 2: Configure Airflow to Use Your Backend
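Airflow reads a custom backend from the [core] xcom_backend option, or equivalently the AIRFLOW__CORE__XCOM_BACKEND environment variable. Its value is the class's full dotted import path, for example the hypothetical include.s3_xcom_backend.S3XComBackend. Airflow imports that path itself, so it must resolve on every component that reads or writes XComs, such as the workers and the webserver.

The helper below is a hypothetical pre-flight check that performs an equivalent importlib lookup. It is not part of Airflow's API, but it surfaces a typo in the path before deployment rather than as a failed task.

```python
import importlib
import os


def import_dotted_path(path: str) -> object:
    """Resolve 'package.module.ClassName' to the object it names."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ImportError(f"{path!r} is not a dotted import path")
    # Raises ModuleNotFoundError (a subclass of ImportError) if the module is missing
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f"module {module_path!r} has no attribute {attr_name!r}") from exc


def check_xcom_backend_setting() -> object:
    """Hypothetical pre-flight check: import whatever the env var points at."""
    path = os.environ.get("AIRFLOW__CORE__XCOM_BACKEND")
    if path is None:
        # The option may still be set in airflow.cfg instead of the environment
        raise RuntimeError("AIRFLOW__CORE__XCOM_BACKEND is not set in this environment")
    return import_dotted_path(path)
```

For example, export AIRFLOW__CORE__XCOM_BACKEND=include.s3_xcom_backend.S3XComBackend in the worker image. Then call check_xcom_backend_setting() from that same environment. It should return the S3XComBackend class rather than raising ImportError.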

```python
# Pull the value that task 'process_data' pushed under the key 'record_count'
count = ti.xcom_pull(task_ids='process_data', key='record_count')
```
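The explicit, key-based style pairs an xcom_push in the upstream task with the xcom_pull above. To keep the example runnable without Airflow, the sketch below swaps in a hypothetical StubTaskInstance backed by a plain dict. The process_data and report functions are shaped like Airflow 2 PythonOperator callables, which receive ti from the task context, but the stub is not Airflow's TaskInstance.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical in-memory XCom table shared by all task instances: (task_id, key) -> value
XComTable = Dict[Tuple[str, str], Any]


class StubTaskInstance:
    """Minimal stand-in for Airflow's TaskInstance (hypothetical, for illustration only)."""

    def __init__(self, task_id: str, table: XComTable) -> None:
        self.task_id = task_id
        self._table = table

    def xcom_push(self, key: str, value: Any) -> None:
        # Real Airflow also scopes XComs by DAG and run; this stub only uses task_id and key
        self._table[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Optional[Any]:
        # Return None when nothing was pushed under that task/key
        return self._table.get((task_ids, key))


def process_data(ti: StubTaskInstance) -> None:
    records = ["a", "b", "c"]
    # Explicit push under a custom key instead of the default 'return_value'
    ti.xcom_push(key="record_count", value=len(records))


def report(ti: StubTaskInstance) -> str:
    # Pull a specific key from a specific upstream task
    count = ti.xcom_pull(task_ids="process_data", key="record_count")
    return f"processed {count} records"


table: XComTable = {}
process_data(StubTaskInstance("process_data", table))
print(report(StubTaskInstance("report", table)))  # prints "processed 3 records"
```

Both task_ids and key must match what the upstream task pushed. A mismatch in either one returns nothing rather than raising an error.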