Set up a custom XCom backend using object storage

By default, Airflow uses the metadata database to store XComs, which works well for local development but offers limited performance at scale. For production environments that use XCom to pass data between tasks, Astronomer recommends using a custom XCom backend. Custom XCom backends allow you to configure where Airflow stores the information that is passed between tasks using XComs.

The Object Storage XCom Backend available in the Common IO provider is the easiest way to store XComs in a remote object storage solution.

This tutorial shows you how to set up a custom XCom backend using object storage for AWS S3, Google Cloud Storage, or Azure Blob Storage.

To learn more about other options for setting custom XCom backends, see Strategies for custom XCom backends in Airflow.

While a custom XCom backend allows you to store virtually unlimited amounts of data as XComs, you will also need to scale other Airflow components to pass large amounts of data between tasks. For help running Airflow at scale, reach out to Astronomer.

Object storage is currently considered experimental and might be subject to breaking changes in future releases. For more information see AIP-58.

Time to complete

This tutorial takes approximately 45 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

  • XComs and passing data between Airflow tasks.
  • Airflow connections.
  • Airflow decorators.

Prerequisites

  • The Astro CLI with an Astro project running Astro Runtime 11.5.0 or higher (Airflow 2.9.2 or higher). To set up a custom XCom backend with older versions of Airflow, see Custom XCom backends.
  • An account in either AWS, GCP, or Azure with permissions to create and configure an object storage container.

Step 1: Set up your object storage container

First, you need to set up the object storage container in your cloud provider where Airflow will store the XComs.

AWS
  1. Log into your AWS account and create a new S3 bucket. Ensure that public access to the bucket is blocked. You do not need to enable bucket versioning.

  2. Create a new IAM policy for Airflow to access your bucket. You can use the JSON configuration below or create an equivalent policy in the AWS console. Replace <your-bucket-name> with the name of your S3 bucket.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "s3:ReplicateObject",
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:RestoreObject",
                    "s3:ListBucket",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<your-bucket-name>/*",
                    "arn:aws:s3:::<your-bucket-name>"
                ]
            }
        ]
    }
  3. Save your policy under the name AirflowXComBackendAWSS3.

  4. Create an IAM user called airflow-xcom with the AWS credential type Access key - Programmatic access and attach the AirflowXComBackendAWSS3 policy to this user.

  5. Create an access key of the type Third-party service for your airflow-xcom user. Make sure to save the Access Key ID and the Secret Access Key in a secure location to use in Step 3.

For other ways to set up a connection between Airflow and AWS, see the Amazon provider documentation.
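
Optionally, you can confirm that the new access key can reach your bucket before configuring it in Airflow. The following is a minimal sketch using boto3, not part of the setup itself; the bucket name and credentials are placeholders you need to replace:

    # Optional connectivity check for the airflow-xcom IAM user (sketch only).
    # Assumes boto3 is installed and the placeholders are replaced with your values.
    import boto3

    s3 = boto3.client(
        "s3",
        aws_access_key_id="<your access key id>",
        aws_secret_access_key="<your secret access key>",
    )

    # Write and list a test object under the prefix Airflow will later use for XComs.
    s3.put_object(Bucket="<your-bucket-name>", Key="xcom/connectivity-check.txt", Body=b"ok")
    response = s3.list_objects_v2(Bucket="<your-bucket-name>", Prefix="xcom/")
    print([obj["Key"] for obj in response.get("Contents", [])])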

GCP
  1. Log into your Google Cloud account and create a new project.

  2. Create a new bucket in your project with Uniform Access Control. Enforce public access prevention.

  3. Create a custom IAM role called AirflowXComBackendGCS for Airflow to access your bucket. Assign the following six permissions:

    • storage.buckets.list
    • storage.objects.create
    • storage.objects.delete
    • storage.objects.get
    • storage.objects.list
    • storage.objects.update
  4. Create a new service account called airflow-xcom and grant it access to your project by granting it the AirflowXComBackendGCS role.

  5. Create a new key for your airflow-xcom service account and make sure to download the credentials in JSON format.

For other ways to set up a connection between Airflow and Google Cloud, see the Google provider documentation.
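
Optionally, you can confirm that the service account key works before using it in Airflow. The following is a minimal sketch using the google-cloud-storage client library, not part of the setup itself; the key file path and bucket name are placeholders you need to replace:

    # Optional connectivity check for the airflow-xcom service account (sketch only).
    # Assumes google-cloud-storage is installed and the placeholders are replaced.
    from google.cloud import storage

    client = storage.Client.from_service_account_json("<path-to-your-keyfile>.json")
    bucket = client.bucket("<your-bucket-name>")

    # Write and list a test object under the prefix Airflow will later use for XComs.
    bucket.blob("xcom/connectivity-check.txt").upload_from_string("ok")
    print([blob.name for blob in client.list_blobs("<your-bucket-name>", prefix="xcom/")])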

Azure
  1. Log into your Azure account and create a storage account. Ensure that public access to the storage account is blocked.

  2. In the storage account, create a new container.

  3. Create a shared access token for your container.

    In the Permissions dropdown menu, enable the following permissions:

    • Read
    • Add
    • Create
    • Write
    • Delete
    • List

    Set the duration for which the token will be valid and set Allowed Protocols to HTTPS only. Provide the IP address of your Airflow instance. If you are running Airflow locally with the Astro CLI, use the IP address of your computer.

  4. Go to your storage account and navigate to Access keys. Copy the Key and Connection string values and save them in a secure location to use in Step 3.

For other ways to set up a connection between Airflow and Azure Blob Storage, see the Microsoft Azure provider documentation.
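
Optionally, you can confirm that the connection string works before using it in Airflow. The following is a minimal sketch using the azure-storage-blob client library, not part of the setup itself; the connection string and container name are placeholders you need to replace:

    # Optional connectivity check for the blob container (sketch only).
    # Assumes azure-storage-blob is installed and the placeholders are replaced.
    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient.from_connection_string("<your connection string>")
    container = service.get_container_client("<your-container-name>")

    # Write and list a test blob under the prefix Airflow will later use for XComs.
    container.upload_blob("xcom/connectivity-check.txt", b"ok", overwrite=True)
    print([blob.name for blob in container.list_blobs(name_starts_with="xcom/")])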

Step 2: Install the required provider packages

To use the Object Storage XCom Backend, you need to install the Common IO provider package as well as the provider package for your cloud provider.

AWS

Add the Common IO and Amazon provider packages to your requirements.txt file. Note that you need to install the s3fs extra to use the Amazon provider package with the object storage feature.

apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-amazon[s3fs]==8.19.0

GCP

Add the Common IO and Google provider packages to your requirements.txt file.

apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-google==10.17.0

Azure

Add the Common IO and Microsoft Azure provider packages to your requirements.txt file.

apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-microsoft-azure==9.0.0

Step 3: Set up your Airflow connection

An Airflow connection is necessary to connect Airflow with your object storage container provider. In this tutorial, you’ll use the Airflow UI to configure your connection.

  1. Start your Astro project by running:

    $ astro dev start
AWS
  2. In the Airflow UI, navigate to Admin -> Connections and click on Create. Fill in the following fields:

    • Conn Id: my_aws_conn
    • Conn Type: Amazon Web Services
    • AWS Access Key ID: <your access key>
    • AWS Secret Access Key: <your secret key>

    To learn more about configuration options for the AWS connection, see the Amazon provider documentation.

GCP
  2. In the Airflow UI, navigate to Admin -> Connections and click on Create. Fill in the following fields:

    • Conn Id: my_gcp_conn
    • Conn Type: Google Cloud
    • Project Id: <your project id>
    • Keyfile JSON: <the contents from your keyfile JSON that you downloaded in step 1>

    To learn more about configuration options for the Google connection, see the Google provider documentation.

Azure
  2. In the Airflow UI, navigate to Admin -> Connections and click on Create. Fill in the following fields:

    • Conn Id: my_azure_conn
    • Conn Type: Microsoft Azure Blob Storage
    • Account URL (Active Directory Auth): <the URL of your Azure Storage account>
    • Blob Storage Key (optional): <access key to your Azure Storage account>
    • Blob Storage Connection String (optional): <connection string to your Azure Storage account>

    To learn more about configuration options for the Azure connection, see the Microsoft Azure provider documentation.
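
Alternatively, you can define any of these connections as an environment variable instead of creating it in the Airflow UI. As a rough sketch, the AWS connection from this step could be added to your .env file using Airflow's JSON connection format; the GCP and Azure connections can be defined analogously with their respective fields. Note that connections defined this way do not appear in the Airflow UI.

    AIRFLOW_CONN_MY_AWS_CONN='{"conn_type": "aws", "login": "<your access key>", "password": "<your secret key>"}'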

Step 4: Configure your custom XCom backend

To configure a custom XCom backend with object storage, set environment variables in your Astro project.

If you are setting up a custom XCom backend for an Astro Deployment, you need to set the following environment variables in your Deployment instead. See Environment variables for instructions.

  1. Add the AIRFLOW__CORE__XCOM_BACKEND environment variable to your .env file. It defines the class to use for the custom XCom backend implementation.

    AIRFLOW__CORE__XCOM_BACKEND="airflow.providers.common.io.xcom.backend.XComObjectStorageBackend"
AWS
  2. Add the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH environment variable to your .env file to define the path in your S3 bucket where the XComs will be stored, in the form <connection id>@<bucket name>/<path>. Use the connection ID of the Airflow connection you defined in Step 3 and replace <my-bucket> with your S3 bucket name.

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="s3://my_aws_conn@<my-bucket>/xcom"
GCP
  2. Add the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH environment variable to your .env file to define the path in your GCS bucket where the XComs will be stored, in the form <connection id>@<bucket name>/<path>. Use the connection ID of the Airflow connection you defined in Step 3 and replace <my-bucket> with your GCS bucket name.

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="gs://my_gcp_conn@<my-bucket>/xcom"
Azure
  2. Add the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH environment variable to your .env file to define the path in your Azure blob storage container where the XComs will be stored, in the form <connection id>@<container name>/<path>. Use the connection ID of the Airflow connection you defined in Step 3 and replace <my-blob> with your Azure blob container name.

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="abfs://my_azure_conn@<my-blob>/xcom"
  3. Add the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD environment variable to your .env file to determine when Airflow stores XComs in object storage instead of the metadata database. The default value is -1, which stores all XComs in the metadata database. A value of 0 stores all XComs in object storage. Any positive value means that an XCom larger than the threshold (in bytes) is stored in object storage, while an XCom equal to or smaller than the threshold is stored in the metadata database. For this tutorial, set the threshold to 1000 bytes, which means any XCom larger than roughly 1 KB is stored in object storage. See the size-estimation sketch after this list for a way to gauge XCom sizes.

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD="1000"
  4. Optional. Define the AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION environment variable to compress the XComs stored in object storage with an fsspec-supported compression algorithm such as zip. The default value is None.

    AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION="zip"
  5. Restart your Airflow project by running:

    $ astro dev restart
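
To estimate whether a given value will cross the threshold, you can check the size of its JSON-serialized form, which is approximately what the backend compares against the threshold. A minimal sketch, assuming plain JSON serialization as an approximation of Airflow's XCom serialization:

    # Rough estimate of an XCom value's serialized size in bytes (sketch only;
    # Airflow's XCom serialization may differ slightly from plain json.dumps).
    import json

    big_obj = {f"key{i}": "x" * 100 for i in range(100)}
    size_in_bytes = len(json.dumps(big_obj).encode("utf-8"))
    print(size_in_bytes)  # Well above the 1000-byte threshold, so this value would go to object storage.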

Step 5: Test your custom XCom backend

To test your custom XCom backend, create and run a simple DAG.

  1. Create a new file in the dags directory of your Astro project called custom_xcom_backend_test.py and add the following code:

    1"""
    2## Toy DAG to show size dependant custom XCom serialization
    3
    4This DAG pushes two dicts to XCom, one below, one above 1000 bytes.
    5It then pulls them and prints their sizes.
    6"""
    7
    8from airflow.decorators import dag, task
    9from airflow.models.baseoperator import chain
    10
    11
    12@dag(
    13 start_date=None,
    14 schedule=None,
    15 catchup=False,
    16 doc_md=__doc__,
    17 tags=["xcom", "2-9", "toy"],
    18)
    19def custom_xcom_backend_test():
    20 @task
    21 def push_objects(**context) -> None:
    22 """Create a small and a big dictionary, print their sizes and push them to XCom."""
    23
    24 small_obj = {"a": 23}
    25 big_obj = {f"key{i}": "x" * 100 for i in range(100)}
    26 print(f"Size of small object: {small_obj.__sizeof__()}")
    27 print(f"Size of big object: {big_obj.__sizeof__()}")
    28
    29 context["ti"].xcom_push(key="small_obj", value=small_obj)
    30 context["ti"].xcom_push(key="big_obj", value=big_obj)
    31
    32 @task
    33 def pull_objects(**context) -> None:
    34 """Pull the small and big dictionaries from XCom and print their sizes."""
    35
    36 small_obj = context["ti"].xcom_pull(task_ids="push_objects", key="small_obj")
    37 big_obj = context["ti"].xcom_pull(task_ids="push_objects", key="big_obj")
    38
    39 print(f"Size of small object: {small_obj.__sizeof__()}")
    40 print(f"Size of big object: {big_obj.__sizeof__()}")
    41
    42 chain(push_objects(), pull_objects())
    43
    44
    45custom_xcom_backend_test()
  2. Manually trigger the custom_xcom_backend_test DAG in the Airflow UI and navigate to the XCom tab of the push_objects task. You should see that the small_obj XCom shows its value, meaning it was stored in the metadata database, since it is smaller than 1 KB. The big_obj XCom shows the path to the object in the object storage container that holds the serialized value of the XCom.

    Screenshot: XCom tab of the push_objects task showing two key-value pairs: big_obj, which was serialized to the custom XCom backend, and small_obj, a dictionary containing 'a': 23, which was stored in the metadata database.
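
Optionally, you can also confirm that the large XCom landed in your bucket. One way is a short sketch using Airflow's ObjectStoragePath inside a task, pointed at the same path you configured in Step 4; the example below uses the AWS URI and placeholder bucket name, so adjust the scheme and connection ID for GCS or Azure. Add the task to any DAG and run it after the test DAG completes.

    # Optional sketch: list the objects Airflow stored under the configured XCom prefix.
    # Runs as an Airflow task so that the my_aws_conn connection from Step 3 is available.
    from airflow.decorators import task
    from airflow.io.path import ObjectStoragePath


    @task
    def list_xcom_objects() -> None:
        base = ObjectStoragePath("s3://my_aws_conn@<my-bucket>/xcom")
        for path in base.iterdir():
            print(path)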

Conclusion

Congratulations, you learned how to set up a custom XCom backend using object storage! Learn more about other options to set up custom XCom backends in the Strategies for custom XCom backends in Airflow guide.