Introducing Apache Airflow® 2.9

  • Kenten Danas

The Apache Airflow® project continues to move forward at a fast pace, matching innovation in the data ecosystem. Airflow 2.9 was released this week, the first minor release of the year, and the fourth in only the past 12 months. This release contains more than 35 exciting new features, over 70 improvements, and over 30 bug fixes.

Airflow 2.9 builds on the trends and requests in the broader community, including enhancements to data-aware scheduling and dynamic task mapping. According to the 2023 Airflow Survey, these features are each leveraged by over 40% of Airflow users, and are key for common use cases like ETL and MLOps. Not only does 2.9 significantly improve the dynamism and flexibility for these use cases, it also introduces improvements to the UI, additional API endpoints, and so much more.

This blog post will walk you through everything you need to know about this release, so you don’t miss out on any of the exciting additions and changes.

Dataset Enhancements in Airflow 2.9: Advanced Scheduling Options, New Endpoints, and Improved UI

Datasets and data-aware scheduling were originally released in Airflow 2.4, and provide a way for DAGs that access the same data to have explicit, visible relationships, and be scheduled based on updates to these datasets. Using datasets allows you to create smaller DAGs instead of large monolithic ones and allows different teams to maintain their own DAGs, even if data is shared across them, while gaining increased visibility into DAG dependencies.

In the 2023 Airflow survey, almost 50% of users indicated they have adopted dataset functionality. Airflow 2.9 brings the biggest updates yet to this important feature, including some that were highly requested by the Airflow community.

Advanced Scheduling with Conditional Expressions

First, there are more advanced options for scheduling DAGs on datasets with conditional expressions. Previously, you could provide a list of datasets for your DAG’s schedule, but that list could only be evaluated as an AND expression. This meant that your DAG would only run after every dataset had been updated once, which didn’t suit every use case.

For example, if you have a DAG that processes data which arrives from several different sources (e.g. files that are dropped in different S3 buckets by different DAGs), you might want that DAG to run every time a file lands, rather than waiting for a file from each location. This requires OR logic in the schedule, which was not previously available.

Now, you can use AND (&) and OR (|) operators to specify complex dataset dependencies for your DAG’s schedule. For example, this DAG will run once at least one dataset in each category has been updated.

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag

@dag(
    start_date=datetime(2024, 3, 1),
    schedule=(
        (Dataset("categoryA_dataset1") | Dataset("categoryA_dataset2"))
        & (Dataset("categoryB_dataset3") | Dataset("categoryB_dataset4"))
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
)
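To make the semantics of the schedule above concrete, here is a minimal plain-Python sketch of how an AND/OR expression can be evaluated against the set of datasets that have been updated. It does not use Airflow's own classes: `Cond`, `Leaf`, `AnyOf`, and `AllOf` are hypothetical names invented for illustration, and they only mimic the `|` and `&` syntax.

```python
from __future__ import annotations

from dataclasses import dataclass


class Cond:
    """Hypothetical base class that gives every condition the | and & operators."""

    def __or__(self, other: Cond) -> Cond:
        return AnyOf((self, other))

    def __and__(self, other: Cond) -> Cond:
        return AllOf((self, other))

    def evaluate(self, updated: set[str]) -> bool:
        raise NotImplementedError


@dataclass(frozen=True)
class Leaf(Cond):
    """A single dataset, identified by its URI."""

    uri: str

    def evaluate(self, updated: set[str]) -> bool:
        return self.uri in updated


@dataclass(frozen=True)
class AnyOf(Cond):
    """OR: satisfied when at least one part is satisfied."""

    parts: tuple

    def evaluate(self, updated: set[str]) -> bool:
        return any(part.evaluate(updated) for part in self.parts)


@dataclass(frozen=True)
class AllOf(Cond):
    """AND: satisfied only when every part is satisfied."""

    parts: tuple

    def evaluate(self, updated: set[str]) -> bool:
        return all(part.evaluate(updated) for part in self.parts)


schedule = (
    (Leaf("categoryA_dataset1") | Leaf("categoryA_dataset2"))
    & (Leaf("categoryB_dataset3") | Leaf("categoryB_dataset4"))
)

# Only category A has an update, so the AND is not satisfied yet.
print(schedule.evaluate({"categoryA_dataset1"}))  # False
# One update in each category satisfies the whole expression.
print(schedule.evaluate({"categoryA_dataset2", "categoryB_dataset3"}))  # True
```

The point of the sketch is simply that each parenthesized group is an OR, and the groups are joined by an AND, so one update per category is enough.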

Dataset and Time Combination for Flexible Scheduling

Additionally, you can now schedule DAGs using a combination of datasets and time, which is useful when you need your DAG to run at regular intervals at a minimum, and also whenever one or more datasets are updated. To implement this type of schedule, you use the new DatasetOrTimeSchedule timetable and pass it a cron-based timetable and a dataset expression.

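from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

@dag(
    start_date=datetime(2024, 3, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("*/2 * * * *", timezone="UTC"),
        datasets=(Dataset("dataset3") | Dataset("dataset4")),
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
)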

New Endpoints for Enhanced Dataset Event Management

Another update in this release that makes datasets even more functional is the addition of two new endpoints for dataset event management:

  • Create dataset events externally: This new endpoint allows you to mark a dataset as updated without a producing task completing. The update might come from a DAG in another Airflow deployment (as a way of implementing cross-deployment DAG dependencies), or from a program external to Airflow. This endpoint has also been implemented as a button in the Airflow UI to support manual updates.
  • Clear dataset events from the queue: This new endpoint can be used to reset a dataset update. This is useful for scenarios where a DAG has multiple upstream datasets that are updated at different times, and you may need to recover from getting out of sync due to a DAG or task failure.

These new endpoints address common challenges with using datasets, and will make them easier to implement for complex use cases at scale.
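As a rough illustration of calling these endpoints from a program outside Airflow, the sketch below builds (but does not send) the two HTTP requests using only the Python standard library. Several things here are assumptions rather than facts from this post: the base URL, the basic-auth credentials, and the helper names are hypothetical, and the paths `/datasets/events` and `/dags/{dag_id}/datasets/queuedEvent` reflect our reading of the stable REST API reference, so check them against the version you run.

```python
from __future__ import annotations

import base64
import json
from urllib.parse import quote
from urllib.request import Request

# Hypothetical values: replace with your own deployment URL and credentials.
BASE_URL = "http://localhost:8080/api/v1"
USERNAME = "admin"
PASSWORD = "admin"


def _auth_header() -> str:
    """Basic-auth header; assumes the basic-auth API backend is enabled."""
    token = base64.b64encode(f"{USERNAME}:{PASSWORD}".encode()).decode()
    return f"Basic {token}"


def build_create_event_request(dataset_uri: str, extra: dict | None = None) -> Request:
    """Build a request that marks a dataset as updated (assumed path)."""
    body = json.dumps({"dataset_uri": dataset_uri, "extra": extra or {}}).encode()
    return Request(
        f"{BASE_URL}/datasets/events",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": _auth_header(),
        },
    )


def build_clear_queue_request(dag_id: str) -> Request:
    """Build a request that clears queued dataset events for one DAG (assumed path)."""
    return Request(
        f"{BASE_URL}/dags/{quote(dag_id, safe='')}/datasets/queuedEvent",
        method="DELETE",
        headers={"Authorization": _auth_header()},
    )


create_req = build_create_event_request("s3://my-bucket/file.csv")
clear_req = build_clear_queue_request("my_dag")
print(create_req.get_method(), create_req.full_url)
print(clear_req.get_method(), clear_req.full_url)
```

To actually send either request, pass it to `urllib.request.urlopen`. Nothing is sent when the block above runs.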

Revamped UI for an Intuitive Dataset Experience

Moreover, the Airflow 2.9 UI offers an improved dataset experience: datasets are now shown in the Graph View, dataset events appear above the task/run details in the Grid View, and the Datasets View is filterable by DAG ID.

Object Storage Support for Datasets

As if all this wasn’t enough to be excited about, datasets are also now supported by object storage (more on updates to this feature below), which means you can now define an object storage path based on a dataset.

bucket_path = ObjectStoragePath(Dataset("s3://my-bucket/"), conn_id="my_aws_conn")

Overall, this release is a huge step forward for datasets and data-aware scheduling, and investment in the evolution of this important feature will continue in future releases.

Dynamic Task Mapping Updates: Custom Task Instance Names

Dynamic task mapping is another commonly used feature that got a big update in Airflow 2.9. This feature was one of the biggest game changers to come out of an Airflow minor release: it opened the door for first-class dynamic workflows with very little code required. In the 2023 Airflow survey, more than 40% of users said they leverage this feature.

While dynamic task mapping was widely adopted almost immediately after its release, frequent users know that one challenge was the inability to name mapped task instances. The default Map Index: 5 becomes unwieldy when you have many task instances and are trying to figure out which file broke in the one task instance that failed.

Now, you can easily solve this problem by adding custom names to your mapped task instances with a simple map_index_template.

    @task(map_index_template="{{ my_mapping_variable }}")
    def map_fruits(fruit_info: dict):
        from airflow.operators.python import get_current_context

        fruit_name = fruit_info["name"]
        context = get_current_context()
        context["my_mapping_variable"] = fruit_name
        print(f"{fruit_name} sugar content: {fruit_info['nutritions']['sugar']}")

    map_fruits.expand(fruit_info=get_fruits())

This feature will be transformative for teams using dynamic task mapping for use cases like hyperparameter tuning models, where being able to easily view which mapped task instance equates to which input parameter is critical.

Object Storage Updates: Custom XCom Backends

One of the biggest updates in Airflow 2.8, the last minor release, was the implementation of AIP 58, the Airflow ObjectStore. With this feature, Airflow offers an abstraction layer over common object stores like LocalFilesystem, S3, GCS, and Azure Blob Storage. What this means in practice is you can interact with files stored in any of these systems with the same code, rather than relying on multiple specific operators. This is especially useful when transferring files, so you can avoid using many different XtoYTransferOperator-type operators, which can be difficult to maintain (especially if you have to add your own custom ones).

This feature is still experimental, but it has already proven useful to many Airflow users. Now, in Airflow 2.9, you can use object storage as a backend for XComs, enabling scalable and efficient inter-task communication. This means you can use object storage services like S3, GCS, or Azure Blob Storage for an XCom backend rather than traditional database-backed XCom storage.

AIRFLOW__CORE__XCOM_BACKEND="airflow.providers.common.io.xcom.backend.XComObjectStoreBackend"
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH="s3://my_aws_conn@my-bucket/xcom"
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD="1000"
AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION="zip"
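
The threshold setting decides, per value, where an XCom ends up. Here is a rough sketch of that decision in plain Python. It rests on assumptions that are our understanding of the provider's behavior, not something stated in this post: the threshold is a size in bytes of the serialized value, a negative threshold keeps everything in the database, and 0 sends everything to object storage. `choose_xcom_store` is a hypothetical helper, not part of Airflow.

```python
import json


def choose_xcom_store(value, threshold: int) -> str:
    """Return "database" or "object_storage" for a JSON-serializable value.

    Assumption: values whose serialized size is below the threshold stay in
    the metadata database, and a negative threshold disables object storage.
    """
    size = len(json.dumps(value).encode("utf-8"))
    if threshold < 0 or size < threshold:
        return "database"
    return "object_storage"


# A tiny dict is far below 1000 bytes, so it stays in the database.
print(choose_xcom_store({"rows": 3}, threshold=1000))  # database
# A list of 1000 integers serializes to several kilobytes.
print(choose_xcom_store(list(range(1000)), threshold=1000))  # object_storage
```

With the configuration above, small values such as row counts would keep going through the database, while larger payloads would be written under the configured bucket path.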

User Interface Improvements

Nearly every Airflow release brings great UI updates that improve the experience of working with Airflow, and Airflow 2.9 is no different. In addition to the dataset UI updates mentioned above, there are a few great quality of life improvements in this release.

There is a new task duration page that can help you understand how long your task takes to run on average.

And task logs now have collapsible sections, making it easier to find what you’re looking for.

And, excitingly, there is a new button to clear only failed tasks.

And on the backend side, the code for multiple views was migrated from Bootstrap3 and Flask to React, as part of the ongoing AIP 38, which aims to modernize the UI and open the door for additional improvements in future releases.

Additional Noteworthy Features and Updates

There are lots more notable updates in 2.9 to be aware of, including:

  • You can now configure DAGs to pause after failing n times sequentially using the max_consecutive_failed_dag_runs parameter. Have you ever come back to the office on Monday and realized your hourly DAG ran 60 times unnecessarily after starting to fail Friday evening? This parameter is for you (though note that it is experimental).
  • Python 3.12 is now supported.
  • Audit logging has been added to all REST API CRUD operations.
  • Most Listeners are considered stable and can be used in production.
  • There is a new @task.bash decorator for creating and running Bash commands with TaskFlow.
  • There is a new callback for skipped tasks, implemented with on_skipped_callback.
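
The pause-after-n-failures rule from the first bullet can be sketched in plain Python. This only illustrates the counting logic, assuming run states are simple strings ordered from oldest to newest and that a value of 0 means the feature is off. `should_pause` is a hypothetical helper and not how Airflow implements max_consecutive_failed_dag_runs.

```python
def should_pause(run_states: list[str], max_consecutive_failed: int) -> bool:
    """Return True if the most recent `max_consecutive_failed` runs all failed."""
    if max_consecutive_failed <= 0:  # assumption: 0 disables the check
        return False
    recent = run_states[-max_consecutive_failed:]
    return len(recent) == max_consecutive_failed and all(
        state == "failed" for state in recent
    )


history = ["success", "failed", "failed", "failed"]
print(should_pause(history, 3))  # True: the last three runs failed
print(should_pause(history, 4))  # False: the fourth-most-recent run succeeded
```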

And even these updates barely scratch the surface. Regardless of how you use Airflow, there’s something for you in 2.9.

Get Started with Airflow 2.9

Airflow 2.9 has way more features and improvements than we can cover in a single blog post. To learn more, check out the full release notes, and join us for a webinar on April 11th that will cover the new release in more detail.

To try Airflow 2.9 and see all the great features for yourself, get started with a Free 14-Day Trial of Astro. We offer same-day support for all new Airflow releases, so you don’t have to wait to take advantage of the latest and greatest features.
